Ran Isenberg

Effective Amazon SQS Batch Handling with AWS Lambda Powertools

Updated: Aug 15, 2023


Photo by cottonbro studio: https://www.pexels.com/photo/silver-and-black-coffee-cups-5532672/

Amazon Simple Queue Service (SQS) is a powerful service designed to manage the messaging needs of robust, decoupled microservices.

It integrates tightly with AWS Lambda: an SQS queue can trigger a Lambda function as its target and send it a batch of items as input.

However, on the Lambda function side, handling this input can get tricky, especially when errors and retries come into play.

In this article, the first in a series of three SQS best practices posts, you will learn how to efficiently handle Amazon SQS batches with AWS Lambda Powertools for Python and AWS CDK code examples.

In the second part of this series, you will learn about retrying batch item processing with safe retries.

In the third part of the series, you will learn about dead letter queue best practices and how to handle failures correctly.


All CDK code examples can be found here.

All Lambda code can be found here.

 


 

SQS Introduction

Amazon Simple Queue Service (Amazon SQS) lets you send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available - AWS docs

Amazon SQS is one of the cornerstones of serverless services and asynchronous event-driven architectures. It is an AWS-managed service, and as such, it supports high volumes of data, guarantees no message loss, scales automatically, and offers both standard queues for maximum throughput and FIFO queues for exact order processing.

The SQS-to-Lambda pattern has been my trusted go-to for several years.

However, the way I handle the batches, retries, and failures has evolved over the years.

In this post, you will find my current version and the most recommended way of iterating a batch of items, parsing them, and handling errors. In my eyes, it's the most straightforward way, which involves less code than how I used to handle it.

 

SQS Batch Processing

We will implement and write the SQS to Lambda pattern from scratch, with CDK code for the infrastructure part and the Lambda business code with batch-handling capabilities.

The SQS queue triggers our Lambda function with an SQS event containing batch items.

The function will iterate over the items, processing each one before continuing to the next.


Infrastructure Setup

First, let's define our SQS queue and connect it to a Lambda function so that every batch of items is sent as input to the function. We'll use AWS CDK in Python.

In the queue definition, we enable SQS-managed encryption so that we have encryption at rest (security first!).

We also define the visibility timeout, which should be equal to or larger than the target Lambda function's timeout. To learn more about this parameter, follow this link.

Finally, we connect the SQS queue to the function as an event source.
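The queue setup described above can be sketched roughly as follows with AWS CDK v2. This is a minimal sketch, not the listing from the sample repository: the helper name `create_orders_queue`, the construct id, the timeout value, and the batch size are all assumptions made for illustration, and the Lambda function is expected to be created elsewhere and passed in.

```python
from aws_cdk import Duration
from aws_cdk import aws_lambda as _lambda
from aws_cdk import aws_sqs as sqs
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from constructs import Construct


def create_orders_queue(scope: Construct, handler: _lambda.IFunction) -> sqs.Queue:
    """Create an encrypted queue and wire it to an existing Lambda function.

    Hypothetical helper: names and values are illustrative assumptions.
    """
    queue = sqs.Queue(
        scope,
        "OrdersQueue",  # hypothetical construct id
        # SQS-managed server-side encryption (encryption at rest).
        encryption=sqs.QueueEncryption.SQS_MANAGED,
        # Assumed value: keep it equal to or larger than the function's timeout.
        visibility_timeout=Duration.seconds(180),
    )
    # Register the queue as an event source so batches are sent to the function.
    handler.add_event_source(
        SqsEventSource(
            queue,
            batch_size=10,  # assumed batch size
            # Lets the function report failed items individually
            # instead of failing the whole batch.
            report_batch_item_failures=True,
        )
    )
    return queue
```

Enabling `report_batch_item_failures` matters for the rest of this post: without it, Lambda ignores a partial failure response and treats the whole batch as one unit.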

Head to my sample GitHub repository for the Lambda function creation code and look at the '_create_sqs_lambda' function.


SQS Event Input

The official documentation shows a sample SQS event containing multiple records. Each record contains metadata about the record and the business domain payload we care about, which is passed in the 'body' parameter.

In this post, we will assume we get orders from customers as SQS batch items and process them.

The business payload in SQS is found in the Records[x]['body'] parameter as a JSON-encoded string. We need to deserialize it into a dictionary and parse it into the Order class, which is a dictionary with an 'item' key whose value is itself a dictionary.
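To make the event shape concrete, here is a small stdlib-only sketch. The event is trimmed (real records carry more metadata fields), and the order fields `id` and `name`, the message ids, and the `extract_orders` helper are hypothetical names chosen for illustration.

```python
import json

# Trimmed SQS event: real records also carry fields such as
# 'receiptHandle', 'attributes', 'md5OfBody' and 'eventSourceARN'.
sample_event = {
    "Records": [
        {
            "messageId": "msg-1",
            "eventSource": "aws:sqs",
            # The business payload arrives as a JSON-encoded string.
            "body": json.dumps({"item": {"id": 1, "name": "coffee cup"}}),
        },
        {
            "messageId": "msg-2",
            "eventSource": "aws:sqs",
            "body": json.dumps({"item": {"id": 2, "name": "saucer"}}),
        },
    ]
}


def extract_orders(event: dict) -> list[dict]:
    """Decode the 'body' string of every record into a dictionary."""
    return [json.loads(record["body"]) for record in event["Records"]]


orders = extract_orders(sample_event)
```

Decoding by hand like this works, but it performs no validation, which is the gap the schema classes fill.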

We need Pydantic schema classes that match this input and the SQS payload.

For input validation and payload parsing, we will use the AWS Lambda Powertools parser utility and define a new input schema that extends the SQS record model to match our business payload structure.

'OrderSqsRecord' extends the parser's SQS model (which contains all the SQS fields), but we need to change the 'body' parameter to match our Order schema. We give it the Json[Order] type because the body comes as a JSON-encoded string; this tells Pydantic to parse the string into a dictionary and then validate it as an 'Order' schema class.
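The effect of the `Json[...]` type can be tried in isolation with Pydantic alone. In the sketch below, `RecordWithOrder` is a hypothetical stand-in that keeps only two fields rather than extending the parser's SQS model, and the `id` and `name` fields of the item are assumptions, since the text only states that 'item' is a dictionary.

```python
import json

from pydantic import BaseModel, Json


class OrderItem(BaseModel):
    # Hypothetical fields chosen for illustration.
    id: int
    name: str


class Order(BaseModel):
    item: OrderItem


class RecordWithOrder(BaseModel):
    # Stand-in for a class that extends the parser's SQS record model.
    messageId: str
    # Json[Order]: decode the JSON string first, then validate it as an Order.
    body: Json[Order]


raw_record = {
    "messageId": "msg-1",
    "body": json.dumps({"item": {"id": 1, "name": "coffee cup"}}),
}

record = RecordWithOrder(**raw_record)
```

After parsing, `record.body` is an `Order` instance rather than a string, so the business logic can access `record.body.item.name` directly.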

If you wish to learn more about Lambda input validation with Pydantic and its best practices, check out my post about it here.

 

Lambda Handler

Now that we have set the infrastructure and input schemas, we can finally write the batch-processing code.

We will use AWS Lambda Powertools' batch utility. We will process each item in the batch, and if an exception is raised, we will use the batch utility's ability to mark the specific batch item as a partial failure so that it is returned to the SQS queue.

SQS will then retry that specific item and invoke the Lambda function again. In many cases, these multiple retry attempts might be enough. In part two of my SQS best practices series, we will dive deeply into this feature and extend the function's retry capabilities. And in part three, we will add a dead letter queue and automatic retries to take failure handling to the next level.
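To show what a partial failure looks like on the wire, here is a stdlib-only sketch of the bookkeeping that the batch utility automates. It is not the Powertools implementation: `process_order`, `handle_batch`, and the business rule are hypothetical. What is real is the response shape Lambda expects for SQS partial batch responses, a `batchItemFailures` list of `itemIdentifier` entries holding the failed message ids, which only takes effect when batch item failure reporting is enabled on the event source.

```python
import json


def process_order(order: dict) -> None:
    """Hypothetical business logic: reject orders whose item has no name."""
    if not order.get("item", {}).get("name"):
        raise ValueError("order item has no name")


def handle_batch(event: dict) -> dict:
    """Process every record and report only the failed ones back to SQS."""
    failures = []
    for record in event["Records"]:
        try:
            process_order(json.loads(record["body"]))
        except Exception:
            # Only this message becomes visible in the queue again for a retry.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


event = {
    "Records": [
        {"messageId": "msg-1", "body": json.dumps({"item": {"id": 1, "name": "coffee cup"}})},
        {"messageId": "msg-2", "body": json.dumps({"item": {"id": 2}})},
    ]
}

response = handle_batch(event)
```

Here the first message is processed and removed from the queue, while `msg-2` is reported as failed and retried on its own.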


Let's take a look at the handler.

We initialize the batch processor with the Pydantic schema class we defined and tell it that the batch is an SQS batch. The batch processor supports other batch types, such as Kinesis data streams and DynamoDB streams.

In the handler, we send the event to the Powertools batch utility's start function. We provide it with the processor, the event, and our batch item handler. The item handler is our inner logic function that handles each item in the batch.

Notice that I've mentioned that it should be a function in the logic layer. If you want to learn more about structuring your Lambda into layers (handler, logic, and data access), be sure to read my post about it.


All Lambda code can be found here.

 

record_handler Implementation

Let's take a look at the implementation of the item processing inner logic function.

In this function, you write the business logic that handles the payload and processes a single batch item. Your implementation can, in theory, raise exceptions and fail.

In the next blog post, we will learn how to add automatic retries and make this function safer to retry.

 

Closing Thoughts

Batch processing is one of the most basic use cases in any serverless application.

It's easy to get it wrong. The next two blog posts in the series will tackle best practices for retries, failures, and dead letter queues.

In addition, we've barely scratched the surface of the batch utility capabilities in this post.

It has numerous features, such as:

  1. SQS FIFO support

  2. Async handler support

  3. Data class support instead of Pydantic

  4. "Bring your own" batch processor support

  5. And more!







