My Items

I'm a title. ​Click here to edit me.

Configurando S3 Bucket event notification com SQS via Terraform

O S3 (Simple Storage Service) possibilita notificar através de eventos, quando alguma ação ocorre dentro de um Bucket ou em alguma pasta específica. Em outras palavras, funciona como um listener, ou seja, qualquer ação ocorrida em uma origem, ocorrerá um evento de notificação para um destino. O que seriam estas ações? Qualquer ação que acontece dentro de um Bucket como, a criação de um novo arquivo manual ou através de um upload, criação de uma pasta, remoção de arquivos, restauração de arquivos e etc. Destinos Para cada configuração de notificação de evento, deve existir um destino. Para este destino, será enviado informações sobre cada ação, por exemplo: Um novo arquivo foi criado em uma pasta específica, assim, informações sobre o arquivo será enviada, como a data de criação, tamanho do arquivo, o tipo do evento, nome do arquivo e dentre outros. Lembrando que neste processo, não é enviado o conteúdo do arquivo, GG? Existem 3 tipos de destinos disponíveis: Lambda SNS SQS Entendendo o funcionamento na prática Neste post vamos criar uma configuração de notificação de evento em um Bucket S3, simular uma ação e entender o comportamento final. Todo este conteúdo é baseado na documentação oferecida pela HashiCorp, empresa mantenedora do Terraform. Poderíamos criar esta configuração via console mas por motivos de boas práticas, utilizaremos o Terraform como ferramenta IAC. Para quem não conhece muito sobre Terraform, segue este tutorial sobre Primeiros passos utilizando Terraform na AWS. No próximo passo, criaremos um fluxo simulando a imagem abaixo. Criaremos uma configuração no Bucket S3, que para cada arquivo criado na pasta files/, um evento de notificação será enviado para um fila SQS. Criando os arquivos Terraform Crie no seu projeto, uma pasta chamada terraform/ e a partir de agora, todos os arquivos .tf serão criados dentro dela. Agora, crie um arquivo chamado vars.tf onde armazenaremos as variáveis que serão utilizadas. variable "region" {
default = "us-east-1"
type = string
}

variable "bucket" {
type = string
} Crie um arquivo chamado provider.tf , onde adicionaremos a configuração do provider, no qual será a AWS. Isso significa que, o Terraform utilizará a AWS como nuvem para criar os recursos e fará o download dos plugins necessários na inicialização. Copie o código abaixo para o arquivo. provider "aws" {
region = "${var.region}"
} Crie um arquivo chamado s3.tf , onde adicionaremos as configurações para a criação de um novo Bucket S3 que será usado para a prática resource "aws_s3_bucket" "s3_bucket_notification" {
bucket = "${var.bucket}"
} Agora, crie um arquivo chamado sqs.tf , onde adicionaremos as configurações para a criação de uma fila SQS e algumas permissões conforme código abaixo: resource "aws_sqs_queue" "s3-notifications-sqs" {

name = "s3-notifications-sqs"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:*:*:s3-notifications-sqs",
"Condition": {
"ArnEquals": {
"aws:SourceArn":
"${aws_s3_bucket.s3_bucket_notification.arn}"
}
}
}
]
}
POLICY
} Entendendo o código acima No código acima, estamos criando uma SQS e adicionando algumas configurações de policy, segue mais detalhes: O nome da SQS será s3-notifications-sqs, valor detalhado no campo name No campo policy, definimos uma política para que o S3 possa enviar mensagens de notificações para a SQS. Veja que estamos referenciando o Bucket S3 através de ARN no trecho ${aws_s3_bucket.s3_bucket_notification.arn} Para o último arquivo, vamos criar a configuração que permitirá o envio de notificações de eventos do Bucket S3 para uma SQS. Crie o arquivo s3_notification.tf e adicione o código abaixo: resource "aws_s3_bucket_notification" "s3_notification" {
bucket = aws_s3_bucket.s3_bucket_notification.id

queue {
events = ["s3:ObjectCreated:*"]
queue_arn = aws_sqs_queue.s3-notifications-sqs.arn
filter_prefix = "files/"
}
} Entendendo o código acima No código acima, estamos criando um recurso chamado aws_s3_bucket_notification no qual será responsável por habilitar as notificações de um Bucket S3. No campo bucket, estamos referenciando através do ID, o bucket que será criado. Neste caso, estamos utilizando aws_s3_bucket + alias + id para a referência, no qual, o alias é o s3_bucket_notification, definido no arquivo s3.tf e por último, o id que será o identificador do Bucket que será criado. O campo queue é um bloco contendo algumas configurações como: events: É o tipo do evento da notificação. Neste caso, para eventos do tipo ObjectCreated, as notificações serão feitas apenas por objetos criados, ou seja, para objetos deletados, não haverá notificação. Isso ajuda para restringir alguns tipos de eventos. queue_arn: Referência para a SQS definida no arquivo sqs.tf filter_prefix: Este campo define a pasta em que desejamos que as notificações sejam disparadas. No código, definimos que a pasta files/ seja o gatilho. Resumindo, para todos arquivos criados dentro desta pasta, um notificação será enviada para a SQS definida no campo queue_arn. Executando o Terraform Inicie o Terraform terraform init Executando o Plan O plan possibilita verificar quais recursos serão criados, neste caso é necessário passar o valor da variável bucket para a criação do mesmo no S3. terraform plan -var bucket = 'digite o nome do bucket' Executando o Apply Neste passo, a criação dos recursos serão aplicados. Lembre-se de passar o nome do bucket em que deseja criar para a variável bucket e o nome bucket deve ser único. terraform apply -var bucket = 'digite o nome do bucket' Simulando um evento de notificação Após a execução dos passos anteriores e criado os recursos, vamos fazer um upload manual de um arquivo na pasta files/ do bucket que foi criado. Via console, acesse o Bucket criado no S3 e crie uma pasta chamada files. Dentro dela, carregue qualquer arquivo. Carregando um arquivo Após carregar o arquivo na pasta files, acesse o SQS criado. Você verá que existe mensagens disponíveis na SQS que foi criada nos passos anteriores. Acesse as mensagens e navegue pelas informações. Pronto, temos uma notificação de evento criada. Para mais informações, acesse os links: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_notification https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html É isso, curtiu? Até mais!

Criando Kinesis Firehose via Terraform e enviando eventos com Java

Neste post, vamos fazer um passo a passo de como criar um AWS Kinesis Firehose (Delivery Stream) via Terraform e enviar alguns eventos via aplicação Java. Antes de começarmos, precisamos entender o que é o Kinesis Firehose e como é o seu funcionamento. O Kinesis Firehose é um dos recursos do Amazon Kinesis, que se resume ao serviço de streaming de dados da AWS. O Kinesis Firehose possibilita capturar um grande volume de dados através de uma origem, processar em tempo real e entregar os dados para um destino. Para entender melhor, vamos pensar no cenário a seguir: Imagine que você e seu time esteja trabalhando em um processo que será necessário enviar um grande volume de dados em tempo real para o S3. Estes objetos são arquivos no formato JSON representados por dados transacionais de compras e vendas de uma plataforma de e-commerce. A ideia de enviar ao S3 é que, o time está planejando criar um Datalake no futuro, e já pensa em organizar os dados em um repositório único de dados, neste caso, o S3. O time criou uma API que faz o PUT dos objetos no S3, e foram percebendo que aquele processo era limitado, custoso, não muito escalável pelo fato do volume de dados ser alto e o mais importante, não era resiliente. Com base nas limitações, o time entendeu que, o mais certo seria trabalhar em cima de uma ferramenta que pudesse fazer toda a parte de streaming dos dados e que a utilização do Kinesis Firehose seria uma melhor abordagem. Pensando neste novo cenário, arquitetura foi mudada para a seguinte: Certo, mas qual é o benefício de ter adicionado o Kinesis Firehose? O Kinesis possibilita gerenciar todo o volume de dados, independente do volume. Não será mais necessário acessar o S3 de forma contínua e gerenciar todo o fluxo de PUT dos objetos utilizando SDK Client, tornando o processo mais resiliente e simples. O Kinesis Firehose possibilita entregar as mensagens para diferentes destinos, como o S3, Redshift, Elastic Search e dentre outros. É um ótimo recurso para um processo analítico de dados e não necessita de uma administração contínua, pois o recurso já é gerenciado pela AWS. Resumindo, se você e seu time precisa processar grandes volumes de dados em tempo real e entregar os dados para diferentes destinos, o Kinesis Firehose pode ser uma boa solução. O cenário acima descrito, foi apenas para que a gente pudesse entender o contexto em qual cenário aplicar o Kinesis Firehose, existem vários outros cenários em que podemos aplicar. Na próxima etapa, vamos criar um Kinesis utilizando uma ferramenta IAC, neste caso, o Terraform. O Terraform já foi abordado em outros posts e vale a pena dar uma olhada caso você seja novo por aqui, pois o foco não é falar sobre Terraform, combinado? Após a criação do Kinesis Firehose utilizando Terraform, vamos criar um código Java que vai nos permitir enviar alguns eventos para o Kinesis Firehose processar e salvar estes mesmos dados em um Bucket no S3, finalizando o fluxo de streaming. Lembre-se que, para executar os passos seguintes é necessário ter uma conta na AWS e ter as credencias já configuradas, ok? Criando o Terraform Crie um arquivo no seu projeto chamado main.tf e adicione o código abaixo provider "aws" {
region = "${var.region}"
} Entendendo o código acima provider "aws": É no provider que definimos qual é a cloud que utilizaremos, no nosso caso a AWS. region: É onde definimos a região em que o serviço será executado. Agora, crie o arquivo vars.tf . É neste arquivo que vamos definir a variáveis e os seus valores. variable "region" {
default = "us-east-1"
type = "string"
}

variable "bucket" {
type = "string"
} Entendendo o código acima variable "region": Declaração da variável region. Esta mesma variável é utilizado no arquivo main.tf. default: Definição padrão do valor da variável, neste caso us-east-1. Esta será a região em que o serviço será executado. type: Tipo da variável variable "bucket": Declaração da variável bucket. E por último, crie o arquivo kinesis.tf, este nome é sugestivo, combinado? resource "aws_kinesis_firehose_delivery_stream" "kinesis_firehose"
{

destination = "extended_s3"
name = "kns-delivery-event"

extended_s3_configuration {
bucket_arn = "${aws_s3_bucket.bucket.arn}"
role_arn = "${aws_iam_role.firehose_role.arn}"
prefix = "ingest/year=!{timestamp:yyyy}/
month=!{timestamp:MM}/
day=!{timestamp:dd}/
hour=!{timestamp:HH}/"
error_output_prefix = "ingest/
!{firehose:error-output-type}/
year=!{timestamp:yyyy}
/month=!{timestamp:MM}/
day=!{timestamp:dd}/hour=!{timestamp:HH}/"
}

}

resource "aws_s3_bucket" "bucket" {
bucket = "${var.bucket}"
acl = "private"
}

resource "aws_iam_role" "firehose_role" {
name = "firehose_test_role"

assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}

resource "aws_iam_policy" "policy" {

name = "kns-s3-policy"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:GetObject",
"s3:ListBucketMultipartUploads",
"s3:AbortMultipartUpload",
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:PutObject"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::${var.bucket}/*",
"arn:aws:s3:::${var.bucket}*",
]
}
]
}
EOF
}


resource "aws_iam_role_policy_attachment" "kns-s3-policy-attach" {
policy_arn = aws_iam_policy.policy.arn
role = aws_iam_role.firehose_role.name
} Entendendo o código acima 1.resource "aws_kinesis_firehose_delivery_stream": Bloco responsável pela criação do Kinesis Firehose 2. extended_s3_configuration: Bloco com configurações específicas do destino 3. bucket_arn: Referência do Bucket que será declarado nos próximos passos 4. role_arn: Role que será declarada nos próximos passos 5. prefix: Neste parâmetro, é configurado o caminho em que os arquivos serão salvos. Definimos um prefixo (ingest/) + partições, por exemplo: ingest/year=2021/month=03/day=01/hour=12 6. error_output_prefix: Neste parâmetro, definimos um caminho caso o processamento falhe e todos os erros serão enviados para o caminho ali definido. 7. resource "aws_s3_bucket" "bucket": Bloco responsável pelas configurações do Bucket a ser criado. 8. bucket: Nome do bucket em que será passado via parâmetro. Lembre-se que o bucket deve ser único. 9. acl: Nível de acesso, neste caso, privado. 10. resource "aws_iam_role" "firehose_role": Bloco responsável pela criação da role que será usada pelo Kinesis. 11. resource "aws_iam_policy" "policy": Bloco responsável pela criação da policy. Nela definimos as ações que serão permitidas executar no bucket S3. 12. resource "aws_iam_role_policy_attachment" "kns-s3-policy-attach": Neste bloco, anexamos a policy para a role firehose_role. Executando o Terraform Inicie o Terraform terraform init Executando o Plan O plan possibilita verificar quais recursos serão criados, neste caso é necessário passar o valor da variável bucket para a criação do mesmo no S3. terraform plan -var 'bucket=digite o nome do seu bucket' Executando o apply Neste passo, a criação dos recursos serão aplicados. Lembre-se de passar o nome do bucket em que deseja criar para a variável bucket e o nome bucket deve ser único. terraform apply -var 'bucket=digite o nome do seu bucket' Pronto, recurso criado! Falando um pouco sobre Buffer Existe um detalhe bem interessante no Kinesis que são as configurações de Buffer, dependendo de como está configurado, pode impactar o modo de quando os arquivos serão salvos no destino final, no caso, o S3. Por padrão, o Buffer size é criado com 5 MB e o interval de 300 segundos (5 minutos). Como funciona o Buffer? No exemplo acima, temos 2 configurações de Buffer, o Buffer size e o Interval. No Kinesis, o buffer é utilizado como uma espécie de memória interna que armazenará os dados antes de serem entregues ao S3. Nas configurações acima, o Kinesis vai armazenar até 5 MB de dados antes de serem entregues ou a cada intervalo de 5 minutos. Esta segunda opção pode acontecer caso não haja 5 MB de dados para armazenar. Lembrando que existe a possibilidade do Kinesis aumentar estas configurações de forma dinâmica visando a entrega dos dados. Criando o código Java Após executado os passos anteriores e a criação dos recursos serem bem sucedidas, vamos criar o nosso código Java que enviará os eventos para o Kinesis Firehose. Será um código Java simples, que enviará os eventos para o Kinesis Firehose através da AWS SDK. Neste mesmo código, vamos criar duas formas de envio para o Kinesis Firehose, utilizando PutRecord e PutRecordBatch. Maven <dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.12.70</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
</dependencies> Criando o modelo Classe Java chamada Event.java que representará o modelo do evento. import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.util.UUID;

@Data
public class Event {

@JsonProperty("event_date")
private String eventDate;

@JsonProperty("event_id")
private UUID eventId;

@JsonProperty("provider")
private String provider;

@JsonProperty("blog")
private String blog;

@JsonProperty("post_id")
private UUID postId;
} Criando o Service A classe KinesisService.java será a responsável por parte da lógica e envio dos eventos. public class KinesisService {

static String PROVIDER = "AWS KINESIS FIREHOSE";
static String BLOG = "Coffee and Tips";
static String KNS_DELIVERY_NAME = "kns-delivery-event";
static String RECORD_ID = "Record ID ";
static String EVENT = "Event ";

public static AmazonKinesisFirehose kinesisFirehoseClient(){
AmazonKinesisFirehose amazonKinesisFirehose =
AmazonKinesisFirehoseClient.builder()
.withRegion(Regions.US_EAST_1.getName())
.build();
return amazonKinesisFirehose;
}

@SneakyThrows
public static void sendDataWithPutRecordBatch(int maxRecords){

PutRecordBatchRequest putRecordBatchRequest =
new PutRecordBatchRequest();

putRecordBatchRequest
.setDeliveryStreamName(KNS_DELIVERY_NAME);

String line = "";
List<Record> records = new ArrayList<>();

while(maxRecords > 0){
line = getData();
String data = line + "\n";
Record record = new Record()
.withData(ByteBuffer.wrap(data.getBytes()));
records.add(record);
maxRecords --;
}

putRecordBatchRequest.setRecords(records);
PutRecordBatchResult putRecordResult =
kinesisFirehoseClient()
.putRecordBatch(putRecordBatchRequest);

putRecordResult
.getRequestResponses()
.forEach(result -> System.out
.println(RECORD_ID + result.getRecordId()));

}

@SneakyThrows
public static void sendDataWithPutRecord(int maxRecords){

PutRecordRequest PutRecordRequest =
new PutRecordRequest();
PutRecordRequest
.setDeliveryStreamName(KNS_DELIVERY_NAME);

String line = "";

while(maxRecords > 0){

line = getData();
String data = line + "\n";

System.out.println(EVENT + data);

Record record = new Record()
.withData(ByteBuffer
.wrap(data.getBytes()));

PutRecordRequest.setRecord(record);

PutRecordResult
putRecordResult = kinesisFirehoseClient()
.putRecord(PutRecordRequest);

System.out.println(RECORD_ID +
putRecordResult.getRecordId());

maxRecords --;
}
}

@SneakyThrows
public static String getData(){
Event event = new Event();
event.setEventId(UUID.randomUUID());
event.setPostId(UUID.randomUUID());
event.setBlog(BLOG);
event.setEventDate(LocalDateTime.now().toString());
event.setProvider(PROVIDER);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(event);
}

} Entendendo o código acima O código acima possui algumas adaptações visando o funcionamento do Kinesis para o Post. Para um ambiente de produção, recomendo utilizar técnicas de padrões de projeto ou adotar melhores práticas visando um código mais organizado, GG? 1. O método kinesisFirehoseClient( ) é o responsável por configurar e manter uma interação entre os serviços do Kinesis Firehose e a nossa aplicação através da interface AmazonKinesisFirehose. Esta interface provê os seguintes contratos: putRecordBatch(PutRecordBatchRequest var1) putRecord(PutRecordRequest var1) Todo o código de envio foi baseado nos contratos acima e falaremos mais sobre eles nos próximos passos. Por último, perceba que estamos utilizando a região US-EAST-1, a mesma região definida no Terraform. 2. O método sendDataWithPutRecordBatch(int maxRecords) será o nosso método responsável por enviar os dados em Batch a partir do contrato putRecordBatch(PutRecordBatchRequest var1). A ideia de usar o envio em Batch, é a possibilidade de, em um único request, enviarmos uma grande quantidade de dados. Este cenário é bem comum para aplicações que trabalham com um grande volume de dados. Um ponto importante a ser lembrado é que, esta operação suporta até 4 MB de dados ou até 500 registros por requisição. Para o método sendDataWithPutRecordBatch(int maxRecords), estamos utilizando o parâmetro maxRecords, que simulará uma quantidade específica de dados dentro do loop para serem enviados via Batch ao fim da interação, ou seja, o seu uso será apenas para a simulação. 3. Para entendermos sobre o contrato putRecord(PutRecordRequest var1) fornecido pela interface AmazonKinesisFirehose, criamos o método sendDataWithPutRecord(int maxRecords). Este cenário difere bastante do envio em Batch, pois o tipo de envio será do tipo single record, ou seja, para cada dado enviado, um novo request deve ser criado e para cada envio, existe um limite de dado de até 1.000 KiB de tamanho. 4. O método getData( ) será o responsável por montar o objeto do tipo Event com valores aleatórios antes do envio e transformar em um JSON através do ObjectMapper. Executando o código Para a execução do código, vamos criar um método main( ) que fará a execução dos métodos acima. Executando o método sendDataWithPutRecord(int maxRecords) public static void main(String[] args) {
sendDataWithPutRecord(2000);
} No código acima estamos invocando o método sendDataWithPutRecord(int maxRecords) e passando no parâmetro o valor 2000, ou seja, iremos enviar 2000 eventos ou 2000 requests. Resposta do envio no console Para cada envio, o campo putRecordResult.getRecordId() retorna um ID. Executando o método sendDataWithPutRecordBatch(int maxRecords) public static void main(String[] args) {
sendDataWithPutRecordBatch(500);
} Já no método acima, especificamos o valor 500 para o parâmetro maxRecords, pois é o valor máximo de dados permitidos para envio em cada requisição. Lembrando que este parâmetro simula a quantidade de dados fictícios a serem enviados. Destino final Após a execução dos envios, os dados serão entregues ao Bucket do S3 definido na execução do Terraform e serão organizados conforme a imagem abaixo. Na imagem acima, os dados estão organizados entre ano, mês, dia e hora de envio. Os arquivos vão estar disponíveis dentro das pastas relacionadas a hora (hour=), e perceba a diferença de 5 minutos entre eles na coluna Última modificação, ou seja, seguindo a configuração de Buffer interval conforme falamos nos passos anteriores. Pronto, os dados estão sendo entregues! Destruindo os recursos Caso você esteja acompanhando este Post de forma experimental, creio que seja necessário apagar toda a stack em que criamos para evitar uma conta inesperada da AWS. Podemos executar o seguinte comando via Terraform para destruir os recursos em que criamos. terraform destroy -var 'bucket=digite o nome do seu bucket' Mas antes, apague todos os arquivos e pastas do Bucket, pois caso o contrário, o destroy não vai funcionar, GG? Após a execução do comando acima, você deverá confirmar a destruição, é só digitar yes para confirmar. Caso queria entender mais a fundo sobre Kinesis Firehose, recomendo a leitura na documentação oficial. Código completo no GitHub. É isso, curtiu? Até mais!

Criando uma Lambda AWS usando Python + Deploy

Neste post, iremos criar uma aplicação Lambda AWS usando Python. Será um passo a passo, desde criar o código fonte, que será bem simples, passando pela criação da role e por fim, o deploy da aplicação. Este tutorial será baseado na documentação da AWS, pois, muitas vezes os usuários possuem uma dificuldade em ler uma documentação por talvez ser um pouco verbosa ou até estar em outra língua, por isso, decidi criar este passo a passo baseando-se na própria documentação da AWS e espero que gostem. O que é um Lambda? Lambdas são conhecidos como aplicações serverless, ou seja, são aplicações que não tem a necessidade em configurar um servidor memória, CPU etc. Todo o poder computacional é gerenciado pela própria AWS e isso nos dá a flexibilidade de apenas se preocupar com o código que será executado. Característica de um Lambda O Lambda possui uma característica diferente de uma aplicação tradicional, que tem uma característica de uptime 24/7, para isso, temos que ter em mente a finalidade do seu uso. A característica principal é que um Lambda é executado para uma tarefa específica e após o término, ela basicamente "morre". Isso é muito positivo, pois gera uma boa economia e os recursos são bem mais utilizados. Imagine que você recebeu uma tarefa para criar uma aplicação que enviará emails algumas vezes durante o dia. Faz sentido manter uma aplicação online 100% do tempo para este tipo de tarefa? Você até pode fazer, mas estaria desperdiçando recursos, por na maioria do tempo a aplicação estar ociosa e o tempo de execução da tarefa seria em questão de poucos minutos ou nem isso. Já um Lambda, seria uma abordagem ideal para este cenário. Imagine que os emails devem ser enviados 6 vezes ao dia em um intervalo de 4 horas. A aplicação executaria a tarefa, terminaria e depois de 4 horas executaria novamente. E cada execução seria em torna de 2 minutos. Resumindo, o objetivo da tarefa seria alcançado em pouco tempo e com um baixo custo computacional. Mãos na massa Nos próximos passos, criaremos o nosso código em Python na versão 3.8, em seguida, a role com as permissões necessárias para a execução do Lambda. Vamos anexar a role criada á uma policy, criaremos a aplicação Lambda, e em seguida, vamos compactar o arquivo com o código para o deploy, e por fim, faremos uma invocação para execução da Lambda. Os passos após o código, iremos executar utilizando CLI, para isso faça a instalação do AWS CLI seguindo esta documentação . O CLI é uma das maneiras de interação com a AWS e é bem simples a sua utilização. Existem outras maneiras também de interagir com a AWS para a criação de recursos, o mais comum no mercado é utilizando ferramentas IAC, como Cloudformation e Terraform. Já até criamos um tutorial sobre estes recursos, basta acessar os links abaixo para entender melhor, mas para este tutorial, seguiremos como base a documentação da AWS, utilizando CLI. Criando recursos utilizando Cloudformation Primeiros passos utilizando Terraform na AWS Após a instalação do CLI, configure suas credenciais da AWS executando o comando abaixo via terminal. Caso você não conheça este comando, recomendo a leitura de como utilizar, nesta documentação, aws configure Criando o código da Lambda A seguir, criaremos o código que será executado na aplicação Lambda, será um código muito simples apenas para exemplificar a execução. Utilizaremos Python neste exemplo, mas é possível utilizar várias outras linguagens de programação para a criação da Lambda. Crie um arquivo chamado lambda_function.py e escreva o código abaixo. import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):

name = event['yourname']

message = {'Message': 'hello ' + name +
' ,this is your first '
'Lambda application!' }

logger.info(message) Criando a role O comando a seguir criará a role chamada lambda-default-role. A role é uma forma de dar permissão para um recurso específico executar uma ação específica. Execute via terminal. aws iam create-role --role-name lambda-default-role --assume-role-policy-document '{"Version": "2012-10-17","Statement": [{ "Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}' Após a execução do comando acima, um output como este será mostrado. {
"Role": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
]
},
"RoleId": "AROA5WDL5GTDVI43EJEUOV",
"CreateDate": "2021-09-12T22:11:04Z",
"RoleName": "lambda-default-role",
"Path": "/",
"Arn": "arn:aws:iam::id-da-sua-conta:role/lambda-default-role"
}
} Anexando a role á uma policy Para este passo, basta executar o comando abaixo via terminal aws iam attach-role-policy --role-name lambda-default-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Zipando o arquivo Python Neste passo, vamos compactar o arquivo Python contendo o código que escrevemos nos passos anteriores, para isso, acesse a pasta onde o arquivo se encontra e execute o comando abaixo. zip my-deployment-package.zip lambda_function.py Criando a função Lambda e efetuando o deploy Agora criaremos a função Lambda chamada my-first-lambda e no mesmo comando, vamos anexar o código compactado no passo anterior. Lembre-se que este comando deve ser executado na mesma pasta onde o arquivo foi zipado, ok? aws lambda create-function --function-name my-first-lambda --zip-file fileb://my-deployment-package.zip --handler lambda_function.lambda_handler --runtime python3.8 --role arn:aws:iam::id-da-sua-conta:role/lambda-default-role Entendendo o comando acima aws lambda create-function --function-name my-first-lambda: Comando para criar a função Lambda chamada my-first-lambda --zip-file fileb://my-deployment-package.zip: Anexa o arquivo my-deployment-package.zip na criação da função, onde contém o código fonte. --handler lambda_function.lambda_handler: Esse parâmetro seta a nome da função lambda_handler, ou seja, quando o Lambda for invocado, será está função que será executada. Esta mesma função deve existir no arquivo Python, conforme criamos anteriormente no primeiro passo. --runtime python3.8: Neste parâmetro, passamos como argumento a linguagem em que estamos usando para o Lambda, no nosso caso, Python com a versão 3.8 --role arn:aws:iam::id-da-sua-conta:role/lambda-default-role: Por último, passamos o valor do parâmetro role. O valor deste parâmetro é mostrado no output após a criação da role, no campo Arn, basta copiar e colar. Pronto, agora que você entendeu, basta executar via terminal para a criação definitiva do Lambda. Veja que o Lambda foi criado com sucesso, agora vamos fazer algumas invocações para que o Lambda seja executado. Invocando o Lambda Neste passo, vamos executar o Lambda através de uma invocação manual. Existem diversas outras formas em invocar uma aplicação Lambda. A invocação pode ser feita através de um Event-Bridge que funcionaria como um gatilho, no qual podemos configurar um cron, que poderá invocar o Lambda de tempos pré-determinados, como, minuto a minuto, hora em hora, 1 vez por semana e dentre outras opções. Existem várias outras formas, todas elas são descritas na documentação da AWS, mas para este post, faremos de forma manual, utilizando o CLI. aws lambda invoke --function-name my-first-lambda --payload '{"yourname": "Mônica"}' output.txt Entendendo o comando acima aws lambda invoke --function-name my-first-lambda: Invoca o Lambda my-first-lambda ---payload '{"yourname": "Mônica"}': No payload é onde passamos o valor do input que o Lambda vai processar. Neste caso, estamos passando um JSON com uma informação. O segundo parâmetro, o output.txt, será o arquivo que armazenará informações como, o response do Lambda caso exista ou algum log de erro, caso aconteça. Você pode acessar os logs do Lambda através do Cloudwatch para visualizar melhor os logs que definimos no código Python. Atualizando o código Caso precise alterar o código, refaça o passo em que zipamos o arquivo e em seguida, execute o comando abaixo. aws lambda update-function-code --function-name my-first-lambda --zip-file fileb://my-deployment-package.zip Pronto, agora temos um Lambda! Caso queira ler a documentação da AWS sobre como criar um Lambda, siga esta documentação. É isso, curtiu? Até mais!

Enviando mensagens para fila SQS via Python + boto3

Tenho experimentado Python e tenho me surpreendido com a facilidade no desenvolvimento e sem muitas configurações de projeto. Para este post, vamos criar um exemplo de como criar uma fila SQS e enviar mensagens utilizando Python e boto3, que possui uma grande variedade de métodos para acesso aos recursos da AWS. Instalando boto3 A instalação é bem simples, assegure-se que já exista o Python instalado em seu ambiente. Caso já exista, via terminal, execute o comando a seguir para a instalação do boto3. pip install boto3 Importando boto3 para o projeto Após instalado, vamos escrever o nosso código em partes, começando pelo import da biblioteca boto3 e atribuindo para uma variável sqs e client, conforme imagem abaixo: import boto3
sqs = boto3.resource('sqs')
client = boto3.client('sqs') Criando fila SQS Criando uma function que criará a fila SQS passando alguns parâmetros como, name e attributes. def create_sqs(name, attributes):
sqs.create_queue(QueueName=name, Attributes=attributes) Envio de mensagens para a fila SQS Criando uma function que enviará as mensagens para a fila SQS def send_messages_to_sqs(name, message):
queue = sqs.get_queue_by_name(QueueName=name)
client.send_message(QueueUrl=queue.url, MessageBody=message) No código acima, existe um método chamado get_queue_by_name que no qual precisamos usar para recuperar a url da fila existente, para que seja repassada para o método send_message Executando o código Após criada as functions, vamos executar o código para a criação da fila SQS e o envio das mensagens. if __name__ == '__main__':

attributes = {
'DelaySeconds': '60',
'MessageRetentionPeriod': '600'
}

create_sqs('sqs_messages', attributes)

send_messages_to_sqs('sqs_messages', "Hello, SQS!") Código completo import boto3

sqs = boto3.resource('sqs')
client = boto3.client('sqs')


def create_sqs(name, attributes):
sqs.create_queue(QueueName=name, Attributes=attributes)

def send_messages_to_sqs(name, message):
queue = sqs.get_queue_by_name(QueueName=name)
client.send_message(QueueUrl=queue.url, MessageBody=message)

if __name__ == '__main__':

attributes = {
'DelaySeconds': '60',
'MessageRetentionPeriod': '600'
}

create_sqs('sqs_messages', attributes)

send_messages_to_sqs('sqs_messages', "Hello, SQS!") Simples, certo? É um código prático que permite acessar recursos da AWS com poucas linhas de código. Sou novo em Python e espero trazer mais posts como este, assim vamos aprendendo juntos =) É isso, curtiu? Até mais!

Entendendo o AWS SNS - Simple Notification Service

O SNS (Simple Notification Service), provê um serviço de notificação utilizando o paradigma Pub/Sub. É uma forma de publicar mensagens destinadas a um ou mais inscritos na forma de endpoints. Confuso? Vamos aprofundar um pouco mais sobre o assunto. O termo Pub/Sub é um tema bastante relacionado em arquiteturas guiada a eventos, conhecida tecnicamente como event-driven architecture. Nesta arquitetura a publicação de mensagens podem ser feitas através de notificações para um ou mais destinos já conhecidos, criando uma arquitetura mais assíncrona. Para que um destino se torna conhecido, deve haver uma forma de sinalizar que aquele destino seja um candidato a receber qualquer mensagem da origem, ou seja, o destino é um subscriber (sub) ou inscrito. Mas inscrito aonde? Todo subscriber pode ser inscrito em um ou mais publicadores, no contexto do SNS, seria Tópicos, no qual falaremos mais adiante. Dessa forma, para cada publicação feita, um inscrito naquela publicação, receberá uma mensagem. Um exemplo, é quando recebemos notificações de algum aplicativo instalado no nosso Smartphone via push, ou seja, na instalação daquele aplicativo nos tornamos um inscrito (sub ou assinante), ou seja, para que qualquer publicação feita pelo aplicativo, seremos notificados. Provavelmente este serviço pode utilizar SNS como solução. O exemplo anterior é uma visão de mais alto nível como forma de introdução. O tema é um pouco mais amplo e será abordado a seguir. O SNS é dividido em Tópicos e Assinaturas, ambos trabalham de forma conjunta e oferecem diversos recursos através do próprio console da AWS ou de APIs. 1. Tópicos Os Tópicos são pontos de acesso que funciona como interface entre o Publisher (publicador) e o Subscriber (inscrito). Todo aplicativo deve estar inscrito a um Tópico para que receba notificações, ou seja, é o único ponto de acesso para a comunicação. Um Tópico é dividido entre o tipo Fifo e o Padrão: Fifo: O tipo Fifo permite um controle mais rigoroso de ordenação das mensagens (first in/first out), possui um limite de throughput de até 300 publicações por segundo, garante a entrega da mensagem uma única vez e por fim, fornece suporte somente ao protocolo de assinatura SQS. Padrão: O tipo padrão possui algumas diferenças que o torna mais flexível, porém menos rigoroso se comparado ao Fifo. Começando pela ordenação de mensagens. Este padrão visa uma ordenação de mensagens da maneira mais apropriada, ou seja, não possui uma regra que visa ordenar as mensagens por chegada. O throughput de publicações/segundo é maior que a do tipo Fifo e fornece suporte de protocolos de assinaturas para SQS, Lambda, HTTP, SMS, E-mail e endpoints de aplicativos móveis. Limite de tópicos Por conta da AWS, é permitido criar até 100.000 tópicos 2. Assinaturas A Assinatura é a forma de conectar ou inscrever um endpoint para um Tópico específico. Ou seja, cada Assinatura deve-se especificar um Tópico (existente) e o endpoint em que deseja receber as notificações publicadas pelo Tópico que será assinado. O endpoint é representado por diferentes tipos: AWS SQS HTTP HTTPS AWS Kinesis Data Firehose E-mail SMS AWS Lambda Resumindo, cada endpoint acima, são formatos de entrega/transporte para recebimento de notificações. Limite de Assinaturas A AWS permite até 10 milhões de assinaturas por tópico. 3. Limite de tamanho da mensagem O SNS possui um limite de tamanho de mensagem de até 256 KB. Já as mensagens para SMS são de 140 bytes. 4. Tipos de mensagens O SNS possui suporte para diferentes tipos de mensagens, como por exemplo texto, XML, JSON e texto sem formato. 5. SNS X SQS O SNS e o SQS são coisas diferentes, mas que possuem relação. Como falamos anteriormente, o SQS pode ser utlizado como endpoint, ou seja, um protocolo SQS que assina um Tópico SNS passa a receber qualquer mensagem publicada no Tópico tornando um processo de integração assíncrona. Na imagem acima descreve o contexto do SNS junto aos Tópicos e algumas SQS (subscribers) simulando as assinaturas. Após assinadas, todas estas SQS receberão mensagens publicadas do(s) Tópico(s). A SQS 1 receberá notificações dos Tópicos 1 e 2, a SQS 2 receberá notificações dos Tópicos 2 e 3 e por fim, a SQS 3 receberá somente do Tópico 3. Em breve será liberado um post com alguns exemplos de códigos que te ajudará entender ainda mais sobre o SNS mais a fundo. Caso queira saber mais detalhes, recomendo ler a documentação oficial através deste link. É isso, curtiu? Até mais!

Entendendo o Presto

O Presto é uma ferramenta bem conhecido no mundo Big Data, especificamente na área de Engenharia de dados. O Presto funciona como uma SQL Engine para análise de grandes volumes de dados distribuídos e heterogêneos. É um conceito um pouco subjetivo que vamos entender mais a frente. Presto não substitui MySQL, Oracle, SQL Server e etc, muitas pessoas se confundem com esta ideia. O Presto é um contexto composto por uma engine robusta que possibilita processar um input no padrão ANSI-SQL em que possibilitará um resultado final ao consumidor. Hoje no mercado temos algumas ferramentas/recursos baseados no Presto. Um deles é o conhecido AWS Athena. O Athena é um recurso que permite executar consultas interativas visando análise de dados. Entender o funcionamento do Athena, vai nos ajudar a entender também o funcionamento do Presto. No Athena, um Datasource é dividido por um catalogo de dados externo ou um connector, ou seja, onde os dados estão alocados (Por ser um contexto da AWS, os times escolhem algum bucket do S3). E por fim, um com catalogo de metadados, mais uma vez, naturalmente utiliza-se o Glue, mas também é possível o Apache Hive. Contexto Athena x Presto O que descrevemos anteriormente sobre a um Datasource no Athena é basicamente o contexto do Presto. O Presto é composto por diversos componentes que possibilitam processar uma entrada criando um processo de Query gerenciado por um componente que coordena todo o fluxo, chamado de Coordinator. Este Coordinator tem um papel em gerenciar os nós workers que trabalham em conjunto para entregar o resultado final ao cliente. Da mesma forma que o Athena funciona, o Presto também possui suporte para diferentes catálogos de dados, incluindo o S3 e entre outros. Possui também uma lista extensa de catalogo de metadados e conectores. Uma outra forma de entender o funcionamento e a sua finalidade, seria o seguinte cenário: Imagine um cenário em que é necessário executar uma análise em cima de dados representados por uma grande quantidade de arquivos semi-estruturados. Para uma melhor análise, será utilizado o padrão ANSI-SQL para uma análise mais precisa. Para este contexto, poderíamos adotar o Presto para conectar aos dados, organizar os dados em um catalogo e tabelas e por fim, criar um ecossistema que possibilita analisar os dados. Dentro deste contexto, o usuário poderia executar análises utilizando SQL baseadas nos arquivos em um repositório único, neste caso, um catálogo de dados mais transparente e possibilitando trabalhar com dados heterogêneos. Entendendo melhor o funcionamento do Presto Aqui vamos descrever os principais componentes do Presto, recomendo a leitura da sua documentação original. Overview do contexto Presto 1. Coordinator O Coordinator é o principal servidor e é o responsável por fazer os parses dos statements, queries, retornar o resultado final para o cliente e por gerenciar os workers. O Coordinator é o core de cada instalação, ou seja, uma peça chave para o funcionamento. A comunicação entre o Coordinator e os nós workers são através de API REST. 2. Workers Os Workers são os nós responsáveis por executar as tasks e processar os dados estimulados pelo Coordinator. As comunicações entre os Workers e o Coordinator são feitas através de API REST. 3. Connector O Connector é uma forma de adaptar Presto a um Datasource. É possível utilizar um Connector para ser utilizado junto ao Hive, MySQL, Redshift e entre vários outros. Resumidamente, o Connector funciona como um driver para um banco de dados. 4. Query Execution Model Presto executa SQL statements que são convertidos em Queries que por fim, são executados através de clusters distribuídos e gerenciados pelo coordinator e workers. 5. Statement Uma Statement é basicamente um texto SQL. Presto trabalha com statements seguindo os padrões ANSI-SQL, ou seja, possui expressões, cláusulas e etc. É interessante não confundir uma Statement com Query, existe uma diferença na qual falaremos no próximo tópico. 6. Query A Query é um processo criado a partir de uma Statement, ou seja, uma Statement é passada ao Presto e em seguida convertida para uma Query. A Query é um conjunto de fatores executados através de estágios interconectados e gerenciados pelos Workers. Perceba que existe uma diferença entre Statement (Texto SQL) e Query. A Query é um processo criado através de uma Statement envolvendo alguns componentes. Já a Statement é a representação textual de um script SQL que segue o padrão ANSI-SQL que será passado e processado pelo Presto para um resultado final. Para maiores informações sobre o Presto, recomendo bastante a leitura da doc Este foi um post foi um pouco mais simples onde a ideia foi fazer um overview sobre o contexto do Presto, espero ter ajudado. Até mais.

Listando tabelas via Glue SDK e Java

Utilizar a SDK da AWS é sempre uma boa opção caso precise explorar algum recurso mais a fundo em busca de uma solução. Neste post, iremos explorar um pouco do AWS Glue utilizando SDK. O Glue é uma ferramenta de ETL da AWS, que proporciona um repositório central de metadados, este chamado de Glue Catalog. Resumindo, o Glue Catalog mantém toda a estrutura de bancos e tabelas e seus schemas em um único lugar. A ideia deste post será listar todas as tabelas de um determinado banco de dados existente no Glue Catalog de forma programática utilizando a SDK. Dependências Maven <dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
<version>1.12.13</version>
</dependency> Neste exemplo, estamos utilizando a versão do Java 8 para explorar melhor o uso de Streams na interação. import com.amazonaws.regions.Regions;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.model.GetTablesRequest;
import com.amazonaws.services.glue.model.GetTablesResult;

public class GlueService {

public static void main(String[] args) {

AWSGlue awsGlue = AWSGlueClientBuilder.standard()
.withRegion(Regions.US_EAST_1)
.build();

GetTablesResult getTablesResult;
GetTablesRequest getTableRequest = new GetTablesRequest();
getTableRequest.setDatabaseName("<database>");


do {

getTablesResult = awsGlue.getTables(getTableRequest);

getTablesResult.getTableList()
.stream()
.forEach(
table ->
System.out.println(table.getName())
);

String token = getTablesResult.getNextToken();
getTableRequest.setNextToken(token);

} while (getTablesResult.getNextToken() != null);
}
} Passo a passo O objeto awsGlue é o responsável pelo acesso ao recurso através das credencias que devem ser configuradas. Neste post não vamos entrar neste detalhe. O objeto getTablesRequest é o responsável por setar os parâmetros de requisição, neste caso, estamos setando o database. O objeto getTablesResult é o responsável por listar as tabelas com base nos parâmetros setados pelo objeto getTablesRequest e também controlar o fluxo do resultado. Perceba que além de retornar as tabelas através do método getTablesResult.getTableList(), este mesmo objeto retorna um token que será explicado melhor no próximo item. O token é representado pelo método getTablesResult.getNextToken(), a ideia do token é controlar o fluxo de resultados, pois todos os resultados são paginados e caso exista token para cada resultado, significa que ainda existem dados a serem retornados. No código, utilizamos uma estrutura de repetição com base na validação da existência do token, ou seja, se ainda existe token, este será setado no objeto getTableRequest através do código getTableRequest.setNextToken(token), para retornar mais resultados. É isso, curtiu? Até mais!

Introdução ao Apache Hive com Spark e Java

O Hive é um software de Data Warehouse que possibilita a leitura, escrita e o gerenciamento de dados distribuídos e permite a utilização de SQL em consultas estruturadas. Vamos utilizar o contexto do Spark para a configuração inicial, mas é possível fazer de outras formas sem a utilização do Spark. Maven <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.5</version>
</dependency> O primeiro passo é criar a configuração do contexto inicial: String dwDir = new File("warehouse-dir").getAbsolutePath();

SparkConf sparkConf = new SparkConf()
.set("spark.sql.warehouse.dir", wareHouseDir);

SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.master("local[1]")
.getOrCreate(); Entendendo as configurações acima: 1. Neste trecho a variável dwDir recebe o caminho da pasta warehouse-dir que será utilizada nas configurações do Spark. Até então essa pasta não foi criada. String dwDir = new File("warehouse-dir").getAbsolutePath(); 2. No próximo trecho é setado a parâmetro spark.sql.warehouse.dir no contexto do Spark com o caminho do diretório referenciado no primeiro trecho. Dessa forma o Spark usará este diretório como o repositório dos bancos que serão criados. SparkConf sparkConf = new SparkConf()
.set("spark.sql.warehouse.dir", wareHouseDir); 3. E por fim, a criação do SparkSession onde estão as configurações criadas anteriormente, o local onde o master será executado (localmente - local[1]) e a ativação do Hive. SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.master("local[1]")
.getOrCreate(); 4. Agora podemos executar algumas operações de DML e DDL 4.1. Criando um banco de dados sparkSession.sql("CREATE DATABASE IF NOT EXISTS hive_tutorial"); 4.2. Criando os Parquets Neste passo iremos criar uma tabela já apontando para o DataSource. Neste caso o DataSource será um Parquet. O Parquet é um arquivo de formato colunar que provê um melhor desempenho nas consultas. Para isso vamos criar um Parquet a partir de um JSON com seguinte o conteúdo: Arquivo data/pessoa.json {"id":1,"nome":"Joao","idade":12}
{"id":2,"nome":"Kelly","idade":21}
{"id":3,"nome":"Monica","idade":29}
{"id":4,"nome":"Laura","idade":32}
{"id":5,"nome":"Kiko","idade":23}
{"id":6,"nome":"Miguel","idade":55}
{"id":7,"nome":"Junior","idade":25}
{"id":8,"nome":"Luis","idade":36} Executando a leitura do arquivo para um DataFrame. Dataset<Row> df = sparkSession.read().json("data/pessoa.json"); Criando os arquivos Parquet com base no DataFrame dentro do diretório data/parquet do seu projeto df.write().parquet("data/parquet/") Veja que os arquivos foram criados Pronto, agora temos um Data Source criado. 4.3. Criando a Tabela Segue os passos: sparkSession.sql("USE hive_tutorial"); Após selecionar o banco HIVE_TUTORIAL. O comando CREATE TABLE possui alguns argumentos extras, segue: STORED AS PARQUET : É um argumento que o Hive utilizará para saber que tipo de arquivo será usado na conversão, neste caso o Parquet. LOCATION: Diretório do Data Source criado anteriormente. sparkSession.sql("CREATE TABLE IF NOT EXISTS pessoa " +
"(id BIGINT, nome STRING, idade BIGINT) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/'"); É possível verificar a tabela criada executando o trecho abaixo: sparkSession.sql("SHOW TABLES").show(); Independente do fim da execução do programa, a tabela será mantida. Diferente de uma view criada com SparkSQL que é somente mantida em memória. 5. Exemplos de consultas Selecione o banco de dados sparkSession.sql("USE hive_tutorial"); Exemplo 1 sparkSession.sql("SELECT id, nome, idade " +
"FROM hive_tutorial.pessoa " +
"WHERE idade between 10 and 30 " +
"ORDER BY nome desc ").show(); Resultado Exemplo 2 sparkSession.sql("SELECT count(nome) " +
"FROM hive_tutorial.pessoa " +
"WHERE idade > 45 ").show(); Resultado 6. Exemplos de consultas mais complexas Agora vamos criar duas novas tabelas para explorar melhor os recursos do Hive. Crie o arquivo JSON data/produto.json {"id":1,"desc":"video game","preco":1800.0,"tipo":"eletronico"}
{"id":2,"desc":"geladeira","preco":1600.0,"tipo":"eletronico"}
{"id":3,"desc":"cama","preco":2000.0,"tipo":"quarto"}
{"id":4,"desc":"armário","preco":750.0,"tipo":"sala"}
{"id":5,"desc":"notebook","preco":4500.0,"tipo":"eletronico"}
{"id":6,"desc":"mesa","preco":2500.0,"tipo":"sala"}
{"id":7,"desc":"cadeira","preco":110.0,"tipo":"sala"}
{"id":8,"desc":"TV","preco":1500.0,"tipo":"eletronico"}
{"id":9,"desc":"fogão","preco":900.0,"tipo":"cozinha"} Crie os parquets para Produto Dataset<Row> dfP = sparkSession.read().json("data/produto.json");
dfProd.write().parquet("data/parquet/produto/"); Crie o arquivo JSON data/item.json {"id":1,"id_produto":2,"qtde":1}
{"id":2,"id_produto":1,"qtde":2}
{"id":3,"id_produto":3,"qtde":3}
{"id":4,"id_produto":4,"qtde":2}
{"id":5,"id_produto":5,"qtde":5} Crie os parquets para Item Dataset<Row> dfItem = sparkSession.read().json("data/item.json");
dfItem.write().parquet("data/parquet/item/"); Com base nos parquets criado, agora vamos criar a tabelas Produto e Item sparkSession.sql("USE hive_tutorial");

sparkSession.sql("CREATE TABLE IF NOT EXISTS produto " +
"(id BIGINT, desc STRING, " +
"preco BIGINT, " +
"tipo STRING) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/produto'");

sparkSession.sql("CREATE TABLE IF NOT EXISTS item " +
"(id BIGINT, " +
"id_produto BIGINT, " +
"qtde BIGINT) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/item/'"); Tabelas criadas sparkSession.sql("SHOW TABLES").show(); Consultas utilizando JOIN Exemplo 1 sparkSession.sql("SELECT prod.id, " +
"prod.desc, " +
"prod.preco, " +
"prod.tipo, " +
"item.qtde " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"order by prod.id ").show(); Resultado Exemplo 2 sparkSession.sql(" SELECT " +
"prod.tipo, " +
"sum(item.qtde) " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"group by prod.tipo").show(); Resultado Exemplo 3 sparkSession.sql(" SELECT " +
"prod.tipo, " +
"sum(item.qtde), " +
"sum(item.qtde * prod.preco) " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"group by prod.tipo").show(); Resultado Pra finalizar, dê uma olhada na documentação oficial para mais detalhes: https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html https://hive.apache.org/ É isso, espero ter ajudado!

Lendo arquivo CSV com Apache Spark

Apache Spark atua muito bem na leitura de diversos arquivos para extração de dados, nesse post vamos criar um exemplo de leitura de um arquivo CSV utilizando Spark, Java e Maven. Para quem não sabe o que é um CSV, é um arquivo texto que separa as colunas entre ponto e vírgula ( ; ). Maven <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies> Conteúdo do CSV (Crie um arquivo chamado movies.csv com este conteúdo) title;year;rating
The Shawshank Redemption;1994;9.3
The Godfather;1972;9.2
The Dark Knight;2008;9.0
The Lord of the Rings: The Return of the King ;2003;8.9
Pulp Fiction;1994;8.9
Fight Club;1999;8.8
Star Wars: Episode V - The Empire Strikes Back;1980;8.7
Goodfellas;1990;8.7
Star Wars;1977;8.6 Criando SparkSession SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
sparkConf.setAppName("app");

SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.getOrCreate(); Executando a leitura Dataset<Row> ds = sparkSession.read()
.format("CSV")
.option("sep",";")
.option("inferSchema", "true")
.option("header", "true")
.load("movies.csv");

ds.select("title","year","rating").show(); Resultado Entendendo alguns parâmetros .option("sep", ";"): Define a utilização de um separador padrão para a leitura do arquivo, neste caso o separador é o ponto e vírgula (;) .option("inferSchema", "true"): O parâmetro inferSchema possibilita inferir o(s) arquivo(s) afim de entender (adivinhar) os tipos dos dados de cada campo .option("header", "true"): Habilitar o parâmetro header possibilita utilizar o nome de cada campo definido no cabeçalho do arquivo .load("movies.csv"): movies.csv é o nome do arquivo a ser lido Curtiu? Espero que sim, até mais!

Acessando e modificando o Terraform State

Antes de começar a falar sobre o acesso aos estados, é necessário explicar o que são os estados ou State. O que são os States? O que é o Terraform State? O Terraform State é uma forma do Terraform gerenciar a infra, configurações e os recursos criados afim de manter um mapeamento do que já existe e controlar a atualização e criação de novos recursos. Um exemplo básico é quando criamos um Bucket S3, um EC2 ou uma SQS via Terraform. Todos estes recursos são mapeados no estado e passam a ser gerenciados pelo Terraform. Localização do State Local Por padrão o Terraform aloca o estado localmente no arquivo terraform.tfsate. Utilizar o State localmente pode funcionar bem para um estudo específico no qual não exista a necessidade em compartilhar o State entre times. Remote Ao contrário do Local, quando temos times compartilhando dos mesmos recursos, a utilização do State de forma remota se torna imprescindível. O Terraform provê suporte para que o State possa ser compartilhado de forma remota. Não entraremos em detalhes em como configurar, mas é possível manter o State no Amazon S3, Azure Blob Storage, Google Cloud Storage, Alibaba Cloud OSS e entre outras nuvens. O State é representado pelo arquivo terraform.tfsate, um arquivo no formato JSON, segue um exemplo de um Bucket S3 mapeando no State: {
"version": 4,
"terraform_version": "0.12.3",
"serial": 3,
"lineage": "853d8b-6ee1-c1e4-e27e-e10",
"outputs": {},
"resources": [
{
"mode": "managed",
"type": "aws_s3_bucket",
"name": "s3_bucket_xpto",
"provider": "provider.aws",
"instances": [
{
"schema_version": 0,
"attributes": {
"acceleration_status": "",
"acl": "private",
"arn": "arn:aws:s3:::bucket.xpto",
"bucket": "bucket.xpto",
"bucket_domain_name": "bucket.xpto",
"bucket_prefix": null,
"bucket_regional_domain_name": "bucket.xpto",
"cors_rule": [],
"force_destroy": false,
"grant": [],
"hosted_zone_id": "Z3NHGSIKTF",
"id": "bucket.xpto",
"lifecycle_rule": [],
"logging": [],
"object_lock_configuration": [],
"policy": null,
"region": "us-east-1",
"replication_configuration": [],
"request_payer": "BucketOwner",
"server_side_encryption_configuration": [],
"tags": {
"Environment": "staging"
},
"versioning": [
{
"enabled": false,
"mfa_delete": false
}
],
"website": [],
"website_domain": null,
"website_endpoint": null
},
"private": "UJbhV=="
}
]
}
]
} Acessando e alterando o State Apesar do State estar alocado em um arquivo JSON, não é recomendado a alteração direta no arquivo. O Terraform provê a utilização do comando Terraform state executado via CLI para que pequenas modificações sejam efetuadas. Através do CLI, podemos executar comandos afim de manipular o State, segue o uso: terraform state <subcommand> [options] [args] Sub-comandos: list Lista os recursos no estado mv Move um item no estado pull Extrai o estado atual e lista o resultado no stdout push Atualiza um estado remoto de um arquivo de estado local rm Remove instância do estado show Mostra recursos do estado 1. Listando os recursos do State Comando: terraform state list O comando acima possibilita listar os recursos que estão sendo gerenciados pelo State Exemplo: $ terraform state list
aws_s3_bucket.s3_bucket
aws_sqs_queue.sqs-xpto No exemplo acima, temos um Bucket S3 e um SQS que foram criados via terraform e que estão sendo gerenciados pelo State. 2. Visualizando um recurso e seus atributos Comando: terraform state show [options] ADDRESS O comando acima possibilita mostrar em detalhes um recurso específico e seus atributos Exemplo: $ terraform state show aws_sqs_queue.sqs-xpto
# aws_sqs_queue.sqs-xpto:
resource "aws_sqs_queue" "sqs-xpto" {
arn = "arn:aws:sqs:sqs-xpto"
content_based_deduplication = false
delay_seconds = 90
fifo_queue = false
id = "https://sqs-xpto"
kms_data_key_reuse_period_seconds = 300
max_message_size = 262144
message_retention_seconds = 345600
name = "sqs-xpto"
receive_wait_time_seconds = 10
tags = {
"Environment" = "staging"
}
visibility_timeout_seconds = 30
} 3. Removendo recursos do State Comando: terraform state rm [options] ADDRESS... O comando acima remove um ou mais items do State. Diferente de um terraform destroy, que remove o recurso do State e os objetos remotos criados na nuvem. Exemplo: $ terraform state rm aws_sqs_queue.sqs-xpto E aí, Curtiu? Até mais!

Java: Streams API - findFirst()

Java 8 Streams introduziu diferentes métodos para manipular coleções. Um destes métodos é o findFirst(), que permite retornar o primeiro elemento de uma Stream através de uma instância Optional. Na prática List<String> names =
Arrays.asList("Monica Souza",
"Andre Silva",
"Elisa Santos",
"Adriano Silva");

Optional<String> opt =
names.stream()
.findFirst();

System.out.println("Item:" + opt.get()); Resultado Item: Monica Souza Utilizando filter Optional<String> opt =
names.stream()
.filter(f -> f.contains("Silva"))
.findFirst();

System.out.println("Item:" + opt.get()); Resultado Item: Andre Silva Perceba que retornou o primeiro nome com sobrenome Silva da coleção. A não utilização de Streams Se utilizarmos o modo tradicional, ou seja, sem a utilização de Streams. O código ficaria assim, filtrando pelo sobrenome "Silva" for(String item : list){
if(item.contains("Silva")){
break;
}
} Neste caso dependemos do break para encerrar a execução.

Primeiros passos com Delta Lake

O que é o Delta Lake? O Delta Lake é um projeto open-source que visa gerenciar a camada de armazenamento (Storage Layer) que segundo o seu conceito: "traz confiabilidade para Datalakes". Na prática é uma abstração do Apache Spark reutilizando os mesmos mecanismos mas oferencendo recursos extras interessantes. Dentre eles, suporte para transações ACID. Todos sabem que manter a integridade dos dados em uma pipeline de dados é uma tarefa crítica diante do alta concorrência de leitura e escrita de dados. Neste caso, o ACID possibilita gerenciar ambientes como estes. Dentre outras vantagens, o Delta Lake provê histórico de auditoria, versionamento de dados, suporta operações DML do tipo delete e update e vários outras vantagens. Para este tutorial, vamos simular uma pipeline de dados simples de forma local, com o foco no que o Delta Lake pode nos oferecer de vantagens. Na etapa de ingestão, vamos carregar um Spark DataFrame a partir de um JSON, criaremos uma view temporária para nos auxiliar, criaremos uma tabela Delta via SQL e por fim, utilizaremos o Delta Lake para executar algumas operações de DML utilizando a view que será criada. Vamos utilizar Java e Maven para gerenciar as dependências, além de Spark e Hive no qual este, vai nos auxiliar em manter a tabelas em um catálogo de dados. Maven <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.1</version>
</dependency>

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>0.8.0</version>
</dependency>
</dependencies> O código será desenvolvido em pequenos trechos para um melhor entendimento. Configurando Spark com Delta e Hive String val_ext="io.delta.sql.DeltaSparkSessionExtension";
String val_ctl="org.apache.spark.sql.delta.catalog.DeltaCatalog";

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("app");
sparkConf.setMaster("local[1]");
sparkConf.set("spark.sql.extensions",var_ext);
sparkConf.set("spark.sql.catalog.spark_catalog",val_ctl);

SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate(); Entendendo o trecho acima Definimos duas variáveis val_ext e val_ctl atribuindo os valores para as chaves (spark.sql.extensions e spark.sql.catalog.spark_catalog) . Estas, necessárias para a configuração do Delta junto com o Spark Demos o nome do contexto do Spark de app Como não estamos rodando o Spark em um cluster, o master está configurado para rodar local local[1] O Spark tem suporte para o Hive, nesse caso o habilitamos no trecho enableHiveSupport( ) Ingestão dos Dados Vamos trabalhar com Spark Dataframe como fonte dos dados. Carregaremos um Dataframe a partir de um JSON. Arquivo order.json {"id":1, "date_order": "2021-01-23", "customer": "Jerry", "product": "BigMac", "unit": 1, "price": 8.00}
{"id":2, "date_order": "2021-01-22", "customer": "Olivia", "product": "Cheese Burguer", "unit": 3, "price": 21.60}
{"id":3, "date_order": "2021-01-21", "customer": "Monica", "product": "Quarter", "unit": 2, "price": 12.40}
{"id":4, "date_order": "2021-01-23", "customer": "Monica", "product": "McDouble", "unit": 2, "price": 13.00}
{"id":5, "date_order": "2021-01-23", "customer": "Suzie", "product": "Double Cheese", "unit": 2, "price": 12.00}
{"id":6, "date_order": "2021-01-25", "customer": "Liv", "product": "Hamburger", "unit": 1, "price": 2.00}
{"id":7, "date_order": "2021-01-25", "customer": "Paul", "product": "McChicken", "unit": 1, "price": 2.40} Criando Dataframe Dataset<Row> df = sparkSession.read().json("datasource/");
df.createOrReplaceGlobalTempView("order_view"); Entendendo o trecho acima No trecho anterior, estamos criando um Dataframe a partir do arquivo JSON que está dentro do diretório datasource/ , crie este diretório para que a estrutura do seu código fique mais compreensiva e em seguida, crie o arquivo order.json com base no conteúdo mostrado anteriormente. Por último, criamos um view temporária que vai nos auxiliar mais a frente nos próximos passos. Criando a Tabela Delta (Delta Table) Vamos criar a Delta Table a partir de um script SQL. A princípio a criação é simples, mas perceba que usamos tipagens diferentes de uma tabela utilizada em um banco relacional. Como por exemplo, utilizamos STRING ao invés de VARCHAR e assim por diante. Estamos particionando a tabela pelo campo date_order. Este campo foi escolhido como partição pois acreditamos que haverá diferentes datas. Dessa forma, as consultas podem utilizar este campo como filtro, visando um melhor desempenho. E por fim, definimos a tabela como Delta Table a partir do trecho USING DELTA. String statement =
"CREATE OR REPLACE TABLE orders (" +
"id STRING, " +
"date_order STRING," +
"customer STRING," +
"product STRING," +
"unit INTEGER," +
"price DOUBLE) " +
"USING DELTA " +
"PARTITIONED BY (date_order) ";

sparkSession.sql(statement); Entendendo o trecho acima No trecho anterior estamos criando uma tabela Delta chamada orders e em seguida executamos a criação. Operações DML Delta suporta operações como Delete, Update e Upsert utilizando Merge Utilizando Merge junto com Insert e Update Neste passo, vamos executar um Merge que possibilita controlar o fluxo de inserção e atualização de dados através de uma tabela, Dataframe ou view. O Merge trabalha a partir de row matches que ficará mais compreensível no trecho seguinte. String mergeStatement = "Merge into orders " +
"using global_temp.order_view as orders_view " +
"ON orders.id = orders_view.id " +
"WHEN MATCHED THEN " +
"UPDATE SET orders.product = orders_view.product," +
"orders.price = orders_view.price " +
"WHEN NOT MATCHED THEN INSERT * ";

sparkSession.sql(mergeStatement); Entendendo o trecho acima No trecho acima estamos executando o Merge a partir da view order_view criada nos passos anteriores. No mesmo trecho temos uma condição orders.id = orders_view.id que vai auxiliar nos matches seguintes. Caso a condição anterior seja verdadeira, ou seja o MATCHED seja verdadeiro. Os dados serão atualizados. Caso a condição não seja verdadeira, NOT MATCHED. Os dados serão inseridos. No caso acima, os dados serão inseridos, pois até então não existia dados na tabela orders. Execute o comando abaixo para visualizar os dados inseridos. sparkSession.sql("select * from orders").show(); Atualize o arquivo datasource/order.json alterando o campo product, price e execute todos os trechos novamente. Você verá que todos os registros serão atualizados. Comando Update É possível executar Update sem a necessidade de usar o Merge, basta executar o comando abaixo: String updateStatement = "update orders " +
"set product = 'Milk-Shake' " +
"where id = 2";

sparkSession.sql(updateStatement); Comando Delete String deleteStatement = "delete from pedidos where id = 2";
sparkSession.sql(deleteStatement); Além de poder executar o comando Delete, é possível utilizar este comando junto ao Merge. Entendendo o Delta Lake Transaction Log (DeltaLog) Além de dar suporte a transações ACID, o delta gera alguns arquivos JSON que servem como uma forma de auditar e manter o histórico de cada transação, desde comandos DDL e DML Com este mecanismo é possível até voltar em um estado específico da tabela caso necessário. Para cada transação executada um arquivo JSON é criado dentro da pasta _delta_log. O arquivo inicial sempre será 000000000.json, contendo os commits da transação. Neste nosso cenário, este primeiro arquivo contém os commits da criação da tabela orders. Para visualizar melhor, acesse a pasta local que provavelmente foi criada na raiz do seu projeto chamada spark-warehouse. Esta pasta foi criada pelo Hive para alocar os recursos criados desde os arquivos JSON e os parquets. Dentro dela terá uma estrutura de pastas conforme imagem abaixo: Perceba que os arquivos são criados em ordem crescente a partir de cada transação executada. Acesse cada arquivo JSON e verá cada transação que foi executado através do campo operation, além de outras informações. 00000000000000000000.json "operation":"CREATE OR REPLACE TABLE" 00000000000000000001.json "operation":"MERGE" 00000000000000000002.json "operation":"UPDATE" 00000000000000000003.json "operation":"DELETE" Perceba também que os arquivos parquets foram gerados particionados em pastas pelo campo date_order. Utilizar partição visa consultas com melhores desempenho, mas não entraremos em detalhes neste post, futuramente falaremos mais sobre isso. Espero que neste post foi possível esclarecer algumas dúvidas sobre o que é o Delta Lake e seu funcionamento. Até a próxima!