AWS SQS (Simple Queue Service) is a managed message queue. It exposes HTTP based endpoints, so it can be used with almost any programming language. NodeJS is getting more popular day by day, and its non-blocking asynchronous request handling makes it a good choice for web based applications. We will write a simple NodeJs consumer for AWS SQS.
You can read more about AWS SQS and its benefits on the official page. However, there are a few basic things about AWS SQS which I will list here so that we have an understanding of how it works:
- You can create multiple queues on AWS SQS service
- Every queue will have its URL to which you can send HTTP requests to connect, to read messages and to delete messages
- You can fetch single or multiple messages per request, however there is a limit of 10 messages (as of today when I am writing this post; it can change in future. Refer to the official documentation for more details)
- When you fetch a message from an SQS queue you can specify how long this message stays unavailable to any other consumer or any other fetch request (the visibility timeout). This should be at least the maximum time your consumer will take to process a message.
- When the consumer is done processing the message, it has to make another HTTP request to delete the message from the queue
- If the message is not deleted before the visibility timeout is over, the message becomes available again to any other consumer or the next fetch call
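To make the fetch, hide and delete behaviour listed above a bit more concrete, here is a tiny in-memory sketch. It is only an illustration of the semantics and not the AWS API: the FakeQueue class and its send, receive and remove methods are names I made up for this example.

```javascript
// In-memory sketch of SQS visibility-timeout semantics (illustration only, not the AWS API).
class FakeQueue {
  constructor() {
    this.messages = []; // each entry: { id, body, invisibleUntil }
    this.nextId = 1;
  }

  send(body) {
    const id = String(this.nextId++);
    this.messages.push({ id, body, invisibleUntil: 0 });
    return id;
  }

  // Return the first visible message and hide it for visibilityTimeout seconds.
  // `now` is a timestamp in milliseconds, passed in so the behaviour is easy to follow.
  receive(visibilityTimeout, now = Date.now()) {
    const message = this.messages.find(m => m.invisibleUntil <= now);
    if (!message) return null;
    message.invisibleUntil = now + visibilityTimeout * 1000;
    return { id: message.id, body: message.body };
  }

  // Deleting is a separate, explicit step, as with SQS.
  remove(id) {
    this.messages = this.messages.filter(m => m.id !== id);
  }
}

const queue = new FakeQueue();
queue.send('hello');
const first = queue.receive(30, 0);      // fetched at t = 0 s, hidden for 30 seconds
const hidden = queue.receive(30, 10000); // t = 10 s: still hidden, so nothing is returned
const again = queue.receive(30, 30000);  // t = 30 s: never deleted, so it is delivered again
queue.remove(again.id);                  // processed, so remove it for good
```

The second fetch finds nothing because the message is still hidden. The third fetch gets the same message again because nobody deleted it in time.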
Based on the above understanding of AWS SQS we need to do the following things:
- Connect to SQS
- Fetch a message and provide the “message unavailable time” (the visibility timeout)
- Process the message and then delete the message from the queue
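Before we hand this work over to a library, here is a rough sketch of what one fetch, process and delete cycle could look like when written by hand. I am assuming the sqs argument behaves like an aws-sdk v2 SQS client (receiveMessage and deleteMessage, each returning an object with a promise() method). The pollOnce function and the processMessage callback are hypothetical names used only for this example.

```javascript
// One receive -> process -> delete cycle.
// `sqs` is assumed to look like an aws-sdk v2 SQS client;
// `processMessage` is a hypothetical async callback holding the business logic.
async function pollOnce(sqs, queueUrl, visibilityTimeout, processMessage) {
  const response = await sqs
    .receiveMessage({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: 1,
      VisibilityTimeout: visibilityTimeout // seconds the message stays hidden
    })
    .promise();

  const messages = response.Messages || [];
  for (const message of messages) {
    await processMessage(message);
    // Delete only after successful processing. If processMessage throws,
    // the message becomes visible again once the timeout expires.
    await sqs
      .deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle })
      .promise();
  }
  return messages.length; // number of messages handled in this cycle
}

// Possible usage with a real client (not executed here):
// const AWS = require('aws-sdk');
// pollOnce(new AWS.SQS({ apiVersion: '2012-11-05' }), queueUrl, 100, async m => console.log(m.Body));
```

A real consumer would call something like this in a loop and add error handling, which is exactly the part the sqs-consumer package takes care of for us later in this post.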
Before we dive into the real code there are a few things you need to know. I am a huge huge huge Docker fan, so everything we are going to discuss and develop in this post will run within Docker containers. You need to have some understanding of Docker and Docker-compose. There are tons of articles available online detailing the benefits of using Docker containers.
Development Environment for NodeJs Consumer for AWS SQS
We will use a NodeJs based consumer to interact with AWS SQS. However, not every team has the luxury of having AWS SQS queues available for development, so we will use elasticmq. Elasticmq provides the same interface as AWS SQS, so our code will work with real AWS SQS without any change.
Following is the architecture diagram of our setup.
SQS Docker Container
The awesome developers at softwaremill have already created a docker image of elasticmq which we can use.
Create a folder named “docker” in your development folder. We will use this folder for managing files related to all the supporting docker containers. In this “docker” directory create a folder named “elasticmq”. Within the “elasticmq” folder we will create two files, Dockerfile and elasticmq.conf.
Dockerfile
In the Dockerfile we will import the elasticmq image from docker hub and we will add a config file to configure our queue name. Following is how our Dockerfile will look:
# get latest ElasticMQ image
FROM softwaremill/elasticmq:latest
MAINTAINER Rajinder Deol <[email protected]>
#copy config file
ADD elasticmq.conf /opt/elasticmq.conf
elasticmq.conf
In this file we will configure elasticmq to listen on port 9324 and provide the names of our queues. We can create multiple queues if we want. For development you can keep the rest of the settings unchanged and just add the queue names which you want to create. I am creating two queues, “MY_SQS_QUEUE” and “MY_SQS_QUEUE_2”. Our elasticmq.conf will look like:
include classpath("application.conf")
// node-address.host = "*" means the server will take "Host" header of incoming
// requests to generate queue URLs.
node-address {
  protocol = http
  host = "*"
  port = 9324
  context-path = ""
}
rest-sqs {
  enabled = true
  bind-port = 9324
  bind-hostname = "0.0.0.0"
  // Possible values: relaxed, strict
  sqs-limits = relaxed
}
queues {
  MY_SQS_QUEUE {}
  MY_SQS_QUEUE_2 {}
}
Our SQS container is done. When we build and run the SQS container, it will have two queues pre-created which we can use to push and retrieve messages. Now we will work on our nodejs consumer which will interact with our SQS.
NodeJs Docker Container
To work with NodeJs we need npm so that we can manage our project dependencies. However, we will not install npm on our development machine; instead we will use a nodeJs docker container to run npm commands.
npm.sh
Let's create a file named npm.sh and wrap our docker run command in it. Following will be the code in this file:
#!/bin/bash
# $$ expands to the PID of this script, which gives each container a unique name
docker run --rm \
  --name=node-$$ \
  -v "$(pwd)":/usr/src/app \
  -w "/usr/src/app" \
  -e http_proxy=$DOCKER_LOCAL_PROXY \
  -e https_proxy=$DOCKER_LOCAL_PROXY \
  -e HTTP_PROXY=$DOCKER_LOCAL_PROXY \
  -e HTTPS_PROXY=$DOCKER_LOCAL_PROXY \
  node:8 "$@"
If you notice, we are using the node:8 docker image from Docker hub. This is the official nodeJs image managed by the nodeJs team. The following things are happening in this file:
- We are running a nodeJs version 8 docker container from docker hub. To know more about how to use this image, read its docker hub page
- We have mapped our current development directory to the “/usr/src/app” directory of the container. So the running container will have all the files in our development directory available in the “/usr/src/app” folder. This is the default app directory of the nodeJs Docker container
- We have set our working directory inside the container to “/usr/src/app” so that we can run npm commands on the code available in the “/usr/src/app” folder
- We have set up a few environment variables for http and https proxies. You can ignore them; they are only required if you are working behind a corporate proxy.
- Then we are passing all the arguments coming to this file as arguments to nodeJs container
Make this file executable by running following command (linux specific, you can change it according to your OS):
chmod +x npm.sh
After that run the following command to check if the script file is working
sh npm.sh npm -v
This command should output the version of npm
Node Project
Now we will do some real nodeJs programming. Let's create our package.json file and add our dependencies. We will use the following node packages:
- nodemon : We will use this package for hot-reloads during development
- aws-sdk : this is required for communicating with AWS SQS
- sqs-consumer : we are using this package to handle all our communication with AWS SQS. This package has all the code pre-written which is required for communication with AWS SQS
- winston : This is an optional package. I use it to log messages. It is always a best practice to use a log aggregator instead of direct use of console.log
With all the dependencies added, our package.json file will look like:
{
  "name": "node_sqs_consumer",
  "version": "1.0.0",
  "description": "Simple AWS SQS consumer",
  "author": "Rajinder Deol",
  "main": "index.js",
  "engines": {
    "node": "8.x"
  },
  "scripts": {
    "start": "node index.js",
    "start:dev": "nodemon index.js",
    "write": "node write_message.js"
  },
  "dependencies": {
    "aws-sdk": "^2.299.0",
    "sqs-consumer": "^3.8.0",
    "winston": "^3.0.0"
  },
  "devDependencies": {
    "nodemon": "^1.18.4"
  }
}
If you notice, the “scripts” section in our package.json file refers to two files, index.js and write_message.js. We will create these files shortly. Now if you run the following command, it should download all the dependencies into your development folder:
sh npm.sh npm install
After downloading the node packages required for this project, let's write our application. Create a folder named “services” in the development folder. This folder will have all our files related to initiating our project. Let's create our first file in the “services” directory, named sqs.js. This file will have the code required for initiating aws-sdk and getting the object required for communication with AWS SQS. Following will be the code in this file:
const AWS = require('aws-sdk');
module.exports = (accessKeyId, secretAccessKey, region, apiVersion) => {
  AWS.config = new AWS.Config({
    accessKeyId,
    secretAccessKey,
    region
  });
  return new AWS.SQS({ apiVersion });
};
We also need to configure our winston logger. There are plenty of options available, however we will use a basic configuration. Create a “logger.js” file in the “services” directory and add the following code:
const { createLogger, format, transports } = require('winston');
const { combine, timestamp, splat, json } = format;
// formats run in order, so timestamp() and splat() must come before json(),
// which serializes the final log line
const logFormat = combine(timestamp(), splat(), json());
const logger = createLogger({
  level: 'info',
  format: logFormat,
  transports: [new transports.Console()],
  exceptionHandlers: [new transports.Console()]
});
module.exports = logger;
Now we will create our handler. The handler is basically a javascript function which will receive the message from SQS and process it. You can write your business logic in this function, however we will just receive the message and log it with our logger. Create a “handler.js” file in the “services” directory and add the following code:
const logger = require('/usr/src/app/services/logger');
function handleMessage(message, done) {
  // log the received message
  logger.info(message.Body);
  // call the done function, this will delete the message from the queue
  done();
}
// export the handler so that index.js can require it
module.exports = handleMessage;
If you notice, when the handler is called it will receive the message as well as a callback function. When we finish processing the message we need to call the callback function and it will delete the message from SQS.
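If your messages carry JSON, the handler is also the place to deal with bad input. Below is a small sketch of that idea, with some assumptions on my side: the handleJsonMessage name and the first_name field are only for illustration, and console.log stands in for our logger so the snippet runs on its own. As far as I understand sqs-consumer 3.x, passing an error to the callback marks the message as failed, so it is not deleted from the queue.

```javascript
// Variant of the handler that expects a JSON body (an assumption for this sketch).
function handleJsonMessage(message, done) {
  let payload;
  try {
    payload = JSON.parse(message.Body);
  } catch (err) {
    // Report the failure through the callback instead of throwing.
    return done(new Error('Invalid JSON in message body: ' + err.message));
  }
  // Business logic would go here; we only print one field.
  console.log('first_name:', payload.first_name);
  done();
}

// Small demonstration with hand-made messages
handleJsonMessage({ Body: '{"first_name":"Raj"}' }, err => {
  console.log('valid message, error is', err);
});
handleJsonMessage({ Body: 'not json' }, err => {
  console.log('invalid message, error is', err.message);
});
```

Whether a failed message should go back to the queue or be dropped depends on your use case, so treat this as a starting point only.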
Now we will write our index.js file. This file will initiate all the services and finally run our consumer. As a best practice I use environment variables for configuration so that I can easily deploy the application on different environments without changing the actual code. We will create the following environment variables:
- SQS_ACCESS_KEY: AWS SQS access key
- SQS_SECRET_KEY: AWS SQS secret key
- SQS_REGION: AWS SQS region
- SQS_QUEUE_NAME: AWS SQS queue name
- SQS_QUEUE_URL: AWS SQS queue URL
- SQS_VISIBILITY_TIMEOUT: Time in seconds, which we provide with the receive request, during which the message will not be available to any other request. For more info read the AWS docs
- SQS_API_VERSION: AWS SQS API version
- TZ: Time zone environment variable which nodeJs honours. As a best practice we should set this to get proper timestamps in our logs
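Since environment variables are always strings and are easy to forget, it can help to check them once at startup. Here is a minimal sketch of that idea. The loadConfig function and the REQUIRED_VARIABLES list are my own names for this example and are not part of the project files in this post.

```javascript
// Names of the variables the consumer cannot run without (taken from the list above).
const REQUIRED_VARIABLES = [
  'SQS_ACCESS_KEY',
  'SQS_SECRET_KEY',
  'SQS_REGION',
  'SQS_QUEUE_URL',
  'SQS_VISIBILITY_TIMEOUT',
  'SQS_API_VERSION'
];

// Validate an environment object (for example process.env) and return typed settings.
function loadConfig(env) {
  const missing = REQUIRED_VARIABLES.filter(name => !env[name]);
  if (missing.length > 0) {
    throw new Error('Missing environment variables: ' + missing.join(', '));
  }
  // Environment variables are strings, so convert the timeout to a number.
  const visibilityTimeout = parseInt(env.SQS_VISIBILITY_TIMEOUT, 10);
  if (Number.isNaN(visibilityTimeout) || visibilityTimeout < 0) {
    throw new Error('SQS_VISIBILITY_TIMEOUT must be a non-negative number of seconds');
  }
  return {
    accessKeyId: env.SQS_ACCESS_KEY,
    secretAccessKey: env.SQS_SECRET_KEY,
    region: env.SQS_REGION,
    queueUrl: env.SQS_QUEUE_URL,
    apiVersion: env.SQS_API_VERSION,
    visibilityTimeout
  };
}

// Example with the development values used later in this post
const config = loadConfig({
  SQS_ACCESS_KEY: 'asdf',
  SQS_SECRET_KEY: 'asdf',
  SQS_REGION: 'elasticmq',
  SQS_QUEUE_URL: 'http://sqs:9324/queue/MY_SQS_QUEUE',
  SQS_VISIBILITY_TIMEOUT: '100',
  SQS_API_VERSION: '2012-11-05'
});
console.log(config.visibilityTimeout);
```

In a real project you would call loadConfig(process.env) at the top of index.js and fail fast if something is missing.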
We will use these variables to initiate our consumer. Our index.js will have the following code:
const Consumer = require('sqs-consumer');
// initiate AWS SQS SDK
const sqs = require('/usr/src/app/services/sqs')(
  process.env.SQS_ACCESS_KEY,
  process.env.SQS_SECRET_KEY,
  process.env.SQS_REGION,
  process.env.SQS_API_VERSION
);
// initiate logger
const logger = require('/usr/src/app/services/logger');
// initiate Handler
const handleMessage = require('/usr/src/app/services/handler');
// initiate sqs-consumer
const app = Consumer.create({
  sqs: sqs,
  queueUrl: process.env.SQS_QUEUE_URL,
  // environment variables are strings, so convert the timeout to a number
  visibilityTimeout: parseInt(process.env.SQS_VISIBILITY_TIMEOUT, 10),
  terminateVisibilityTimeout: true,
  attributeNames: ['All'],
  handleMessage
});
// attach event handlers to the sqs-consumer to log messages and track if there is any error
app.on('error', err => {
  logger.error('Error in SQS consumer : ' + err.message);
  // stop the polling
  app.stop();
});
app.on('message_received', message => {
  logger.info('message received from queue : ' + message.Body);
});
app.on('message_processed', message => {
  logger.info('message processed : ' + message.Body);
});
app.on('empty', () => {
  logger.info('Queue is Empty');
});
app.on('stopped', () => {
  logger.info('Consumer has stopped');
  process.exit(1);
});
logger.info('Starting the consumer');
app.start();
The consumer will keep polling the queue and process every message it receives. Let's write our Dockerfile to build and run our nodejs consumer.
Dockerfile
Create a file named “Dockerfile” in the development folder and add the following code:
# get the official nodejs container image from docker hub
FROM node:8
# set app work directory
WORKDIR /usr/src/app
#copy all files
COPY . .
CMD [ "npm", "start" ]
Both our SQS and nodejs consumer containers are now done. Let's create our “docker-compose.yaml” so that we can connect them and run them together.
docker-compose.yaml
We will create two services in our docker-compose.yaml file: one for the sqs container and the other for the nodejs consumer container. We will also create the required environment variables for the respective containers. Our “docker-compose.yaml” file will have the following code:
version: '3'
services:
  # ElasticMQ to run SQS like interface on local for development
  # https://github.com/adamw/elasticmq
  # On production AWS SQS will be used
  sqs:
    build: docker/elasticmq
    ports:
      - "9324:9324"
  my-sqs-consumer:
    build: .
    command: [ "npm", "run", "start:dev" ]
    restart: always
    depends_on:
      - sqs
    links:
      - sqs:sqs
    volumes:
      - .:/usr/src/app
    environment:
      - SQS_ACCESS_KEY=asdf
      - SQS_SECRET_KEY=asdf
      - SQS_REGION=elasticmq
      - SQS_QUEUE_NAME=MY_SQS_QUEUE
      - SQS_QUEUE_URL=http://sqs:9324/queue/MY_SQS_QUEUE
      - SQS_VISIBILITY_TIMEOUT=100
      - SQS_API_VERSION=2012-11-05
      - TZ=Australia/Sydney
If you notice, in the docker-compose.yaml file we have used the “links” and “depends_on” keys to link the two containers. Now let's build and run our containers. Run the following commands to pull the docker images and build your containers:
docker-compose pull
docker-compose build --force-rm
Run the following command to run all your containers in the docker-compose.yaml file
docker-compose up
After this command you should be able to see the log messages on the terminal and your consumer will be polling the queue for messages. Now we will work on writing messages to the queue.
Write messages to the SQS
To write messages to the SQS we will use the AWS SQS sdk. If you have noticed, in our package.json file under the “scripts” section we have added a “write” key so that we can run a script which writes messages to SQS. Let's create a new file named “write_message.js” in our development directory and add the following code:
// add our logger
const logger = require('/usr/src/app/services/logger');
// initiate our AWS SQS sdk
const sqs = require('/usr/src/app/services/sqs')(
  process.env.SQS_ACCESS_KEY,
  process.env.SQS_SECRET_KEY,
  process.env.SQS_REGION,
  process.env.SQS_API_VERSION
);
// Prepare a default test message, this message will be pushed if no message is supplied as an argument when calling this file
let messageBody = 'Test Message created at - ' + new Date().toISOString();
// if there is any message supplied in command line when calling this file, overwrite the test message
if (process.argv[2]) {
  messageBody = process.argv[2];
}
// SQS params
let sqsParams = {
  QueueUrl: process.env.SQS_QUEUE_URL,
  MessageBody: messageBody
};
// finally push the message to SQS
sqs.sendMessage(sqsParams, function(err, data) {
  if (err) {
    logger.error('Error while writing message to queue', err);
  } else {
    logger.info('Message written Successfully to queue : ' + data.MessageId);
  }
});
Now we will use the docker-compose run command to run this script and push a default message to our queue. To run this script we need to know the service name we gave to our nodejs consumer in docker-compose.yaml, i.e. my-sqs-consumer. Run the following command and it will post a default message to your queue.
docker-compose run --rm my-sqs-consumer npm run write
To write a custom message, pass the string after “--” to the command. For example:
docker-compose run --rm my-sqs-consumer npm run write -- '{"first_name":"Raj","last_name":"deol","love_to":"travel"}'
Your consumer should receive the message and log it on your command prompt. Full source code for this post is available on my github repo here. Please feel free to ask any question in the comment section below.
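If you want to push many test messages at once, SQS also has a batch call. The sketch below assumes an aws-sdk v2 style client with a sendMessageBatch method that accepts up to 10 entries per request. The chunk and sendAll functions are hypothetical helpers written only for this example, and I have not wired them into write_message.js.

```javascript
// Split an array into groups of at most `size` items.
function chunk(items, size) {
  const groups = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

// Send all message bodies in batches of 10.
// `sqs` is assumed to look like an aws-sdk v2 SQS client.
async function sendAll(sqs, queueUrl, bodies) {
  let sent = 0;
  for (const group of chunk(bodies, 10)) {
    // Entry ids only need to be unique within a single batch request.
    const entries = group.map((body, index) => ({
      Id: String(index),
      MessageBody: body
    }));
    const result = await sqs
      .sendMessageBatch({ QueueUrl: queueUrl, Entries: entries })
      .promise();
    sent += (result.Successful || []).length;
  }
  return sent; // number of messages the queue reported as accepted
}

// Possible usage with the client from services/sqs.js (not executed here):
// sendAll(sqs, process.env.SQS_QUEUE_URL, ['one', 'two', 'three']).then(console.log);
```

A batch response can also contain failed entries, so production code should inspect the Failed list and retry those messages.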