• JP

Criando Kinesis Firehose via Terraform e enviando eventos com Java



Neste post, vamos fazer um passo a passo de como criar um AWS Kinesis Firehose (Delivery Stream) via Terraform e enviar alguns eventos via aplicação Java.


Antes de começarmos, precisamos entender o que é o Kinesis Firehose e como é o seu funcionamento.

O Kinesis Firehose é um dos recursos do Amazon Kinesis, que se resume ao serviço de streaming de dados da AWS.


O Kinesis Firehose possibilita capturar um grande volume de dados através de uma origem, processar em tempo real e entregar os dados para um destino.


Para entender melhor, vamos pensar no cenário a seguir:


Imagine que você e seu time esteja trabalhando em um processo que será necessário enviar um grande volume de dados em tempo real para o S3. Estes objetos são arquivos no formato JSON representados por dados transacionais de compras e vendas de uma plataforma de e-commerce.


A ideia de enviar ao S3 é que, o time está planejando criar um Datalake no futuro, e já pensa em organizar os dados em um repositório único de dados, neste caso, o S3.


O time criou uma API que faz o PUT dos objetos no S3, e foram percebendo que aquele processo era limitado, custoso, não muito escalável pelo fato do volume de dados ser alto e o mais importante, não era resiliente.


Com base nas limitações, o time entendeu que, o mais certo seria trabalhar em cima de uma ferramenta que pudesse fazer toda a parte de streaming dos dados e que a utilização do Kinesis Firehose seria uma melhor abordagem. Pensando neste novo cenário, arquitetura foi mudada para a seguinte:



Certo, mas qual é o benefício de ter adicionado o Kinesis Firehose? O Kinesis possibilita gerenciar todo o volume de dados, independente do volume. Não será mais necessário acessar o S3 de forma contínua e gerenciar todo o fluxo de PUT dos objetos utilizando SDK Client, tornando o processo mais resiliente e simples.


O Kinesis Firehose possibilita entregar as mensagens para diferentes destinos, como o S3, Redshift, Elastic Search e dentre outros. É um ótimo recurso para um processo analítico de dados e não necessita de uma administração contínua, pois o recurso já é gerenciado pela AWS. Resumindo, se você e seu time precisa processar grandes volumes de dados em tempo real e entregar os dados para diferentes destinos, o Kinesis Firehose pode ser uma boa solução.


O cenário acima descrito, foi apenas para que a gente pudesse entender o contexto em qual cenário aplicar o Kinesis Firehose, existem vários outros cenários em que podemos aplicar.


Na próxima etapa, vamos criar um Kinesis utilizando uma ferramenta IAC, neste caso, o Terraform. O Terraform já foi abordado em outros posts e vale a pena dar uma olhada caso você seja novo por aqui, pois o foco não é falar sobre Terraform, combinado?


Após a criação do Kinesis Firehose utilizando Terraform, vamos criar um código Java que vai nos permitir enviar alguns eventos para o Kinesis Firehose processar e salvar estes mesmos dados em um Bucket no S3, finalizando o fluxo de streaming.


Lembre-se que, para executar os passos seguintes é necessário ter uma conta na AWS e ter as credencias já configuradas, ok?


Criando o Terraform


Crie um arquivo no seu projeto chamado main.tf e adicione o código abaixo

provider "aws" {
  region = "${var.region}"
}

Entendendo o código acima

  1. provider "aws": É no provider que definimos qual é a cloud que utilizaremos, no nosso caso a AWS.

  2. region: É onde definimos a região em que o serviço será executado.


Agora, crie o arquivo vars.tf . É neste arquivo que vamos definir a variáveis e os seus valores.

variable "region" {
  default = "us-east-1"
  type = "string"
}

variable "bucket" {
  type = "string"
}

Entendendo o código acima

  1. variable "region": Declaração da variável region. Esta mesma variável é utilizado no arquivo main.tf.

  2. default: Definição padrão do valor da variável, neste caso us-east-1. Esta será a região em que o serviço será executado.

  3. type: Tipo da variável

  4. variable "bucket": Declaração da variável bucket.


E por último, crie o arquivo kinesis.tf, este nome é sugestivo, combinado?

resource "aws_kinesis_firehose_delivery_stream" "kinesis_firehose" 
{

  destination = "extended_s3"
  name = "kns-delivery-event"

  extended_s3_configuration {
    bucket_arn = "${aws_s3_bucket.bucket.arn}"
    role_arn = "${aws_iam_role.firehose_role.arn}"
    prefix = "ingest/year=!{timestamp:yyyy}/
    month=!{timestamp:MM}/
    day=!{timestamp:dd}/
    hour=!{timestamp:HH}/"
    error_output_prefix = "ingest/
    !{firehose:error-output-type}/
    year=!{timestamp:yyyy}
    /month=!{timestamp:MM}/
    day=!{timestamp:dd}/hour=!{timestamp:HH}/"
  }

}

resource "aws_s3_bucket" "bucket" {
  bucket = "${var.bucket}"
  acl    = "private"
}

resource "aws_iam_role" "firehose_role" {
  name = "firehose_test_role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_policy" "policy" {

  name = "kns-s3-policy"
  policy = <<EOF
{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Action": [
          "s3:GetObject",
          "s3:ListBucketMultipartUploads",
          "s3:AbortMultipartUpload",
          "s3:ListBucket",
          "s3:GetBucketLocation",
          "s3:PutObject"
        ],
        "Effect": "Allow",
        "Resource":  [
          "arn:aws:s3:::${var.bucket}/*",
          "arn:aws:s3:::${var.bucket}*",
        ]
      }
    ]
  }
  EOF
}


resource "aws_iam_role_policy_attachment" "kns-s3-policy-attach" {
  policy_arn = aws_iam_policy.policy.arn
  role       = aws_iam_role.firehose_role.name
}

Entendendo o código acima


1.resource "aws_kinesis_firehose_delivery_stream": Bloco responsável pela criação do Kinesis Firehose


2. extended_s3_configuration: Bloco com configurações específicas do destino


3. bucket_arn: Referência do Bucket que será declarado nos próximos passos


4. role_arn: Role que será declarada nos próximos passos


5. prefix: Neste parâmetro, é configurado o caminho em que os arquivos serão salvos. Definimos um prefixo (ingest/) + partições, por exemplo: ingest/year=2021/month=03/day=01/hour=12


6. error_output_prefix: Neste parâmetro, definimos um caminho caso o processamento falhe e todos os erros serão enviados para o caminho ali definido.


7. resource "aws_s3_bucket" "bucket": Bloco responsável pelas configurações do Bucket a ser criado.


8. bucket: Nome do bucket em que será passado via parâmetro. Lembre-se que o bucket deve ser único.


9. acl: Nível de acesso, neste caso, privado.


10. resource "aws_iam_role" "firehose_role": Bloco responsável pela criação da role que será usada pelo Kinesis.


11. resource "aws_iam_policy" "policy": Bloco responsável pela criação da policy. Nela definimos as ações que serão permitidas executar no bucket S3.


12. resource "aws_iam_role_policy_attachment" "kns-s3-policy-attach": Neste bloco, anexamos a policy para a role firehose_role.


Executando o Terraform


Inicie o Terraform

terraform init

Executando o Plan


O plan possibilita verificar quais recursos serão criados, neste caso é necessário passar o valor da variável bucket para a criação do mesmo no S3.

terraform plan -var 'bucket=digite o nome do seu bucket'

Executando o apply


Neste passo, a criação dos recursos serão aplicados. Lembre-se de passar o nome do bucket em que deseja criar para a variável bucket e o nome bucket deve ser único.

terraform apply -var 'bucket=digite o nome do seu bucket'

Pronto, recurso criado!


Falando um pouco sobre Buffer


Existe um detalhe bem interessante no Kinesis que são as configurações de Buffer, dependendo de como está configurado, pode impactar o modo de quando os arquivos serão salvos no destino final, no caso, o S3. Por padrão, o Buffer size é criado com 5 MB e o interval de 300 segundos (5 minutos).


Como funciona o Buffer?


No exemplo acima, temos 2 configurações de Buffer, o Buffer size e o Interval. No Kinesis, o buffer é utilizado como uma espécie de memória interna que armazenará os dados antes de serem entregues ao S3.


Nas configurações acima, o Kinesis vai armazenar até 5 MB de dados antes de serem entregues ou a cada intervalo de 5 minutos. Esta segunda opção pode acontecer caso não haja 5 MB de dados para armazenar. Lembrando que existe a possibilidade do Kinesis aumentar estas configurações de forma dinâmica visando a entrega dos dados.


Criando o código Java


Após executado os passos anteriores e a criação dos recursos serem bem sucedidas, vamos criar o nosso código Java que enviará os eventos para o Kinesis Firehose.


Será um código Java simples, que enviará os eventos para o Kinesis Firehose através da AWS SDK. Neste mesmo código, vamos criar duas formas de envio para o Kinesis Firehose, utilizando PutRecord e PutRecordBatch.


Maven

<dependencies>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-kinesis</artifactId>
        <version>1.12.70</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Criando o modelo


Classe Java chamada Event.java que representará o modelo do evento.

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.util.UUID;

@Data
public class Event {

    @JsonProperty("event_date")
    private String eventDate;

    @JsonProperty("event_id")
    private UUID eventId;

    @JsonProperty("provider")
    private String provider;

    @JsonProperty("blog")
    private String blog;

    @JsonProperty("post_id")
    private UUID postId;
}

Criando o Service


A classe KinesisService.java será a responsável por parte da lógica e envio dos eventos.

public class KinesisService {

   static String PROVIDER = "AWS KINESIS FIREHOSE";
   static String BLOG = "Coffee and Tips";
   static String KNS_DELIVERY_NAME = "kns-delivery-event";
   static String RECORD_ID = "Record ID ";
   static String EVENT = "Event ";

   public static AmazonKinesisFirehose kinesisFirehoseClient(){
       AmazonKinesisFirehose amazonKinesisFirehose =
               AmazonKinesisFirehoseClient.builder()
                       .withRegion(Regions.US_EAST_1.getName())
                       .build();
       return amazonKinesisFirehose;
   }

    @SneakyThrows
    public static void sendDataWithPutRecordBatch(int maxRecords){

        PutRecordBatchRequest putRecordBatchRequest =
                new PutRecordBatchRequest();

        putRecordBatchRequest
        .setDeliveryStreamName(KNS_DELIVERY_NAME);

        String line = "";
        List<Record> records = new ArrayList<>();

        while(maxRecords > 0){
            line = getData();
            String data = line + "\n";
            Record record = new Record()
                    .withData(ByteBuffer.wrap(data.getBytes()));
            records.add(record);
            maxRecords --;
        }

        putRecordBatchRequest.setRecords(records);
        PutRecordBatchResult putRecordResult =
                kinesisFirehoseClient()
                .putRecordBatch(putRecordBatchRequest);

        putRecordResult
           .getRequestResponses()
           .forEach(result -> System.out
           .println(RECORD_ID + result.getRecordId()));

    }

    @SneakyThrows
    public static void sendDataWithPutRecord(int maxRecords){

            PutRecordRequest PutRecordRequest = 
                    new PutRecordRequest();
            PutRecordRequest
                    .setDeliveryStreamName(KNS_DELIVERY_NAME);

            String line = "";

            while(maxRecords > 0){

                line = getData();
                String data = line + "\n";

                System.out.println(EVENT + data);

                Record record = new Record()
                        .withData(ByteBuffer
                        .wrap(data.getBytes()));
                        
                PutRecordRequest.setRecord(record);

                PutRecordResult 
                putRecordResult = kinesisFirehoseClient()
                        .putRecord(PutRecordRequest);

                System.out.println(RECORD_ID + 
                        putRecordResult.getRecordId());

                maxRecords --;
            }
    }

    @SneakyThrows
    public static String getData(){
        Event event = new Event();
        event.setEventId(UUID.randomUUID());
        event.setPostId(UUID.randomUUID());
        event.setBlog(BLOG);
        event.setEventDate(LocalDateTime.now().toString());
        event.setProvider(PROVIDER);
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.writeValueAsString(event);
    }

}

Entendendo o código acima


O código acima possui algumas adaptações visando o funcionamento do Kinesis para o Post. Para um ambiente de produção, recomendo utilizar técnicas de padrões de projeto ou adotar melhores práticas visando um código mais organizado, GG?


1. O método kinesisFirehoseClient( ) é o responsável por configurar e manter uma interação entre os serviços do Kinesis Firehose e a nossa aplicação através da interface AmazonKinesisFirehose. Esta interface provê os seguintes contratos:

  • putRecordBatch(PutRecordBatchRequest var1)

  • putRecord(PutRecordRequest var1)

Todo o código de envio foi baseado nos contratos acima e falaremos mais sobre eles nos próximos passos. Por último, perceba que estamos utilizando a região US-EAST-1, a mesma região definida no Terraform.


2. O método sendDataWithPutRecordBatch(int maxRecords) será o nosso método responsável por enviar os dados em Batch a partir do contrato putRecordBatch(PutRecordBatchRequest var1). A ideia de usar o envio em Batch, é a possibilidade de, em um único request, enviarmos uma grande quantidade de dados. Este cenário é bem comum para aplicações que trabalham com um grande volume de dados.


Um ponto importante a ser lembrado é que, esta operação suporta até 4 MB de dados ou até 500 registros por requisição.


Para o método sendDataWithPutRecordBatch(int maxRecords), estamos utilizando o parâmetro maxRecords, que simulará uma quantidade específica de dados dentro do loop para serem enviados via Batch ao fim da interação, ou seja, o seu uso será apenas para a simulação.


3. Para entendermos sobre o contrato putRecord(PutRecordRequest var1) fornecido pela interface AmazonKinesisFirehose, criamos o método sendDataWithPutRecord(int maxRecords). Este cenário difere bastante do envio em Batch, pois o tipo de envio será do tipo single record, ou seja, para cada dado enviado, um novo request deve ser criado e para cada envio, existe um limite de dado de até 1.000 KiB de tamanho.


4. O método getData( ) será o responsável por montar o objeto do tipo Event com valores aleatórios antes do envio e transformar em um JSON através do ObjectMapper.


Executando o código


Para a execução do código, vamos criar um método main( ) que fará a execução dos métodos acima.


Executando o método sendDataWithPutRecord(int maxRecords)

public static void main(String[] args) {
   sendDataWithPutRecord(2000);
}

No código acima estamos invocando o método sendDataWithPutRecord(int maxRecords) e passando no parâmetro o valor 2000, ou seja, iremos enviar 2000 eventos ou 2000 requests.


Resposta do envio no console


Para cada envio, o campo putRecordResult.getRecordId() retorna um ID.


Executando o método sendDataWithPutRecordBatch(int maxRecords)

public static void main(String[] args) {
   sendDataWithPutRecordBatch(500);
}

Já no método acima, especificamos o valor 500 para o parâmetro maxRecords, pois é o valor máximo de dados permitidos para envio em cada requisição. Lembrando que este parâmetro simula a quantidade de dados fictícios a serem enviados.


Destino final


Após a execução dos envios, os dados serão entregues ao Bucket do S3 definido na execução do Terraform e serão organizados conforme a imagem abaixo.


Na imagem acima, os dados estão organizados entre ano, mês, dia e hora de envio.


Os arquivos vão estar disponíveis dentro das pastas relacionadas a hora (hour=), e perceba a diferença de 5 minutos entre eles na coluna Última modificação, ou seja, seguindo a configuração de Buffer interval conforme falamos nos passos anteriores.


Pronto, os dados estão sendo entregues!


Destruindo os recursos


Caso você esteja acompanhando este Post de forma experimental, creio que seja necessário apagar toda a stack em que criamos para evitar uma conta inesperada da AWS.


Podemos executar o seguinte comando via Terraform para destruir os recursos em que criamos.

terraform destroy -var 'bucket=digite o nome do seu bucket'

Mas antes, apague todos os arquivos e pastas do Bucket, pois caso o contrário, o destroy não vai funcionar, GG?


Após a execução do comando acima, você deverá confirmar a destruição, é só digitar yes para confirmar.

Caso queria entender mais a fundo sobre Kinesis Firehose, recomendo a leitura na documentação oficial.


Código completo no GitHub.



É isso, curtiu? Até mais!


Posts recentes

Ver tudo