O Amazon Simple Queue Service (SQS) é um serviço poderoso projetado para gerenciar as necessidades de mensagens de microsserviços robustos e desacoplados.
Ele se integra fortemente com o AWS Lambda, seu parceiro nativo, como um destino alvo. O SQS pode disparar uma função Lambda como seu alvo e enviar os itens de lote como entrada.
Entretanto, no lado da função Lambda, lidar com essa entrada pode ser complicado, especialmente quando erros e tentativas surgem.
Este é o primeiro artigo de uma série de três artigos sobre as melhores práticas do SQS.
Neste artigo, o primeiro de uma série de três postagens sobre as melhores práticas do SQS, você aprenderá como lidar com eficiência com lotes do Amazon SQS com o AWS Lambda Powertools para exemplos de código Python e AWS CDK.
Na segunda parte desta série, você aprenderá sobre como tentar novamente o processamento de itens em lote com tentativas seguras.
Na terceira parte da série, você aprenderá sobre as melhores práticas de fila de mensagens mortas e como lidar com falhas corretamente.
Todos os exemplos de código CDK podem ser encontrados aqui .
Todo o código Lambda pode ser encontrado aqui .
Índice
Introdução ao SQS
O Amazon Simple Queue Service (Amazon SQS) permite que você envie, armazene e receba mensagens entre componentes de software em qualquer volume, sem perder mensagens ou exigir que outros serviços estejam disponíveis - Documentação da AWS
O Amazon SQS é um dos pilares dos serviços sem servidor e arquiteturas assíncronas orientadas a eventos. É um serviço gerenciado pela AWS e, como tal, suporta altos volumes de dados, garante nenhuma perda de mensagem, dimensiona automaticamente e oferece filas padrão para rendimento máximo e filas FIFO para processamento exato de pedidos.
O SQS para Lambda tem sido meu padrão preferido e confiável há vários anos.
No entanto, a maneira como lido com lotes, tentativas e falhas evoluiu ao longo dos anos.
Neste post, você encontrará minha versão atual e a maneira mais recomendada de iterar um lote de itens, analisá-los e lidar com erros. Na minha opinião, é a maneira mais direta, que envolve menos código do que eu costumava lidar com isso.
Processamento em lote SQS
Implementaremos e escreveremos o padrão SQS para Lambda do zero, com código CDK para a parte de infraestrutura e o código comercial Lambda com recursos de manipulação em lote.
O SQS aciona nossa função Lambda com um evento SQS contendo itens de lote.
A função iterará cada item, processará e continuará para o próximo item.
Configuração de infraestrutura
Primeiro, vamos definir nosso SQS e conectá-lo a uma função Lambda, para que cada lote de itens seja enviado como entrada para a função. Usaremos o AWS CDK em Python:
Nas linhas 9 a 15, definimos o SQS.
Na linha 13, habilitamos a criptografia gerenciada pelo SQS para que tenhamos criptografia em repouso (segurança em primeiro lugar!).
Na linha 14, definimos o tempo limite de visibilidade, que deve ser igual ou maior que o tempo limite da função Lambda de destino. Para saber mais sobre esse parâmetro, siga este link .
Na linha 18, conectamos o SQS à função como uma fonte de evento.
Acesse meu repositório GitHub de exemplo para obter o código de criação da função Lambda e observe a função '_create_sqs_lambda'.
Entrada de evento SQS
A documentação oficial mostra um exemplo de evento SQS contendo vários registros, cada um contendo metadados sobre o registro e a carga útil do domínio de negócios com o qual nos importamos, passada no parâmetro 'body'.
Nesta postagem, assumiremos que recebemos pedidos de clientes como itens de lote SQS e os processaremos.
O payload de negócios no SQS é encontrado no parâmetro Record[x]['body] como uma string codificada JSON. Precisaremos serializá-lo JSON em um dicionário e analisá-lo na classe Order, que é um dicionário com uma chave 'item', que é um dicionário por si só:
e estas são as classes de esquema Pydantic que correspondem a esta entrada e à carga útil do SQS:
Para validação de entrada e serialização de carga útil, usaremos o utilitário analisador AWS Lambda Powertools e definiremos um novo esquema de entrada que estende o modelo de registro SQS para corresponder à nossa estrutura de carga útil de negócios.
'OrderSqsRecord' estende o modelo SQS do analisador (que contém todos os campos SQS), mas precisamos alterar o parâmetro 'body' para corresponder ao nosso esquema Order. Adicionamos o tipo JSON[] na linha 13 porque o corpo vem como uma string codificada JSON, e isso diz ao Pydantic para analisá-lo da string para um dicionário e então serializá-lo como uma classe de esquema 'Order'.
Se você deseja aprender mais sobre a validação de entrada do Lambda com o Pydantic e quais são as melhores práticas, confira minha postagem sobre isso aqui .
Manipulador Lambda
Agora que definimos a infraestrutura e os esquemas de entrada, podemos finalmente escrever o código de processamento em lote.
Usaremos o utilitário de lote do AWS Lambda Powertools. Processaremos cada item no lote e, caso uma exceção seja gerada, usaremos o recurso do utilitário de lote de marcar o item de lote específico como uma falha parcial para que ele seja retornado ao SQS.
O SQS tentará novamente aquele item específico e invocará a função Lambda novamente. Em muitos casos, essas múltiplas tentativas de repetição podem ser suficientes. Na parte dois da minha série de melhores práticas do SQS, vamos nos aprofundar nesse recurso e estender os recursos de repetição da função. E na parte três, adicionaremos uma fila de mensagens mortas e repetições automáticas para levar o tratamento de falhas para o próximo nível.
Vamos dar uma olhada no manipulador:
Na linha 12, inicializamos o processador de lote e colocamos a classe de esquema Pydantic que definimos e que é um lote SQS. O processador de lote suporta outros tipos de lote , como fluxos de dados Kinesis e fluxos DynamoDB.
Nas linhas 18-23, enviamos o evento para a função start do utilitário de lote do Powertools. Fornecemos a ele o processador, o evento e nosso manipulador de itens de lote. O manipulador de itens é nossa função lógica interna que manipulará cada item no lote.
Observe que mencionei que deveria ser uma função na camada lógica e, caso você queira aprender mais sobre como estruturar seu Lambda em camadas (manipulador, lógica e acesso a dados), não deixe de ler minha postagem sobre isso.
Todo o código Lambda pode ser encontrado aqui .
Implementação record_handler
Vamos dar uma olhada na implementação da função lógica interna de processamento de itens.
Nesta função, você escreve o tratamento de payload da sua lógica de negócios e processa um item de lote. Teoricamente, esta função pode levantar exceções e falhar na sua implementação.
Na próxima postagem do blog, aprenderemos como fazer novas tentativas automáticas e tornar essa função mais segura para novas tentativas.
Considerações Finais
O processamento em lote é um dos casos de uso mais básicos de qualquer aplicativo sem servidor.
É fácil errar. Os próximos dois posts de blog da série abordarão as melhores práticas de tentativas, falhas e filas de mensagens mortas.
Além disso, mal arranhamos a superfície dos recursos do utilitário em lote neste post.
Possui inúmeras funcionalidades, tais como:
Suporte SQS Fifo.
Suporte a manipuladores assíncronos
Classe de dados em vez de suporte Pydantic
Suporte para processador de lote "Traga seu próprio"
e muito mais!
Comments