Asynchronous pattern using Amazon SQS
HTTP (HyperText Transfer Protocol),the underlying protocol in the world of web is commonly used for synchronous communications. However there are some use cases which needs the activities to be done in asynchronous way. Message brokers can help to integrate systems using asynchronous pattern. In this post, let see how Amazon SQS services provided by AWS(Amazon Web Services) can be used to design the asynchronous pattern.
Synchronous pattern
Let’s see a simple example of synchronous communication using HTTP.
- Client sends request to server.
- Server persists the data from request into database.
- Until the time server process the request and persist the data, client waits for the response from server.
As we can see, the synchronous communication is simple and straight forward. But when there are heavy burst of requests, both client and server will have tremendous load. When server goes down with maximum load, there is a potential of message loss and requests getting timed out.
Asynchronous pattern
Let’s introduce Amazon SQS into the above synchronous pattern.
- Client sends request to server.
- Server put the message into SQS queue.
- The server returns the acknowledgement response(“Your request has been submitted successfully for processing”) to client immediately.
- The consumer applications then pulls the message from SQS queue to do further computation.
Why Amazon SQS?
Alright! After taking a quick look into both synchronous and asynchronous patterns, let’s acknowledge that having message queues into our designs will help for distributed architectures. But why to choose Amazon SQS in particular?
- Amazon SQS is a fully managed message queuing service.
- Easy to scale and highly distributed without any administrative overhead.
- Integrates seamlessly with Amazon Lambda. (If the entire architecture stack is decided to be hosted in AWS cloud offering)
- SQS operations can be performed via AWS console, SDK of choice, Command line interface.
SQS – Visibility Timeout
One of the important concept to consider when choosing SQS is visibility timeout.
- Visibility timeout is the period for which the messages will be hidden from consumers.
- Once the message is consumed for first time, the visibility timeout starts. Default is 30 seconds. Maximum is 12 hours.
- Consumers of message have to explicitly delete the message after receiving.
- It’s good to have visibility timeout period to be configured with value more than the time consumer needed for processing the message.
Amazon SQS – Examples
Let’s see some examples to create queue, put messages to queue, consume messages from queue using AWS SDK for Java.
1 2 3 4 5 6 7 8 |
/* Create New Queue */ public void createQueueWithName(String queueName){ AmazonSQS sqsClient = SQSClientProvider.getForProfile("sqs_demo"); CreateQueueRequest create_queue_request = new CreateQueueRequest() .withQueueName(queueName); sqsClient.createQueue(create_queue_request); System.out.println("Queue created with name :: "+queueName); } |
Send messages to Queue
1 2 3 4 5 6 7 8 |
/* Send messages to Queue */ public void sendMessageTo(String queueName){ AmazonSQS sqsClient = SQSClientProvider.getForProfile("sqs_demo"); String queueUrl = SQSUtil.getQueueUrlFor(queueName); String message = SQSUtil.getSimpleTextMessage(); sqsClient.sendMessage(queueUrl, message); System.out.println("Message sent :: "+message); } |
Send Batch messages to Queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
/* Send batch messages to Queue */ public void sendBatchMessages(String queueName){ AmazonSQS sqsClient = SQSClientProvider.getForProfile("sqs_demo"); String queueUrl = SQSUtil.getQueueUrlFor(queueName); int numOfMessages = 0; int numOfMessagesToAdd = 0; int maxMessages = MAX_MESSAGES; while(numOfMessages < maxMessages){ if((numOfMessages + BATCH_COUNT) < maxMessages){ numOfMessagesToAdd = BATCH_COUNT; }else{ numOfMessagesToAdd = maxMessages - numOfMessages; } List<SendMessageBatchRequestEntry> batchMessages = SQSUtil.getBatchMessages(numOfMessages, numOfMessagesToAdd); if(!batchMessages.isEmpty()){ SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest(queueUrl, batchMessages); sqsClient.sendMessageBatch(sendMessageBatchRequest); numOfMessages = numOfMessages+numOfMessagesToAdd; System.out.println(numOfMessagesToAdd+" messages sent, Total messages in Queue: "+numOfMessages); } } } |
SQS client provider
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public class SQSClientProvider { private static AmazonSQS sqsClient = null; private SQSClientProvider(){ // Private constructor } public static AmazonSQS get(){ if(sqsClient == null){ sqsClient = AmazonSQSClientBuilder.standard() .withRegion(Regions.US_WEST_2).build(); } return sqsClient; } public static AmazonSQS getForProfile(String profile){ if(sqsClient == null){ sqsClient = AmazonSQSClientBuilder.standard() .withCredentials(new ProfileCredentialsProvider(profile)) .withRegion(Regions.US_WEST_2).build(); } return sqsClient; } } |
Util methods to build input messages
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public static String getSimpleTextMessage(){ return "{\"id\":1, \"data\": \"Simple Text Message\" }"; } public static List<SendMessageBatchRequestEntry> getBatchMessages(int startIndex, int batchCount){ List<SendMessageBatchRequestEntry> batchMessageEntries = new ArrayList<>(); SendMessageBatchRequestEntry batchMessage = null; String messageBody = null; for(int id=startIndex+1; id<=(startIndex+batchCount); id++){ messageBody = "{\"id\": "+id+", \"data\": \"Batch Message number "+id+"\" }"; batchMessage = new SendMessageBatchRequestEntry("id"+Integer.toString(id), messageBody); batchMessageEntries.add(batchMessage); } return batchMessageEntries; } |
Consume messages from Queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
/* Receive messages from Queue */ public void receiveMessagesFrom(String queueName){ AmazonSQS sqsClient = SQSClientProvider.getForProfile("sqs_demo"); String queueUrl = SQSUtil.getQueueUrlFor(queueName); System.out.println("Start - receive messages"); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(3); ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest); List<Message> messages = receiveMessageResult.getMessages(); if(messages.isEmpty()){ System.out.println("No messages available"); } for(Message msg: messages){ System.out.println(msg.getBody()); // From here message can be sent further to external systems } } |
Consume multiple messages from Queue. Poll for messages in a loop as a standalone consumer.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
/* Receive multiple messages from Queue */ public void receiveBatchMessages(String queueName){ AmazonSQS sqsClient = SQSClientProvider.getForProfile("sqs_demo"); String queueUrl = SQSUtil.getQueueUrlFor(queueName); System.out.println("Start - receive messages"); int pollcount = 1; while(true){ System.out.println("\n*************Polling attempt************* on "+new Date()+"\n"); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(2) .withWaitTimeSeconds(5); ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest); List<Message> messages = receiveMessageResult.getMessages(); if(messages.isEmpty()){ System.out.println("No messages available"); }else{ for(Message msg: messages){ // From here message can be sent further to external systems // Or persist messages to database // But the messages needs to be deleted after consumption // System.out.println(msg.getBody()); DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueUrl, msg.getReceiptHandle()); sqsClient.deleteMessage(deleteMessageRequest); System.out.println(msg.getBody() +" (Processed and deleted)"); } } } } |
Amazon Lambda as event listener for SQS Queues
- SQS event can be made as a trigger for Amazon Lambda.
- Based on the messages coming into SQS, Lambda can increase or decrease polling frequency.
- Very helpful in reducing usage cost of SQS since we don’t have to keep on polling the queue all the time.
If Lambda is configured as event listener for SQS queues, then the event can be handled as below:
To illustrate sending error messages to Dead Letter Queue(DLQ), message with id=3 is moved to DLQ.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
/** * Handles a Lambda Function request for SQS event * * @param input The Lambda Function input * @param context The Lambda execution environment context object. * @return The Lambda Function output */ @Override public Void handleRequest(SQSEvent input, Context context) { System.out.println("SQS event message ::"+input); System.out.println("\n*************Receiving messages in Lambda event*************\n"); for(SQSEvent.SQSMessage msg : input.getRecords()){ String messageBody = msg.getBody(); // Parse message object ObjectMapper objectMapper = new ObjectMapper(); try { Message messageObj = objectMapper.readValue(messageBody, Message.class); System.out.println("Message number ==> "+messageObj.getId()); if(messageObj.getId() == 3){ // For Test, if messageId == 3 move it to DLQ moveToDLQ(messageBody, msg.getReceiptHandle()); }else{ System.out.println(messageBody +" (processed)"); } } catch (IOException e) { // Move the exceptional messages to DLQ System.out.println("Exception while processing..moving to DLQ"); moveToDLQ(messageBody, msg.getReceiptHandle()); } } return null; } private void moveToDLQ(String messageBody, String receiptHandle ){ System.out.println("Sending message to DLQ"); SQSClientProvider.get().sendMessage("SQS_INPUT_DLQ",messageBody); DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(SQSUtil.getQueueUrlFor("SQS_INPUTQ"), receiptHandle); SQSClientProvider.get().deleteMessage(deleteMessageRequest); } |
Dead Letter Queue(DLQ)
DLQ is yet another SQS queue. It can be configured to receive messages which are not consumed after certain attempts or to send any error messages explicitly.
When creating SQS queue, we can configure it’s DLQ as below:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public static CreateQueueRequest configureDLQ(CreateQueueRequest create_queue_request, String dlqName){ String queueUrl = getQueueUrlFor(dlqName); GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(queueUrl) .withAttributeNames("QueueArn"); GetQueueAttributesResult queueAttributesResult = SQSClientProvider.getForProfile("sqs_demo").getQueueAttributes(getQueueAttributesRequest); String dlqARN = ""; if(queueAttributesResult.getAttributes().keySet().contains("QueueArn")){ dlqARN = queueAttributesResult.getAttributes().get("QueueArn"); } String redrivePolicyVal = "{\"deadLetterTargetArn\":\"".concat(dlqARN).concat("\",\"maxReceiveCount\":20}"); create_queue_request.addAttributesEntry("RedrivePolicy",redrivePolicyVal); return create_queue_request; } |
Alternatives to SQS
Other message brokers as an alternative to SQS are:
- Amazon MQ – Managed service provided by AWS based on Active MQ
- Apache Active MQ – Open source alternative
- Rabbit MQ – Open source alternative
- IBM MQ
Conclusion
We have looked into the details of SQS service from Amazon offering. SQS service helps to build a scalable message queueing architectures. But that’s not the only option. If our architecture stack is not based on AWS, then open source alternatives like Active MQ and Rabbit MQ provides a reliable messaging solution.
Downloads
The source code examples can be downloaded from below location:
0 Comments