top of page

Baixe grátis nossos e-books

Explore o mundo da Engenharia de Software e Data de forma mais eficiente e acessível com nossos eBooks!

  • Foto do escritorJP

Como criar um Kinesis Firehose Data Transformation com Terraform e Java


Kinesis firehose

Introdução


AWS fornece diferentes maneiras de transformar dados por meio de seus serviços e um dos meus favoritos é o Kinesis Firehose Data Transformation. Essa é uma estratégia para transformar dados aproveitando o serviço de Streaming na entregar dos dados.


Para este tutorial, vamos ensinar em como criar um Kinesis Firehose Data Transformation baseado na estratégia abaixo. O Kinesis Firehose enviará dados e, em vez de escrever no S3, invocará um Lambda para transformar esses dados e, em seguida, enviá-los de volta ao Kinesis Firehose, que entregará os mesmos dados ao S3.


Kinesis Firehose Data Transformation
Kinesis Firehose Data Transformation

Criando o projeto


Para este post usaremos Java como linguagem e Maven como gerenciador de dependências. Portanto, é necessário gerar um projeto Maven que criará a estrutura do nosso projeto.


Se você não sabe como gerar um projeto Maven, recomendo ler o post Gerando um projeto Maven sem IDE em 2 minutos, onde mostro como gerá-lo.


Estrutura do projeto

Kinesis Firehose Data Transformation

Após gerar o projeto Maven e importá-lo para sua IDE, vamos criar os mesmos arquivos e pacotes conforme imagem ao lado, exceto o pom.xml que foi criado pelo gerador maven.


Dentro da pasta java, crie um pacote chamado coffee.tips.lambda e crie também uma classe Java chamada Handler.java dentro deste mesmo pacote.


Agora, crie um pacote chamado model dentro do pacote coffee.tips e então, crie duas classes java:

  1. Record.java

  2. Records.java

Por fim, crie um novo pacote chamado status e também crie um enum chamada Status.


Criando a classe Record

Por que precisamos da classe Record?


Kinesis Firehose espera um objeto como retorno contendo os campos acima. Isso acontece quando o Kinesis Firehose invoca o Lambda para transformar os dados e o mesmo Lambda deve retornar um objeto contendo esses campos preenchidos.


recordId

Este valor de campo deve conter o mesmo id vindo do Kinesis


result

Esse valor de campo controla o resultado do status da transformação. Os valores possíveis são:

  • Ok: Registro transformado com sucesso.

  • Dropped: O registro rejeitado intencionalmente de acordo com sua lógica de processamento.

  • ProcessingFailed: Os dados não puderam ser transformados.

data

Este campo deve conter os dados após serem convertidos usando o algoritmo de conversão base64.


Este modelo deve conter os seguintes parâmetros. Caso contrário, o Kinesis Firehose rejeita e define como falha na transformação.


Criando a classe Records

A classe Records será nossa classe Java onde será utilizada como retorno na classe Handler.java onde abordaremos mais a frente, contendo uma lista do tipo Record.


Criando o Enum Status


Decidi criar o Enum acima apenas para criar um código mais elegante, mas é útil quando precisamos mapear valores diferentes para um contexto específico.


Criando a classe Handler


A classe Handler será nosso controlador para o Lambda. Este Lambda será invocado pelo Kinesis Firehose passando alguns parâmetros contendo os dados a serem transformados.


Observe que, para o método handleRequest, um parâmetro chamado input do tipo KinesisFirehoseEvent contém os registros enviados pelo Kinesis Firehose e o mesmo método retornará um objeto do tipo Records contendo uma lista de registros que posteriormente serão enviados de volta ao Kinesis Firerose entregando ao S3.


Dentro da iteração usando o Java Stream, criamos algumas condições apenas para explorar como o campo de result funciona. Dependendo da condição, definimos o valor do result como Dropped, o que significa que os dados não serão entregues ao Kinesis Firehose. Caso contrário, para aqueles que foram definidos como Ok, os dados serão enviados para o Kinesis Firehose.


Outro detalhe é que você pode alterar os valores durante a execução. Definimos "TECH" como o valor para o campo TICKER_SYMBOL quando o valor do campo SECTOR for TECHNOLOGY. É uma forma de transformar dados aplicando uma certa logica.


Por fim, foram criados outros dois métodos apenas para decodificar e codificar os dados como requisito para o bom funcionamento do processamento.


Atualizando o arquivo pom.xml


Após gerar nosso projeto via Maven, precisamos adicionar algumas dependências e um plugin para empacotar o código e as bibliotecas para implantação. Seguindo o conteúdo do pom.xml abaixo:

Criando recursos via Terraform


Em vez de criar o Kinesis Firehose, Lambda, policies e roles manualmente por meio do console, vamos criar por meio do Terraform. Se você não conhece muito sobre Terraform, recomendo ver este tutorial Primeiros passos utilizando Terraform na AWS.


Dentro da pasta terraform, crie os seguintes arquivos:


Conteúdo do arquivo vars.tf


O arquivo vars.tf é onde declaramos as variáveis. As variáveis ​​fornecem flexibilidade quando precisamos trabalhar com recursos diferentes.

Conteúdo do arquivo vars.tfvars


Agora precisamos definir os valores dessas variáveis. Então, vamos criar uma pasta chamada development dentro da pasta terraform.

Kinesis Firehose Data Transformation

Após a criação da pasta. Crie um arquivo chamado vars.tfvars como na imagem e cole o conteúdo abaixo.



Observe o campo bucket, você deve especificar o nome do seu próprio bucket. O nome do bucket deve ser único.


Conteúdo do arquivo main.tf


Para este arquivo, apenas declaramos o Provider. Provider é o serviço de nuvem que usaremos para criar nossos recursos. Neste caso, estamos usando a AWS como provider e o Terraform fará o download dos pacotes necessários para criar os recursos.

Observe que, para o campo region, estamos usando a palavra-chave var para atribuir o valor da região já declarado no arquivo vars.tfvars.


Conteúdo do arquivo s3.tf


Este arquivo é onde declaramos os recursos relacionados ao S3. Nesse caso, estamos criando apenas o bucket S3. Mas, se você deseja criar mais recursos relacionados ao S3, como policies, roles e etc, pode declará-lo aqui.


Conteúdo do arquivo lambda.tf


O conteúdo abaixo será responsável pela criação do AWS Lambda e suas roles e policies. Observe que no mesmo arquivo estamos criando um recurso chamado aws_s3_object. É uma estratégia carregar o arquivo Jar diretamente no S3 após o empacotamento. Manter alguns arquivos no S3 é uma forma inteligente quando trabalhamos com arquivos grandes.



Entendendo o conteúdo do lambda.tf


1. Declaramos aws_iam_policy_document que descrevem quais ações os recursos que serão atribuídos a essas políticas podem executar.


2. aws_iam_role fornece a função IAM e controlará algumas ações do Lambda.


3. Declaramos o recurso aws_s3_object porque queremos armazenar nosso arquivo Jar que será criado no S3. Assim, durante a fase de implantação, o Terraform obterá o arquivo Jar que será criado na pasta target e fará o upload para o S3.

  • depends_on: O recurso atual somente será criado caso o recurso atribuído neste campo, já estivesse sido criado.

  • bucket: É o nome do bucket onde também irá armazenar o arquivo Jar.

  • key: Nome do arquivo Jar.

  • source: Localização do arquivo de origem.

  • etag: Aciona atualizações quando o valor muda.


4. aws_lambda_function é o recurso responsável por criar o Lambda e precisamos preencher alguns campos como:

  • function_name: nome Lambda.

  • role: Função do Lambda declarada nas etapas anteriores que fornece acesso aos serviços e recursos da AWS.

  • handler: Neste campo você precisa passar o caminho da classe principal.

  • source_code_hash: Este campo é responsável por acionar as atualizações do lambda.

  • s3_bucket: É o nome do bucket onde também irá armazenar o arquivo Jar gerado durante o deploy.

  • s3_key: Nome do arquivo Jar.

  • runtime: Aqui você pode definir a linguagem de programação suportada pelo Lambda. Para este exemplo, java 11.

  • timeout: Tempo limite de execução do Lambda. Lembrando que o timeout máximo é de minutos.

5. aws_iam_policy fornece políticas IAM para os recursos onde definimos algumas ações a serem executadas. Nesse caso, definimos ações como invocação do Lambda e habilitação aos logs no CloudWatch.


6. Para o recurso aws_iam_role_policy_attachment, podemos anexar policies do IAM às roles. Nesse caso, anexamos as policies lambda_iam_role criadas anteriormente.


7. Finalmente, temos o recurso aws_lambda_permission. Precisamos desse recurso para dar permissão ao Kinesis Firehose para invocar o Lambda.


Conteúdo do arquivo kinesis.tf

Entendendo o conteúdo do kinesis.tf


1. Declaramos o recurso aws_kinesis_firehose_delivery_stream e seus campos, seguindo os detalhes:

  • destination: Esse é o próprio destino, o Kinesis fornece um mecanismo para entregar dados ao S3(extended_s3), Redshift, Elasticsearch (OpenSearch serviço AWS), splunk e http_endpoint.

  • name: Nome Kinesis Firehose.

  • depends_on: O recurso atual somente será criado caso o recurso atribuído neste campo, já estivesse sido criado, ou seja, Kinesis Firehose será criado se o S3 Bucket já existir.

  • extended_s3_configuration:

1. bucket_arn: S3 Bucket utilizando o prefixo arn.

2. role_arn: ARN role.

3. prefix: Pasta do S3 Bucket onde os dados serão armazenados.

Você pode especificar o formato de pastas usando as seguintes expressões,

"/year=! {timestamp:yyyy}/month=!{timestamp:MM}/".


4. error_output_prefix: Para este campo, você pode definir um caminho para armazenar os resultados da falha do processo.

5. buffer_interval: Buffer do Kinesis Firehose utilizando configurações de intervalo para a entrega do dado.

6. buffer_size: Buffer do Kinesis Firehose utilizando configurações de tamanho de dados para a entrega do dado.

7. compression_format: Existem algumas opções de formato de compactação como ZIP, Snappy, HADOOP SNAPPY e GZIP. Para este tutorial, escolhemos GZIP.

  • processing_configuration:Esse é o bloco onde definimos qual recurso será processado. Para este caso, AWS Lambda.

1. enabled: true para habilitar e false para desabilitar.

2. type: Tipo do processador. No caso, Lambda.

3. parameter_value: Nome da função Lambda com prefixo arn.


2. Declaramos aws_iam_policy_document que descrevem quais ações os recursos que serão atribuídos a essas políticas podem executar.


3. aws_iam_role recurso que fornece a função do IAM para controlar algumas ações do Kinesis.


4. aws_iam_policy fornece IAM policies para os recursos onde definimos algumas ações a serem executadas. Nesse caso, definimos S3 e algumas ações do Lambda.


5. Para o recurso aws_iam_role_policy_attachment, podemos anexar policies do IAM às roles. Nesse caso, anexamos as policies firehose_iam_role e firehose_policies criadas anteriormente.


Packaging


Criamos nosso projeto Maven, classe Handler com arquivos Java e Terraform para criar nossos recursos na AWS. Agora, vamos executar os seguintes comandos para o deploy do nosso projeto.


Primeiro, abra o terminal e verifique se você está no diretório raiz do projeto e execute o seguinte comando maven:

mvn package

Kinesis Firehose Data Transformation

O comando acima empacotará o projeto criando o arquivo Jar a ser implantado e carregado no S3. Para ter certeza, verifique a pasta target e veja se alguns arquivos foram criados, incluindo o arquivo lambda-kinesis-transform-1.0.jar.




Executando Terraform


Agora, vamos executar alguns comandos do Terraform. Dentro da pasta terraform, execute os seguintes comandos no terminal:

terraform init

O comando acima iniciará o terraform, baixando as bibliotecas do terraform e também validará os arquivos do terraform.


Para o próximo comando, vamos executar o comando plan para verificar quais recursos serão criados.

terraform plan -var-file=development/vars.tfvars

Após a execução, você verá logs semelhantes no console:

Kinesis Firehose Data Transformation




















Kinesis Firehose Data Transformation






Kinesis Firehose Data Transformation

















Kinesis Firehose Data Transformation
Kinesis Firehose Data Transformation


Kinesis Firehose Data Transformation

Kinesis Firehose Data Transformation

Kinesis Firehose Data Transformation






























Por fim, podemos solicitar a criação dos recursos através do seguinte comando:

terraform apply -var-file=development/vars.tfvars

Após a execução, você deve confirmar para executar as ações, digite "sim".

Kinesis Firehose Data Transformation




Agora a provisão foi concluída!

Kinesis Firehose Data Transformation



Enviando mensagens


Bem, agora precisamos enviar algumas mensagens para serem transformadas e vamos enviá-las via console do Kinesis Firehose. Obviamente existem outras formas de enviar, mas para este tutorial vamos enviar pela forma mais fácil.


Abra o console do Kinesis Firehose, acesse a opção Delivery Stream conforme a imagem abaixo.

Kinesis Firehose Data Transformation

Na seção Test with demo data, clique no botão Start sending demo data para iniciar o envio.

Kinesis Firehose Data Transformation

Após clicar, as mensagens serão enviadas pelo Kinesis Firehose e de acordo com as configurações do buffer, o Kinesis levará 2 minutos para entregar os dados ou caso atinja 1 MIB em quantidade de dados.


Vamos dar uma olhada no nosso Lambda e ver as métricas:

Kinesis Firehose Data Transformation

Kinesis Firehose Data Transformation

Clique na guia Monitor e depois, na opção Metrics e observe que o Lambda está sendo invocado e não há erros.

Kinesis Firehose Data Transformation

Resultado dos Dados Transformados


Agora que sabemos que tudo está funcionando bem, vamos dar uma olhada nos dados transformados diretamente no Amazon S3. Vá e acesse o S3 Bucket criado.

Kinesis Firehose Data Transformation

Observe que muitos arquivos foram criados. Vamos ler um deles e ver os dados transformados.

Kinesis Firehose Data Transformation

Escolha um arquivo como na imagem abaixo e clique no botão Actions e depois na opção Query with S3 Select.

Kinesis Firehose Data Transformation

Seguindo as opções selecionadas na imagem abaixo, clique no botão Run SQL query para ver o resultado.

Kinesis Firehose Data Transformation

Kinesis Firehose Data Transformation

Com base na imagem acima, você pode ver que, de acordo com a classe Handler.java, definimos um algoritmo para descartar dados com valor do campo CHANGE menor que zero e para aqueles com valor de campo SECTOR igual a TECHNOLOGY, definiríamos o valor de campo TICKER_SYMBOL como TECH.


Este foi um exemplo de como você pode transformar dados usando Kinesis Firehose Data Transformation e Lambda como um componente barato para transformar dados.


Parar de enviar mensagens


Você pode parar de enviar mensagens antes de destruir os recursos criados via Terraform procurando economizar dinheiro. Então, basta voltar ao console do Kinesis Firehose e clicar no botão Stop sending demo data.


Destroy


As cobranças da AWS ocorrerão se você não destruir esses recursos. Portanto, recomendo destruí-los evitando algumas cobranças desnecessárias. Para evitá-lo, execute o comando abaixo.

terraform destroy -var-file=development/vars.tfvars

Lembra que você precisa confirmar essa operação, ok?


Conclusão


Definitivamente, Kinesis Firehose não é apenas um serviço de Streaming. Existe flexibilidade integrando os serviços da AWS e a possibilidade de entregar dados para diferentes destinos possibilitando a transformação de dados e aplicando a lógica de acordo com o seu caso de uso.


Repositório GitHub


Espero que tenha curtido!

Posts recentes

Ver tudo

Comentários


bottom of page