top of page
  • Foto del escritorRan Isenberg

Manejo eficaz de lotes de Amazon SQS con AWS Lambda Powertools


Fotografía de estudio Cottonbro: https://www.pexels.com/photo/tazas-de-cafe-plateadas-y-negras-5532672/
Fotografía de estudio Cottonbro: https://www.pexels.com/photo/tazas-de-cafe-plateadas-y-negras-5532672/

Amazon Simple Queue Service (SQS) es un poderoso servicio diseñado para administrar las necesidades de mensajería de microservicios robustos y desacoplados.

Se integra perfectamente con AWS Lambda, su socio nativo, como destino. SQS puede activar una función Lambda como destino y enviar los elementos del lote como entrada.

Sin embargo, en el lado de la función Lambda, manejar esta entrada puede resultar complicado, especialmente cuando se piensan errores y reintentos.

Este es el primer artículo de una serie de tres artículos sobre las mejores prácticas de SQS.

En este artículo, el primero de una serie de tres publicaciones sobre prácticas recomendadas de SQS, aprenderá a gestionar de manera eficiente lotes de Amazon SQS con ejemplos de código de AWS Lambda Powertools para Python y AWS CDK.

En la segunda parte de esta serie, aprenderá cómo reintentar el procesamiento de artículos por lotes con reintentos seguros.

En la tercera parte de la serie, aprenderá sobre las mejores prácticas para las colas de mensajes inactivos y cómo manejar las fallas correctamente.


Todos los ejemplos de código CDK se pueden encontrar aquí .

Todo el código Lambda se puede encontrar aquí .

 

Tabla de contenido

 

Introducción a SQS

Amazon Simple Queue Service (Amazon SQS) le permite enviar, almacenar y recibir mensajes entre componentes de software en cualquier volumen, sin perder mensajes ni requerir que otros servicios estén disponibles - Documentación de AWS

Amazon SQS es una de las piedras angulares de los servicios sin servidor y las arquitecturas asincrónicas basadas en eventos. Es un servicio administrado por AWS y, como tal, admite grandes volúmenes de datos, garantiza que no se pierdan mensajes, se escala automáticamente y ofrece colas estándar para obtener el máximo rendimiento y colas FIFO para procesar pedidos con precisión.

El patrón SQS a Lambda ha sido mi patrón preferido y de confianza durante varios años.

Sin embargo, la forma en que manejo los lotes, los reintentos y los fallos ha evolucionado a lo largo de los años.

En esta publicación, encontrará mi versión actual y la forma más recomendada de iterar un lote de elementos, analizarlos y gestionar los errores. En mi opinión, es la forma más sencilla y que implica menos código que la forma en que solía gestionarlo.

 

Procesamiento por lotes de SQS

Implementaremos y escribiremos el patrón SQS a Lambda desde cero, con el código CDK para la parte de infraestructura y el código comercial Lambda con capacidades de manejo de lotes.

SQS activa nuestra función Lambda con un evento SQS que contiene elementos del lote.

La función iterará cada elemento, lo procesará y continuará con el siguiente elemento.


Configuración de la infraestructura

Primero, definamos nuestro SQS y conectémoslo a una función Lambda, de modo que cada lote de elementos se envíe como entrada a la función. Usaremos AWS CDK en Python:

En las líneas 9 a 15, definimos el SQS.

En la línea 13, habilitamos el cifrado administrado por SQS para que tengamos cifrado en reposo (¡la seguridad es lo primero!).

En la línea 14, definimos el tiempo de espera de visibilidad, que debe ser igual o mayor que el tiempo de espera de la función Lambda de destino. Para obtener más información sobre este parámetro, siga este enlace .

En la línea 18, conectamos el SQS a la función como una fuente de evento.

Dirígete a mi repositorio de muestra de GitHub para obtener el código de creación de la función Lambda y observa la función '_create_sqs_lambda'.


Entrada de evento SQS

La documentación oficial muestra un evento SQS de muestra que contiene múltiples registros, cada uno con metadatos sobre el registro y la carga útil del dominio comercial que nos interesa pasa en el parámetro 'cuerpo'.

En esta publicación, asumiremos que recibimos pedidos de los clientes como artículos de lotes de SQS y los procesamos.

La carga útil comercial en SQS se encuentra en el parámetro Record[x]['body] como una cadena codificada en JSON. Necesitaremos serializarla en JSON en un diccionario y analizarla en la clase Order, que es un diccionario con una clave 'item', que es un diccionario en sí misma:

y estas son las clases del esquema Pydantic que coinciden con esta entrada y la carga útil de SQS:

Para la validación de entrada y la serialización de la carga útil, utilizaremos la utilidad de análisis de AWS Lambda Powertools y definiremos un nuevo esquema de entrada que amplíe el modelo de registro SQS para que coincida con nuestra estructura de carga útil comercial.

'OrderSqsRecord' extiende el modelo SQS del analizador (que contiene todos los campos SQS), pero necesitamos cambiar el parámetro 'body' para que coincida con nuestro esquema de pedido. Agregamos el tipo JSON[] en la línea 13 porque el cuerpo viene como una cadena codificada en JSON, y esto le indica a Pydantic que lo analice desde la cadena hasta un diccionario y luego lo serialice como una clase de esquema 'Order'.

Si desea obtener más información sobre la validación de entrada Lambda con Pydantic y cuáles son las mejores prácticas, consulte mi publicación al respecto aquí .

 

Manejador Lambda

Ahora que hemos configurado la infraestructura y los esquemas de entrada, finalmente podemos escribir el código de procesamiento por lotes.

Utilizaremos la utilidad de procesamiento por lotes de AWS Lambda Powertools. Procesaremos cada elemento del lote y, en caso de que se genere una excepción, utilizaremos la función de la utilidad de procesamiento por lotes para marcar el elemento del lote específico como una falla parcial, de modo que se devuelva al SQS.

SQS volverá a intentar ese elemento específico e invocará la función Lambda nuevamente. En muchos casos, estos múltiples intentos de reintento pueden ser suficientes. En la segunda parte de mi serie de prácticas recomendadas de SQS, analizaremos en profundidad esta función y ampliaremos las capacidades de reintento de la función. Y en la tercera parte, agregaremos una cola de mensajes fallidos y reintentos automáticos para llevar el manejo de errores al siguiente nivel.


Echemos un vistazo al controlador:

En la línea 12, inicializamos el procesador por lotes y ponemos la clase de esquema de Pydantic que definimos y que es un lote de SQS. El procesador por lotes admite otros tipos de lotes , como los flujos de datos de Kinesis y los flujos de DynamoDB.

En las líneas 18 a 23, enviamos el evento a la función de inicio de la utilidad de procesamiento por lotes de Powertools. Le proporcionamos el procesador, el evento y nuestro controlador de elementos del lote. El controlador de elementos es nuestra función lógica interna que manejará cada elemento del lote.

Tenga en cuenta que mencioné que debería ser una función en la capa lógica y, en caso de que desee obtener más información sobre cómo estructurar su Lambda en capas (controlador, lógica y acceso a datos), asegúrese de leer mi publicación al respecto.


Todo el código Lambda se puede encontrar aquí .

 

Implementación de record_handler

Echemos un vistazo a la implementación de la función lógica interna de procesamiento de elementos.

En esta función, escribes la lógica empresarial que maneja la carga útil y procesas un elemento por lotes. En teoría, esta función puede generar excepciones y fallar en tu implementación.

En la próxima publicación del blog, aprenderemos cómo realizar reintentos automáticos y convertir esta función en una más segura para reintentar.

 

Reflexiones finales

El procesamiento por lotes es uno de los casos de uso más básicos de cualquier aplicación sin servidor.

Es fácil equivocarse. Las dos próximas publicaciones de la serie abordarán las mejores prácticas en materia de reintentos, fallos y colas de mensajes no entregados.

Además, apenas hemos arañado la superficie de las capacidades de la utilidad por lotes en esta publicación.

Tiene numerosas características, tales como:

  1. Soporte para SQS Fifo.

  2. Compatibilidad con controladores asincrónicos

  3. Clase de datos en lugar de soporte de Pydantic

  4. Soporte para procesadores por lotes "Trae tu propio"

  5. ¡y mucho más!








Comentarios


bottom of page