My Items

I'm a title. ​Click here to edit me.

Differences between FAILFAST, PERMISSIVE and DROPMALFORED modes in Dataframes

Differences between FAILFAST, PERMISSIVE and DROPMALFORED modes in Dataframes

There's a bit differences between them and we're going to find out in this post. The parameter mode is a way to handle with corrupted records and depending of the mode, allows validating Dataframes and keeping data consistent. In this post we'll create a Dataframe with PySpark and comparing the differences between these three types of mode: PERMISSIVE DROPMALFORMED FAILFAST CSV file content This content below simulates some corrupted records. There are String types for the engines column that we'll define as an Integer type in the schema. "type","country","city","engines","first_flight","number_built"
"Airbus A220","Canada","Calgary",2,2013-03-02,179
"Airbus A220","Canada","Calgary","two",2013-03-02,179
"Airbus A220","Canada","Calgary",2,2013-03-02,179
"Airbus A320","France","Lyon","two",1986-06-10,10066
"Airbus A330","France","Lyon","two",1992-01-02,1521
"Boeing 737","USA","New York","two",1967-08-03,10636
"Boeing 737","USA","New York","two",1967-08-03,10636
"Boeing 737","USA","New York",2,1967-08-03,10636
"Airbus A220","Canada","Calgary",2,2013-03-02,179 Let's start creating a simple Dataframe that will load data from a CSV file with the content above, let's supposed that the content above it's from a file called airplanes.csv. To modeling the content, we're also creating a schema that will allows us to Data validate. Creating a Dataframe using PERMISSIVE mode The PERMISSIVE mode sets to null field values when corrupted records are detected. By default, if you don't specify the parameter mode, Spark sets the PERMISSIVE value. from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":

spark = SparkSession.builder \
.master("local[1]") \
.appName("spark-app") \
.getOrCreate()

schema = StructType([
StructField("TYPE", StringType()),
StructField("COUNTRY", StringType()),
StructField("CITY", StringType()),
StructField("ENGINES", IntegerType()),
StructField("FIRST_FLIGHT", StringType()),
StructField("NUMBER_BUILT", IntegerType())
])

read_df = spark.read \
.option("header", "true") \
.option("mode", "PERMISSIVE") \
.format("csv") \
.schema(schema) \
.load("airplanes.csv")

read_df.show(10) Result of PERMISSIVE mode Creating a Dataframe using DROPMALFORMED mode The DROPMALFORMED mode ignores corrupted records. The meaning that, if you choose this type of mode, the corrupted records won't be list. from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":

spark = SparkSession.builder \
.master("local[1]") \
.appName("spark-app") \
.getOrCreate()

schema = StructType([
StructField("TYPE", StringType()),
StructField("COUNTRY", StringType()),
StructField("CITY", StringType()),
StructField("ENGINES", IntegerType()),
StructField("FIRST_FLIGHT", StringType()),
StructField("NUMBER_BUILT", IntegerType())
])

read_df = spark.read \
.option("header", "true") \
.option("mode", "DROPMALFORMED") \
.format("csv") \
.schema(schema) \
.load("airplanes.csv")

read_df.show(10) Result of DROPMALFORMED mode After execution it's possible to realize that the corrupted records aren't available at Dataframe. Creating a Dataframe using FAILFAST mode Different of DROPMALFORMED and PERMISSIVE mode, FAILFAST throws an exception when detects corrupted records. from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":

spark = SparkSession.builder \
.master("local[1]") \
.appName("spark-app") \
.getOrCreate()

schema = StructType([
StructField("TYPE", StringType()),
StructField("COUNTRY", StringType()),
StructField("CITY", StringType()),
StructField("ENGINES", IntegerType()),
StructField("FIRST_FLIGHT", StringType()),
StructField("NUMBER_BUILT", IntegerType())
])

read_df = spark.read \
.option("header", "true") \
.option("mode", "FAILFAST") \
.format("csv") \
.schema(schema) \
.load("airplanes.csv")

read_df.show(10) Result of FAILFAST mode ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. Cool? I hope you enjoyed it!

Integração Salesforce com AWS AppFlow e S3

Integração Salesforce com AWS AppFlow e S3

É muito comum que Devs Salesforce precisem integrar dados do CRM com um banco de dados ou data lake por exemplo de uma empresa. Quando o volume desses dados são altos, fazer essa integração via REST pode não ser a forma mais elegante, até porque o próprio Salesforce possui algumas travas que dificultam por exemplo enviar um payload com uma lista de objetos para integração. O passo a passo a seguir mostra como podemos enviar dados de uma API qualquer para o Salesforce ou do Salesforce para qualquer API utilizando o serviço AppFlow da AWS para a orquestração e S3 para integração. Envio de dados do Salesforce para S3. Vamos criar um objeto simples para simular a integração. No Salesforce vá em Setup e procure por Integrations >> Platform Events e crie um novo evento. Na seção Custom Fields, vamos criar apenas dois campos para exemplificar, ID e Name. Agora, vamos focar em algumas tarefas no painel AWS. Procure pelo serviço AppFlow e crie um novo Fluxo. Dê um nome para o Fluxo e clique em próximo. No segundo passo, você deverá selecionar a origem do evento. Nesse caso, dentre as diversas opções você encontrará “Salesforce”. Após selecionar, você precisará especificar a conexão. Faça login na conta de sua preferencia (Production ou Sandbox) e prossiga. Ainda nesse formulário de origem, selecione o evento criado no Salesforce no primeiro passo deste tutorial. No formulário de destino, selecione um bucket do S3 onde deseja que os arquivos de integração sejam salvos e prossiga. Na próxima tela, faça o mapeamento dos campos do Evento que você deseja salvar no arquivo S3. Clique em próximo no Passo 4 e em Criar Fluxo no passo 5. Será exibida uma mensagem que o Fluxo foi criado. Por fim, clique em Ativar Fluxo. Pronto!!! O Fluxo já está com status ativo. Agora vamos escrever um simples teste no Salesforce para enviar um evento e verificar o funcionamento do nosso fluxo criado acima. Abra o Developer Console do Salesforce, vá em Debug e selecione Open Execute Anonymous Window. Vamos publicar um simples evento como o exemplo abaixo: EventTest__e myEvent = new EventTest__e();
myEvent.ID__c = 1;
myEvent.Name__c = 'My First Event';

EventBus.publish(myEvent); E… PRONTO! Agora é só verificar o S3 e você irá encontrar uma pasta com o nome do Fluxo e os arquivos do evento na pasta. Nessa situação você pode conectar qualquer API do seu ambiente externo ao Salesforce e consumir esses dados. Envio de Dados S3 para Salesforce E se podemos usar o S3 para consumir os dados do Salesforce por uma API externa, também podemos usá-lo para enviar dados para o CRM. Nesse caso faremos da seguinte forma. Dentro do mesmo bucket S3 do exemplo anterior, criaremos uma pasta chamada ingest que usaremos para enviar os arquivos com os dados que queremos integrar com o Salesforce. Agora, voltamos ao AppFlow e vamos criar um novo fluxo. No próximo passo, selecionamos o bucket e a pasta criada e ainda o formato de dados JSON. Em seguida, informamos o destino desses dados, selecionamos Salesforce, a sandbox e o objeto onde será inserido os dados. Nesse caso, criamos um objeto customizado no Salesforce chamado S3EventIngest. Nesse primeiro momento, deve ser adicionado a pasta especificada acima “ingest”, um arquivo JSON contendo um mapeamento DE-PARA relacionando os atributos do Json que será consumido pelo fluxo e os campos referentes ao salesforce. No caso, adicionamos o um arquivo.json com o seguinte código: {"ID":"ID__c","Name":"Name__c"} No próximo passo, faça o mapeamento dos atributos JSON que serão enviados no arquivo para o S3 para os campos do objeto Salesforce. Com o mapeamento feito, seguiremos os próximos passos sem fazer nenhuma modificação até que o fluxo seja concluído e criado. É possível configurar triggers para que o fluxo seja executado de forma periódica, mas aqui vamos apenas executar manualmente. Depois de criado, podemos fazer um teste simples. Para isso, vamos EXCLUIR o arquivo da pasta ingest que adicionamos anteriormente e adicionar um arquivo com registros reais de fato. Nesse caso, vamos adicionar a pasta ingest um arquivo com o seguinte json: [
{"ID":779,"Name":"My Event Json 1"},
{"ID":780,"Name":"My Event Json 2"},
{"ID":781,"Name":"My Event Json 3"},
{"ID":782,"Name":"My Event Json 4"}
] Após carregar o arquivo no S3, volte ao fluxo e clique no botão “Executar Fluxo” Espere alguns segundos e você verá uma mensagem de sucesso e também pode consultar o histórico de execução com os últimos processamentos. Além disso, consulte também no Salesforce e você verá os dados inseridos. É isso pessoal, espero ter ajudado!!

Working with Schemas in Spark Dataframes using PySpark

Working with Schemas in Spark Dataframes using PySpark

What's a schema in the Dataframes context? Schemas are metadata that allows working with a standardized Data. Well, that was my definition about schemas but we also can understanding schemas as a structure that represents a data context or a business model. Spark enables using schemas with Dataframes and I believe that is a good point to keep data quality, reliability and we also can use these points to understand the data and connect to the business. But if you know a little more about Dataframes, working with schema isn't a rule. Spark provides features that we can infer to a schema without defined schemas and reach to the same result, but depending on the data source, the inference couldn't work as we expect. In this post we're going to create a simple Dataframe example that will read a CSV file without a schema and another one using a defined schema. Through examples we'll can see the advantages and disadvantages. Let's to the work! CSV File content "type","country","engines","first_flight","number_built"
"Airbus A220","Canada",2,2013-03-02,179
"Airbus A320","France",2,1986-06-10,10066
"Airbus A330","France",2,1992-01-02,1521
"Boeing 737","USA",2,1967-08-03,10636
"Boeing 747","USA",4,1969-12-12,1562
"Boeing 767","USA",2,1981-03-22,1219 If you noticed in the content above, we have different data types. We have string, numeric and date column types. The content above will be represented by airliners.csv in the code. Writing a Dataframe without Schema from pyspark.sql import SparkSession

if __name__ == "__main__":
spark = SparkSession.builder \
.master("local[1]") \
.appName("schema-app") \
.getOrCreate()

air_liners_df = spark.read \
.option("header", "true") \
.format("csv") \
.load("airliners.csv")

air_liners_df.show()
air_liners_df.printSchema() Dataframe/Print schema result It seems that worked fine but if you look with attention, you'll realize that in the schema structure there are some field types that don't match with their values, for example fields like number_built, engines and first_flight. They aren't string types, right? We can try to fix it adding the following parameter called "inferSchema" and setting up to "true". from pyspark.sql import SparkSession

if __name__ == "__main__":
spark = SparkSession.builder \
.master("local[1]") \
.appName("schema-app") \
.getOrCreate()

air_liners_df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.format("csv") \
.load("airliners.csv")

air_liners_df.show()
air_liners_df.printSchema() Dataframe/Print schema result Even inferring the schema, the field first_flight keeping as a string type. Let's try to use Dataframe with a defined schema to see if this details will be fixed. Writing a Dataframe with Schema Now it's possible to see the differences between the codes. We're adding an object that represents the schema. This schema describes the content in CSV file, you can note that we have to describe the column name and type. from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, DateType, StructField

if __name__ == "__main__":

spark = SparkSession.builder \
.master("local[1]") \
.appName("schema-app") \
.getOrCreate()

StructSchema = StructType([
StructField("type", StringType()),
StructField("country", StringType()),
StructField("engines", IntegerType()),
StructField("first_flight", DateType()),
StructField("number_built", IntegerType())
])

air_liners_df = spark.read \
.option("header", "true") \
.format("csv") \
.schema(StructSchema) \
.load("airliners.csv")

air_liners_df.show()
air_liners_df.printSchema() Dataframe/Print schema result After we defined the schema, all the field types match with their values. This shows how important is to use schemas with Dataframes. Now it's possible to manipulate the data according to the type with no concerns. Cool? I hope you enjoyed it!

Differences between External and Internal tables in Hive

Differences between External and Internal tables in Hive

There are two ways to create tables in the Hive context and this post we'll show the differences, advantages and disadvantages. Internal Table Internal tables are known as Managed tables and we'll understand the reason in the following. Now, let's create an internal table using SQL in the Hive context and see the advantages and disadvantages. create table coffee_and_tips_table (name string, age int, address string) stored as textfile; Advantages To be honest I wouldn't say that it's an advantage but Internal tables are managed by Hive Disadvantages Internal tables can't access remote storage services for example in clouds like Amazon AWS, Microsoft Azure and Google Cloud. Dropping Internal tables all the data including metadata and partitions will be lost. External Table External tables has some interesting features compared to Internal tables and it's a good and recommended approach when we need to create tables. In the script below you can see the difference between Internal table creation and External table related to the last section. We just added the reserved word external in the script. create external table coffee_and_tips_external (name string, age int, address string) stored as textfile; Advantages The data and metadata won't be lost if drop table External tables can be accessed and managed by external process External tables allows access to remote storage service as a source location Disadvantages Again, I wouldn't say that it's a disadvantage but if you need to change schema or dropping a table, probably you'll need to run a command to repair the table as shown below. msck repair table <table_name> Depending on the volume, this operation may take some time to complete. To check out a table type, run the following command below and you'll see at the column table_type the result. hive> describe formatted <table_name> That's it, I hope you guys enjoy it! References: https://hive.apache.org/

How to generate random Data using Datafaker lib

How to generate random Data using Datafaker lib

Sometimes in our projects we have to fill Java objects for unit tests or even to create a database dump with random data to test a specific feature and etc. We need to be creative trying to create names, street names, cities or documents. There's an interesting and helpful Java library called Datafaker that allows to create random data with a large number of providers. Providers are objects based on a context, for example: If you want to generate data about a Person object, there's a specific provider for this context that will generate name, last name and etc. If you need to create a unit test that you need data about address, you'll find it. In this post we'll create some examples using Maven but the library also provides support for Gradle projects. Maven <dependency>
<groupId>net.datafaker</groupId>
<artifactId>datafaker</artifactId>
<version>1.1.0</version>
</dependency> Generating Random Data Let's create a simple Java class that contains some properties like name, last name, address, favorite music genre and food. public class RandomPerson {

public String firstName;
public String lastName;
public String favoriteMusicGenre;
public String favoriteFood;
public String streetAddress;
public String city;
public String country;

@Override
public String toString() {

return "firstName=" + firstName + "\n" +
"lastName=" + lastName + "\n" +
"favoriteMusicGenre="+favoriteMusicGenre + "\n" +
"favoriteFood=" + favoriteFood + "\n" +
"streetAddress=" + streetAddress + "\n" +
"city=" + city + "\n" +
"country=" + country ;
}

static void print(RandomPerson randomPerson){
System.out.println(
randomPerson
);
}
} In the next step we'll fill an object using the providers that we quote in the first section. First of all, we create an object called randomData that represents Faker class. This class contains all the providers in the example below. public static void main(String[] args) {

Faker randomData
= new Faker();

RandomPerson randomPerson
= new RandomPerson();

randomPerson.firstName
= randomData.name().firstName();

randomPerson.lastName
= randomData.name().lastName();

randomPerson.favoriteMusicGenre
= randomData.music().genre();

randomPerson.favoriteFood
= randomData.food().dish();

randomPerson.streetAddress
= randomData.address().streetAddress();

randomPerson.city
= randomData.address().city();

randomPerson.country
= randomData.address().country();

print(randomPerson);
} After the execution, we can see the results like this at the console: Result firstName=Dorthy lastName=Jones favoriteMusicGenre=Electronic favoriteFood=Cauliflower Penne streetAddress=7411 Darin Gateway city=Gutkowskifort country=Greece Every execution will be a new result because of providers are randoms. Another interesting feature is that we can set up the Locale when instantiate an object. Faker randomData
= new Faker(Locale.JAPANESE); See the results based on Local.JAPANESE: Result firstName=航 lastName=横山 favoriteMusicGenre=Non Music favoriteFood=French Fries with Sausages streetAddress=418 美桜Square city=南斉藤区 country=Togo Isn't a cool library!? See you!

Creating a Java code using Builder pattern

Creating a Java code using Builder pattern

If you're using a language that supports oriented object in your project, probably there's some lines of codes with Builder pattern. If not, this post will help you to understand about it. What's Builder Pattern? Builder Pattern belongs to an area in Software Engineer called Design Patterns, the idea behind of a pattern is to solve commons problems in your project following best practices. Builder Pattern is very useful when we need to provide a better solution in the creational objects part in our project. Sometimes we need to instantiate an object with a lot of parameters and this could be a problem if you pass a wrong parameter value. Things like this happen every time and results in bugs and you will need to find out where's the issue and maybe, refactoring code to improve it. Let's write some lines of code to see how does Builder Pattern works and when to apply it. The code below is an example of a traditional Class with constructor used to load values when the object instantiated. public class PersonalInfo {

private final String firstName;
private final String lastName;
private final Date birthDate;
private final String address;
private final String city;
private final String zipCode;
private final String state;
private final int population;

public PersonalInfo(String firstName, String lastName,
Date birthDate, String address,
String city, String zipCode,
String state, int population){

this.firstName = firstName;
this.lastName = lastName;
this.birthDate = birthDate;
this.address = address;
this.city = city;
this.zipCode = zipCode;
this.state = state;
this.population = population;
}
} And now, we can instantiate the object simulating the client code. PersonalInfo personalInfo
= new BuilderPattern("Mônica", "Avelar",
new Date(), "23 Market Street",
"San Francisco", "94016",
"CA", 800000); If you notice, to instantiate the object we should pass all the values related to each property of our class and there's a big chance to pass a wrong value. Another disadvantage of this approach is the possibility to not scale it. In this example we have a few properties but tomorrow we can add more properties and the disadvantage becomes clearer. Working with Builder Pattern Let's rewrite the code above to the Builder Pattern and see the differences. public class PersonalInfo {

private final String firstName;
private final String lastName;
private final Date birthDate;
private final String address;
private final String city;
private final String zipCode;
private final String state;
private final int population;

public static class Builder {

private String firstName;
private String lastName;
private Date birthDate;
private String address;
private String city;
private String zipCode;
private String state;
private int population;

public Builder firstName(String value) {
firstName = value;
return this;
}

public Builder lastName(String value) {
lastName = value;
return this;
}

public Builder birthDate(Date value) {
birthDate = value;
return this;
}

public Builder address(String value) {
address = value;
return this;
}

public Builder city(String value) {
city = value;
return this;
}

public Builder zipCode(String value) {
zipCode = value;
return this;
}

public Builder state(String value) {
state = value;
return this;
}

public Builder population(int value) {
population = value;
return this;
}

public BuilderPattern build() {
return new BuilderPattern(this);
}
}

public PersonalInfo(Builder builder){

firstName = builder.firstName;
lastName = builder.lastName;
birthDate = builder.birthDate;
address = builder.address;
city = builder.city;
zipCode = builder.zipCode;
state = builder.state;
population = builder.population;
}
} If you compare both codes you will conclude that the first one is smaller and better to understand than the second one and I agree it. The advantage of the usage is going to be clear for the next example when we create an object based on Builder pattern. Simulating client code using Builder Pattern PersonalInfo personalInfo =
new Builder()
.firstName("Mônica")
.lastName("Avelar")
.birthDate(new Date())
.address("23 Market Street")
.city("San Francisco")
.zipCode("94016")
.state("CA")
.population(80000)
.build(); This last example of creation object using Builder Pattern turns an organized code following the best practices and easy to read. Another advantage of Builder is that we can identify each property before passing values. To be honest I've been using Builder Pattern in my projects and I strongly recommend you do it the same in your next projects. There's an easier way to implement Builder pattern in projects nowadays and I'll write a post about it, see you soon!

Versionamento de Banco de Dados utilizando Flyway

Versionamento de Banco de Dados utilizando Flyway

O versionamento de dados em banco de dados é uma ferramenta muito poderosa, facilitando a gestão das mudanças do banco, além de ajudar a contornar cenários de crise. No mundo Java existem duas ferramentas bastante utilizadas, o Liquibase e o Flyway, para este Post utilizaremos o Flyway. Sobre o Flyway O Flyway trabalha com um conceito de migrations, basicamente uma migration é uma script SQL que será executado dentro do banco de dados. Caso ocorra algum erro, o próprio flyway se encarregará de fazer o rollback daquele script SQL. A vantagem de utilizar tal sistema é que os desenvolvedores podem ter uma forma única de criação e alteração das estruturas do banco de dados. Sendo assim, caso alguém crie uma nova tabela ou um novo índice de tabela, outras pessoas podem pegar essa alteração e aplicar em seu banco local. Além disso, suas migrations se tornam uma espécie de documentação do seu banco de dados, refletindo tudo que foi acontecendo ao longo do tempo. Para fazer essas mudanças, o flyway utiliza uma tabela de versionamento (por padrão, a tabela flyway_schema_history) onde ele verifica qual foi a última modificação executada e qual foi o hash do script executado. Essa verificação do hash é muito importante porque é a forma que a ferramenta possui para controlar as modificações em um script já executado, avisando a quem esta executando para revisar aquela migration tendo em vista que pode ter ocorrido alguma corrupção de dados ou se o script foi alterado. Instalação Existem várias formas de utilizar o Flyway, podendo ser dentro da aplicação Java, Maven, Gradle ou linha de comando. Para este Post, iremos explorar a linha de comando. Faça a instalação de acordo com o seu sistema operacional acessando este link https://flywaydb.org/download/community Para verificar se foi instalado corretamente, execute o comando abaixo em seu terminal: * Você deve estar dentro da pasta descompactada caso não tenha adicionado a pasta a seu PATH flyway -v Logo após a instalação, basta clonar o projeto base no seguinte repositório no Github . Execução Para executar as migrations é necessário informar ao Flyway alguns parâmetros para que ele possa se conectar ao banco de dados: Via terminal, execute o comando abaixo passando os parâmetros necessários como a URL, usuário e senha. flyway migrate -url=... -user=... -password=... Esse usuário informado precisa de ter acesso e permissão para o comandos listados nas migrations, ok? Também é possível criar um arquivo de configuração flyway.conf dentro do projeto para que não seja necessário informar os dados de: url , user e password. Lembrando que o arquivo deve estar na raiz do projeto, conforme o projeto base indicado no artigo. É isso, curtiu? Até mais!

Criando Alarmes com AWS CloudWatch

Criando Alarmes com AWS CloudWatch

A utilização de alarmes é um requisito imprescindível quando se trabalha com diversos recursos na nuvem. É uma das formas mais eficientes em monitorar e entender o comportamento de uma aplicação caso as métricas estejam diferentes do esperado. Neste post, vamos criar um alarme do zero utilizando AWS CloudWatch. Existem diversas outras ferramentas que nos possibilita configurarmos alarmes, mas quando se trabalha com AWS, configurar alarmes utilizando CloudWatch é bem simples e rápido. Let's do this! Primeiramente, precisamos escolher algum recurso no qual vamos monitorar, para simplificar, vamos criar um SQS e utilizar uma de suas métricas para que a gente possa utilizar no nosso alarme. Criando uma fila SQS Vamos criar uma fila SQS simples e escolher alguma métrica para que a gente possa utilizar no nosso alarme. Acesso o console da AWS e na barra de pesquisa, digite "SQS" conforme imagem abaixo e em seguida, acesse o serviço. Após acessar o serviço, clique em Criar Fila Vamos criar uma fila padrão para o exemplo, o nome da fila será sqs-messages conforme mostrado abaixo. Não precise se atentar aos outros detalhes, apenas clique no botão Criar Fila para finalizar a criação. Fila criada, agora o próximo passo é criar o nosso alarme. Criando o Alarme Todo recurso/serviço é composto de uma ou mais métricas, são basicamente características. Imagine um carro, no carro temos características do tipo, KMs rodados, KM/hora, quantidade de passageiros em um certo momento e quantidade de combustível no tanque e entre outros. Em uma fila SQS temos métricas do tipo, número de mensagens enviadas, número de recebimentos vazios, tamanho das mensagens enviadas, número de mensagens recebidas e etc. Para o nosso exemplo, vamos escolher a métrica baseada em número de mensagens enviadas (numberOfMessagesSent). Na prática, poderíamos escolher essa métrica por vários motivos. Imagine em uma aplicação que, em casos de instabilidade, mensagens serão enviadas para uma determinada fila, assim, evitando perdas. É de grande importância sabermos que existem mensagens que foram enviadas para uma determinada fila neste contexto de instabilidade. Dessa forma, o alarme deverá ser acionado. Acesse a AWS via console e procure por Cloudwatch na barra de pesquisa, conforme imagem abaixo. Após acessar o serviço, clique na opção Em alarme no canto esquerdo da tela e em seguida clique no botão Criar alarme. Selecione a métrica conforme a tela abaixo Escolha a opção SQS Em seguida, clique em Métricas da fila Na barra de pesquisa, digite sqs-messages para buscar as métricas relacionadas a SQS criada nos passos anteriores. Após a pesquisa da fila e suas métricas, selecione o item da coluna Nome da métrica identificado como NumberOfMessagesSent, e em seguida, clique em Selecionar métrica. Na próxima tela iremos configurar mais detalhes sobre o alarme como: período, estatística, condição para o acionamento, valor limite e pontos de dados. Configurando as métricas Nome da métrica: é a métrica escolhida nos passos anteriores, ou seja, esta métrica mede o número de mensagens enviadas para a SQS (NumberOfMessagesSent). QueueName: Nome da SQS no qual o alarme será configurado. Estatística: Neste campo podemos escolher opções como Média, Soma, Mínimo e entre outros. Isso vai depender do contexto o qual você vai precisar para configurar o alarme e a métrica. Neste exemplo escolhemos Soma, pois queremos pegar a soma do número de mensagens enviadas em um determinado período. Período: Neste campo definimos o período em que o alarme será acionado caso atinja a condição limite, no qual, será definido nos próximos passos. Configurando as condições Tipo de limite: Para este exemplo vamos utilizar o Estático. Sempre que o NumberOfMessagesSent for...: Vamos selecionar a opção Maior que...: Neste campo vamos configurar a quantidade de NumberOfMessagesSent como condição para acionar o alarme. Vamos colocar 5. Configuração adicional Para a configuração adicional, temos o campo Pontos de dados para o alarme no qual gostaria de detalhar um pouco mais o seu funcionamento. Pontos de dados para o alarme Esta opção adicional, flexibiliza a configuração do alarme combinado as condições definidas anteriormente. Por padrão, esta configuração é: 1 de 1 Como funciona? O primeiro campo refere-se a quantidade de pontos e o segundo campo, refere-se ao período. Mantendo as configurações anteriores mais a adicional significa que, o alarme será acionado caso a métrica NumberOfMessagesSent for maior que a soma de 5 em um período de 5 minutos. Até então, a configuração adicional padrão não altera as configurações definidas anteriormente, nada muda. Agora, vamos alterar esta configuração para entender melhor. Vamos alterar de: 1 de 1 para 2 de 2. Isso nos diz que, quando a condição do alarme for atingida, ou seja, para a métrica NumberOfMessagesSent, a soma for maior do que 5, o alarme será acionado para 2 pontos de dados em 10 minutos. Perceba que o período foi multiplicado devido ao segundo campo com o valor 2. Resumindo de forma mais objetiva, mesmo que a condição seja atingida, o alarme somente será acionado se existir 2 pontos de dados acima do limite em um período de 10 minutos. Isso nos dá uma certa flexibilidade afim de evitar falsos alarmes. Vamos entender melhor ainda quando efetuamos alguns testes de acionamento do alarme. Vamos manter as configurações a seguir e clique em Próximo Configurando as ações Na próxima tela, vamos configurar as ações responsáveis por notificar um destino caso o alarme seja acionado. Nesta tela, vamos manter a configuração Em alarme e em seguida, vamos criar um novo tópico e por último, vamos adicionar um email no qual desejamos receber as notificações de erros. Na prática, existem melhores formas de notificação, mas não entraremos nestes detalhes. Selecione a opção Criar novo tópico e preencha com um nome desejado e em seguida, digite um email valido no campo Endpoints de e-mail que receberão a notificação ... Feito o preenchimento, clique em Criar tópico e em seguida, um email será enviado para confirmar a inscrição no tópico criado. Faça a confirmação no seu email e clique em Próximo na tela do alarme para prosseguir com a criação. Agora, precisamos adicionar o nome do alarme na tela abaixo e em seguida clicar em Próximo. A próxima tela será a de revisão, clique em Criar alarme para finalizar a criação. Pronto, agora temos um alarme criado e é hora de testar. Testando o alarme Para testar o alarme, vamos enviar 6 mensagens para a fila criada anteriormente conforme a tela abaixo. Repita esta ação por 6 vezes clicando em Enviar mensagem, fique a vontade para mudar o conteúdo da mensagem. Após o envio das mensagens, perceba que mesmo que o limite seja ultrapassado, o alarme não foi acionado. Isso se deve a configuração adicional, caso contrário, bastaria ter configurado os pontos de dados para 1 de 1 para que o alarme fosse acionado. Agora, vamos enviar várias outras mensagens que exceda o limite em períodos curtos dentro da janela de 10 minutos. Perceba que na imagem acima o alarme foi acionado, pois além de ter atingido a condição especificada nas configurações, também atingiu os 2 pontos de dados. Verifique no email adicionado nas configurações de notificação pois, provavelmente um email foi enviado com os detalhes do alarme. Após o período de 10 minutos, o alarme passa de Em alarme para OK. É isso, curtiu? Até mais!

Configurando S3 Bucket event notification com SQS via Terraform

Configurando S3 Bucket event notification com SQS via Terraform

O S3 (Simple Storage Service) possibilita notificar através de eventos, quando alguma ação ocorre dentro de um Bucket ou em alguma pasta específica. Em outras palavras, funciona como um listener, ou seja, qualquer ação ocorrida em uma origem, ocorrerá um evento de notificação para um destino. O que seriam estas ações? Qualquer ação que acontece dentro de um Bucket como, a criação de um novo arquivo manual ou através de um upload, criação de uma pasta, remoção de arquivos, restauração de arquivos e etc. Destinos Para cada configuração de notificação de evento, deve existir um destino. Para este destino, será enviado informações sobre cada ação, por exemplo: Um novo arquivo foi criado em uma pasta específica, assim, informações sobre o arquivo será enviada, como a data de criação, tamanho do arquivo, o tipo do evento, nome do arquivo e dentre outros. Lembrando que neste processo, não é enviado o conteúdo do arquivo, GG? Existem 3 tipos de destinos disponíveis: Lambda SNS SQS Entendendo o funcionamento na prática Neste post vamos criar uma configuração de notificação de evento em um Bucket S3, simular uma ação e entender o comportamento final. Todo este conteúdo é baseado na documentação oferecida pela HashiCorp, empresa mantenedora do Terraform. Poderíamos criar esta configuração via console mas por motivos de boas práticas, utilizaremos o Terraform como ferramenta IAC. Para quem não conhece muito sobre Terraform, segue este tutorial sobre Primeiros passos utilizando Terraform na AWS. No próximo passo, criaremos um fluxo simulando a imagem abaixo. Criaremos uma configuração no Bucket S3, que para cada arquivo criado na pasta files/, um evento de notificação será enviado para um fila SQS. Criando os arquivos Terraform Crie no seu projeto, uma pasta chamada terraform/ e a partir de agora, todos os arquivos .tf serão criados dentro dela. Agora, crie um arquivo chamado vars.tf onde armazenaremos as variáveis que serão utilizadas. variable "region" {
default = "us-east-1"
type = string
}

variable "bucket" {
type = string
} Crie um arquivo chamado provider.tf , onde adicionaremos a configuração do provider, no qual será a AWS. Isso significa que, o Terraform utilizará a AWS como nuvem para criar os recursos e fará o download dos plugins necessários na inicialização. Copie o código abaixo para o arquivo. provider "aws" {
region = "${var.region}"
} Crie um arquivo chamado s3.tf , onde adicionaremos as configurações para a criação de um novo Bucket S3 que será usado para a prática resource "aws_s3_bucket" "s3_bucket_notification" {
bucket = "${var.bucket}"
} Agora, crie um arquivo chamado sqs.tf , onde adicionaremos as configurações para a criação de uma fila SQS e algumas permissões conforme código abaixo: resource "aws_sqs_queue" "s3-notifications-sqs" {

name = "s3-notifications-sqs"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:*:*:s3-notifications-sqs",
"Condition": {
"ArnEquals": {
"aws:SourceArn":
"${aws_s3_bucket.s3_bucket_notification.arn}"
}
}
}
]
}
POLICY
} Entendendo o código acima No código acima, estamos criando uma SQS e adicionando algumas configurações de policy, segue mais detalhes: O nome da SQS será s3-notifications-sqs, valor detalhado no campo name No campo policy, definimos uma política para que o S3 possa enviar mensagens de notificações para a SQS. Veja que estamos referenciando o Bucket S3 através de ARN no trecho ${aws_s3_bucket.s3_bucket_notification.arn} Para o último arquivo, vamos criar a configuração que permitirá o envio de notificações de eventos do Bucket S3 para uma SQS. Crie o arquivo s3_notification.tf e adicione o código abaixo: resource "aws_s3_bucket_notification" "s3_notification" {
bucket = aws_s3_bucket.s3_bucket_notification.id

queue {
events = ["s3:ObjectCreated:*"]
queue_arn = aws_sqs_queue.s3-notifications-sqs.arn
filter_prefix = "files/"
}
} Entendendo o código acima No código acima, estamos criando um recurso chamado aws_s3_bucket_notification no qual será responsável por habilitar as notificações de um Bucket S3. No campo bucket, estamos referenciando através do ID, o bucket que será criado. Neste caso, estamos utilizando aws_s3_bucket + alias + id para a referência, no qual, o alias é o s3_bucket_notification, definido no arquivo s3.tf e por último, o id que será o identificador do Bucket que será criado. O campo queue é um bloco contendo algumas configurações como: events: É o tipo do evento da notificação. Neste caso, para eventos do tipo ObjectCreated, as notificações serão feitas apenas por objetos criados, ou seja, para objetos deletados, não haverá notificação. Isso ajuda para restringir alguns tipos de eventos. queue_arn: Referência para a SQS definida no arquivo sqs.tf filter_prefix: Este campo define a pasta em que desejamos que as notificações sejam disparadas. No código, definimos que a pasta files/ seja o gatilho. Resumindo, para todos arquivos criados dentro desta pasta, um notificação será enviada para a SQS definida no campo queue_arn. Executando o Terraform Inicie o Terraform terraform init Executando o Plan O plan possibilita verificar quais recursos serão criados, neste caso é necessário passar o valor da variável bucket para a criação do mesmo no S3. terraform plan -var bucket = 'digite o nome do bucket' Executando o Apply Neste passo, a criação dos recursos serão aplicados. Lembre-se de passar o nome do bucket em que deseja criar para a variável bucket e o nome bucket deve ser único. terraform apply -var bucket = 'digite o nome do bucket' Simulando um evento de notificação Após a execução dos passos anteriores e criado os recursos, vamos fazer um upload manual de um arquivo na pasta files/ do bucket que foi criado. Via console, acesse o Bucket criado no S3 e crie uma pasta chamada files. Dentro dela, carregue qualquer arquivo. Carregando um arquivo Após carregar o arquivo na pasta files, acesse o SQS criado. Você verá que existe mensagens disponíveis na SQS que foi criada nos passos anteriores. Acesse as mensagens e navegue pelas informações. Pronto, temos uma notificação de evento criada. Para mais informações, acesse os links: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_notification https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html Repositório do Github com o código: https://github.com/coffeeandtips/s3-bucket-notification-with-sqs-and-terraform É isso, curtiu? Até mais!

Criando Kinesis Firehose via Terraform e enviando eventos com Java

Criando Kinesis Firehose via Terraform e enviando eventos com Java

Neste post, vamos fazer um passo a passo de como criar um AWS Kinesis Firehose (Delivery Stream) via Terraform e enviar alguns eventos via aplicação Java. Antes de começarmos, precisamos entender o que é o Kinesis Firehose e como é o seu funcionamento. O Kinesis Firehose é um dos recursos do Amazon Kinesis, que se resume ao serviço de streaming de dados da AWS. O Kinesis Firehose possibilita capturar um grande volume de dados através de uma origem, processar em tempo real e entregar os dados para um destino. Para entender melhor, vamos pensar no cenário a seguir: Imagine que você e seu time esteja trabalhando em um processo que será necessário enviar um grande volume de dados em tempo real para o S3. Estes objetos são arquivos no formato JSON representados por dados transacionais de compras e vendas de uma plataforma de e-commerce. A ideia de enviar ao S3 é que, o time está planejando criar um Datalake no futuro, e já pensa em organizar os dados em um repositório único de dados, neste caso, o S3. O time criou uma API que faz o PUT dos objetos no S3, e foram percebendo que aquele processo era limitado, custoso, não muito escalável pelo fato do volume de dados ser alto e o mais importante, não era resiliente. Com base nas limitações, o time entendeu que, o mais certo seria trabalhar em cima de uma ferramenta que pudesse fazer toda a parte de streaming dos dados e que a utilização do Kinesis Firehose seria uma melhor abordagem. Pensando neste novo cenário, arquitetura foi mudada para a seguinte: Certo, mas qual é o benefício de ter adicionado o Kinesis Firehose? O Kinesis possibilita gerenciar todo o volume de dados, independente do volume. Não será mais necessário acessar o S3 de forma contínua e gerenciar todo o fluxo de PUT dos objetos utilizando SDK Client, tornando o processo mais resiliente e simples. O Kinesis Firehose possibilita entregar as mensagens para diferentes destinos, como o S3, Redshift, Elastic Search e dentre outros. É um ótimo recurso para um processo analítico de dados e não necessita de uma administração contínua, pois o recurso já é gerenciado pela AWS. Resumindo, se você e seu time precisa processar grandes volumes de dados em tempo real e entregar os dados para diferentes destinos, o Kinesis Firehose pode ser uma boa solução. O cenário acima descrito, foi apenas para que a gente pudesse entender o contexto em qual cenário aplicar o Kinesis Firehose, existem vários outros cenários em que podemos aplicar. Na próxima etapa, vamos criar um Kinesis utilizando uma ferramenta IAC, neste caso, o Terraform. O Terraform já foi abordado em outros posts e vale a pena dar uma olhada caso você seja novo por aqui, pois o foco não é falar sobre Terraform, combinado? Após a criação do Kinesis Firehose utilizando Terraform, vamos criar um código Java que vai nos permitir enviar alguns eventos para o Kinesis Firehose processar e salvar estes mesmos dados em um Bucket no S3, finalizando o fluxo de streaming. Lembre-se que, para executar os passos seguintes é necessário ter uma conta na AWS e ter as credencias já configuradas, ok? Criando o Terraform Crie um arquivo no seu projeto chamado main.tf e adicione o código abaixo provider "aws" {
region = "${var.region}"
} Entendendo o código acima provider "aws": É no provider que definimos qual é a cloud que utilizaremos, no nosso caso a AWS. region: É onde definimos a região em que o serviço será executado. Agora, crie o arquivo vars.tf . É neste arquivo que vamos definir a variáveis e os seus valores. variable "region" {
default = "us-east-1"
type = "string"
}

variable "bucket" {
type = "string"
} Entendendo o código acima variable "region": Declaração da variável region. Esta mesma variável é utilizado no arquivo main.tf. default: Definição padrão do valor da variável, neste caso us-east-1. Esta será a região em que o serviço será executado. type: Tipo da variável variable "bucket": Declaração da variável bucket. E por último, crie o arquivo kinesis.tf, este nome é sugestivo, combinado? resource "aws_kinesis_firehose_delivery_stream" "kinesis_firehose"
{

destination = "extended_s3"
name = "kns-delivery-event"

extended_s3_configuration {
bucket_arn = "${aws_s3_bucket.bucket.arn}"
role_arn = "${aws_iam_role.firehose_role.arn}"
prefix = "ingest/year=!{timestamp:yyyy}/
month=!{timestamp:MM}/
day=!{timestamp:dd}/
hour=!{timestamp:HH}/"
error_output_prefix = "ingest/
!{firehose:error-output-type}/
year=!{timestamp:yyyy}
/month=!{timestamp:MM}/
day=!{timestamp:dd}/hour=!{timestamp:HH}/"
}

}

resource "aws_s3_bucket" "bucket" {
bucket = "${var.bucket}"
acl = "private"
}

resource "aws_iam_role" "firehose_role" {
name = "firehose_test_role"

assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}

resource "aws_iam_policy" "policy" {

name = "kns-s3-policy"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:GetObject",
"s3:ListBucketMultipartUploads",
"s3:AbortMultipartUpload",
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:PutObject"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::${var.bucket}/*",
"arn:aws:s3:::${var.bucket}*",
]
}
]
}
EOF
}


resource "aws_iam_role_policy_attachment" "kns-s3-policy-attach" {
policy_arn = aws_iam_policy.policy.arn
role = aws_iam_role.firehose_role.name
} Entendendo o código acima 1.resource "aws_kinesis_firehose_delivery_stream": Bloco responsável pela criação do Kinesis Firehose 2. extended_s3_configuration: Bloco com configurações específicas do destino 3. bucket_arn: Referência do Bucket que será declarado nos próximos passos 4. role_arn: Role que será declarada nos próximos passos 5. prefix: Neste parâmetro, é configurado o caminho em que os arquivos serão salvos. Definimos um prefixo (ingest/) + partições, por exemplo: ingest/year=2021/month=03/day=01/hour=12 6. error_output_prefix: Neste parâmetro, definimos um caminho caso o processamento falhe e todos os erros serão enviados para o caminho ali definido. 7. resource "aws_s3_bucket" "bucket": Bloco responsável pelas configurações do Bucket a ser criado. 8. bucket: Nome do bucket em que será passado via parâmetro. Lembre-se que o bucket deve ser único. 9. acl: Nível de acesso, neste caso, privado. 10. resource "aws_iam_role" "firehose_role": Bloco responsável pela criação da role que será usada pelo Kinesis. 11. resource "aws_iam_policy" "policy": Bloco responsável pela criação da policy. Nela definimos as ações que serão permitidas executar no bucket S3. 12. resource "aws_iam_role_policy_attachment" "kns-s3-policy-attach": Neste bloco, anexamos a policy para a role firehose_role. Executando o Terraform Inicie o Terraform terraform init Executando o Plan O plan possibilita verificar quais recursos serão criados, neste caso é necessário passar o valor da variável bucket para a criação do mesmo no S3. terraform plan -var 'bucket=digite o nome do seu bucket' Executando o apply Neste passo, a criação dos recursos serão aplicados. Lembre-se de passar o nome do bucket em que deseja criar para a variável bucket e o nome bucket deve ser único. terraform apply -var 'bucket=digite o nome do seu bucket' Pronto, recurso criado! Falando um pouco sobre Buffer Existe um detalhe bem interessante no Kinesis que são as configurações de Buffer, dependendo de como está configurado, pode impactar o modo de quando os arquivos serão salvos no destino final, no caso, o S3. Por padrão, o Buffer size é criado com 5 MB e o interval de 300 segundos (5 minutos). Como funciona o Buffer? No exemplo acima, temos 2 configurações de Buffer, o Buffer size e o Interval. No Kinesis, o buffer é utilizado como uma espécie de memória interna que armazenará os dados antes de serem entregues ao S3. Nas configurações acima, o Kinesis vai armazenar até 5 MB de dados antes de serem entregues ou a cada intervalo de 5 minutos. Esta segunda opção pode acontecer caso não haja 5 MB de dados para armazenar. Lembrando que existe a possibilidade do Kinesis aumentar estas configurações de forma dinâmica visando a entrega dos dados. Criando o código Java Após executado os passos anteriores e a criação dos recursos serem bem sucedidas, vamos criar o nosso código Java que enviará os eventos para o Kinesis Firehose. Será um código Java simples, que enviará os eventos para o Kinesis Firehose através da AWS SDK. Neste mesmo código, vamos criar duas formas de envio para o Kinesis Firehose, utilizando PutRecord e PutRecordBatch. Maven <dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.12.70</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
</dependencies> Criando o modelo Classe Java chamada Event.java que representará o modelo do evento. import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.util.UUID;

@Data
public class Event {

@JsonProperty("event_date")
private String eventDate;

@JsonProperty("event_id")
private UUID eventId;

@JsonProperty("provider")
private String provider;

@JsonProperty("blog")
private String blog;

@JsonProperty("post_id")
private UUID postId;
} Criando o Service A classe KinesisService.java será a responsável por parte da lógica e envio dos eventos. public class KinesisService {

static String PROVIDER = "AWS KINESIS FIREHOSE";
static String BLOG = "Coffee and Tips";
static String KNS_DELIVERY_NAME = "kns-delivery-event";
static String RECORD_ID = "Record ID ";
static String EVENT = "Event ";

public static AmazonKinesisFirehose kinesisFirehoseClient(){
AmazonKinesisFirehose amazonKinesisFirehose =
AmazonKinesisFirehoseClient.builder()
.withRegion(Regions.US_EAST_1.getName())
.build();
return amazonKinesisFirehose;
}

@SneakyThrows
public static void sendDataWithPutRecordBatch(int maxRecords){

PutRecordBatchRequest putRecordBatchRequest =
new PutRecordBatchRequest();

putRecordBatchRequest
.setDeliveryStreamName(KNS_DELIVERY_NAME);

String line = "";
List<Record> records = new ArrayList<>();

while(maxRecords > 0){
line = getData();
String data = line + "\n";
Record record = new Record()
.withData(ByteBuffer.wrap(data.getBytes()));
records.add(record);
maxRecords --;
}

putRecordBatchRequest.setRecords(records);
PutRecordBatchResult putRecordResult =
kinesisFirehoseClient()
.putRecordBatch(putRecordBatchRequest);

putRecordResult
.getRequestResponses()
.forEach(result -> System.out
.println(RECORD_ID + result.getRecordId()));

}

@SneakyThrows
public static void sendDataWithPutRecord(int maxRecords){

PutRecordRequest PutRecordRequest =
new PutRecordRequest();
PutRecordRequest
.setDeliveryStreamName(KNS_DELIVERY_NAME);

String line = "";

while(maxRecords > 0){

line = getData();
String data = line + "\n";

System.out.println(EVENT + data);

Record record = new Record()
.withData(ByteBuffer
.wrap(data.getBytes()));

PutRecordRequest.setRecord(record);

PutRecordResult
putRecordResult = kinesisFirehoseClient()
.putRecord(PutRecordRequest);

System.out.println(RECORD_ID +
putRecordResult.getRecordId());

maxRecords --;
}
}

@SneakyThrows
public static String getData(){
Event event = new Event();
event.setEventId(UUID.randomUUID());
event.setPostId(UUID.randomUUID());
event.setBlog(BLOG);
event.setEventDate(LocalDateTime.now().toString());
event.setProvider(PROVIDER);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(event);
}

} Entendendo o código acima O código acima possui algumas adaptações visando o funcionamento do Kinesis para o Post. Para um ambiente de produção, recomendo utilizar técnicas de padrões de projeto ou adotar melhores práticas visando um código mais organizado, GG? 1. O método kinesisFirehoseClient( ) é o responsável por configurar e manter uma interação entre os serviços do Kinesis Firehose e a nossa aplicação através da interface AmazonKinesisFirehose. Esta interface provê os seguintes contratos: putRecordBatch(PutRecordBatchRequest var1) putRecord(PutRecordRequest var1) Todo o código de envio foi baseado nos contratos acima e falaremos mais sobre eles nos próximos passos. Por último, perceba que estamos utilizando a região US-EAST-1, a mesma região definida no Terraform. 2. O método sendDataWithPutRecordBatch(int maxRecords) será o nosso método responsável por enviar os dados em Batch a partir do contrato putRecordBatch(PutRecordBatchRequest var1). A ideia de usar o envio em Batch, é a possibilidade de, em um único request, enviarmos uma grande quantidade de dados. Este cenário é bem comum para aplicações que trabalham com um grande volume de dados. Um ponto importante a ser lembrado é que, esta operação suporta até 4 MB de dados ou até 500 registros por requisição. Para o método sendDataWithPutRecordBatch(int maxRecords), estamos utilizando o parâmetro maxRecords, que simulará uma quantidade específica de dados dentro do loop para serem enviados via Batch ao fim da interação, ou seja, o seu uso será apenas para a simulação. 3. Para entendermos sobre o contrato putRecord(PutRecordRequest var1) fornecido pela interface AmazonKinesisFirehose, criamos o método sendDataWithPutRecord(int maxRecords). Este cenário difere bastante do envio em Batch, pois o tipo de envio será do tipo single record, ou seja, para cada dado enviado, um novo request deve ser criado e para cada envio, existe um limite de dado de até 1.000 KiB de tamanho. 4. O método getData( ) será o responsável por montar o objeto do tipo Event com valores aleatórios antes do envio e transformar em um JSON através do ObjectMapper. Executando o código Para a execução do código, vamos criar um método main( ) que fará a execução dos métodos acima. Executando o método sendDataWithPutRecord(int maxRecords) public static void main(String[] args) {
sendDataWithPutRecord(2000);
} No código acima estamos invocando o método sendDataWithPutRecord(int maxRecords) e passando no parâmetro o valor 2000, ou seja, iremos enviar 2000 eventos ou 2000 requests. Resposta do envio no console Para cada envio, o campo putRecordResult.getRecordId() retorna um ID. Executando o método sendDataWithPutRecordBatch(int maxRecords) public static void main(String[] args) {
sendDataWithPutRecordBatch(500);
} Já no método acima, especificamos o valor 500 para o parâmetro maxRecords, pois é o valor máximo de dados permitidos para envio em cada requisição. Lembrando que este parâmetro simula a quantidade de dados fictícios a serem enviados. Destino final Após a execução dos envios, os dados serão entregues ao Bucket do S3 definido na execução do Terraform e serão organizados conforme a imagem abaixo. Na imagem acima, os dados estão organizados entre ano, mês, dia e hora de envio. Os arquivos vão estar disponíveis dentro das pastas relacionadas a hora (hour=), e perceba a diferença de 5 minutos entre eles na coluna Última modificação, ou seja, seguindo a configuração de Buffer interval conforme falamos nos passos anteriores. Pronto, os dados estão sendo entregues! Destruindo os recursos Caso você esteja acompanhando este Post de forma experimental, creio que seja necessário apagar toda a stack em que criamos para evitar uma conta inesperada da AWS. Podemos executar o seguinte comando via Terraform para destruir os recursos em que criamos. terraform destroy -var 'bucket=digite o nome do seu bucket' Mas antes, apague todos os arquivos e pastas do Bucket, pois caso o contrário, o destroy não vai funcionar, GG? Após a execução do comando acima, você deverá confirmar a destruição, é só digitar yes para confirmar. Caso queria entender mais a fundo sobre Kinesis Firehose, recomendo a leitura na documentação oficial. Código completo no GitHub. É isso, curtiu? Até mais!

Criando uma Lambda AWS usando Python + Deploy

Criando uma Lambda AWS usando Python + Deploy

Neste post, iremos criar uma aplicação Lambda AWS usando Python. Será um passo a passo, desde criar o código fonte, que será bem simples, passando pela criação da role e por fim, o deploy da aplicação. Este tutorial será baseado na documentação da AWS, pois, muitas vezes os usuários possuem uma dificuldade em ler uma documentação por talvez ser um pouco verbosa ou até estar em outra língua, por isso, decidi criar este passo a passo baseando-se na própria documentação da AWS e espero que gostem. O que é um Lambda? Lambdas são conhecidos como aplicações serverless, ou seja, são aplicações que não tem a necessidade em configurar um servidor memória, CPU etc. Todo o poder computacional é gerenciado pela própria AWS e isso nos dá a flexibilidade de apenas se preocupar com o código que será executado. Característica de um Lambda O Lambda possui uma característica diferente de uma aplicação tradicional, que tem uma característica de uptime 24/7, para isso, temos que ter em mente a finalidade do seu uso. A característica principal é que um Lambda é executado para uma tarefa específica e após o término, ela basicamente "morre". Isso é muito positivo, pois gera uma boa economia e os recursos são bem mais utilizados. Imagine que você recebeu uma tarefa para criar uma aplicação que enviará emails algumas vezes durante o dia. Faz sentido manter uma aplicação online 100% do tempo para este tipo de tarefa? Você até pode fazer, mas estaria desperdiçando recursos, por na maioria do tempo a aplicação estar ociosa e o tempo de execução da tarefa seria em questão de poucos minutos ou nem isso. Já um Lambda, seria uma abordagem ideal para este cenário. Imagine que os emails devem ser enviados 6 vezes ao dia em um intervalo de 4 horas. A aplicação executaria a tarefa, terminaria e depois de 4 horas executaria novamente. E cada execução seria em torna de 2 minutos. Resumindo, o objetivo da tarefa seria alcançado em pouco tempo e com um baixo custo computacional. Mãos na massa Nos próximos passos, criaremos o nosso código em Python na versão 3.8, em seguida, a role com as permissões necessárias para a execução do Lambda. Vamos anexar a role criada á uma policy, criaremos a aplicação Lambda, e em seguida, vamos compactar o arquivo com o código para o deploy, e por fim, faremos uma invocação para execução da Lambda. Os passos após o código, iremos executar utilizando CLI, para isso faça a instalação do AWS CLI seguindo esta documentação . O CLI é uma das maneiras de interação com a AWS e é bem simples a sua utilização. Existem outras maneiras também de interagir com a AWS para a criação de recursos, o mais comum no mercado é utilizando ferramentas IAC, como Cloudformation e Terraform. Já até criamos um tutorial sobre estes recursos, basta acessar os links abaixo para entender melhor, mas para este tutorial, seguiremos como base a documentação da AWS, utilizando CLI. Criando recursos utilizando Cloudformation Primeiros passos utilizando Terraform na AWS Após a instalação do CLI, configure suas credenciais da AWS executando o comando abaixo via terminal. Caso você não conheça este comando, recomendo a leitura de como utilizar, nesta documentação, aws configure Criando o código da Lambda A seguir, criaremos o código que será executado na aplicação Lambda, será um código muito simples apenas para exemplificar a execução. Utilizaremos Python neste exemplo, mas é possível utilizar várias outras linguagens de programação para a criação da Lambda. Crie um arquivo chamado lambda_function.py e escreva o código abaixo. import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):

name = event['yourname']

message = {'Message': 'hello ' + name +
' ,this is your first '
'Lambda application!' }

logger.info(message) Criando a role O comando a seguir criará a role chamada lambda-default-role. A role é uma forma de dar permissão para um recurso específico executar uma ação específica. Execute via terminal. aws iam create-role --role-name lambda-default-role --assume-role-policy-document '{"Version": "2012-10-17","Statement": [{ "Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}' Após a execução do comando acima, um output como este será mostrado. {
"Role": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
]
},
"RoleId": "AROA5WDL5GTDVI43EJEUOV",
"CreateDate": "2021-09-12T22:11:04Z",
"RoleName": "lambda-default-role",
"Path": "/",
"Arn": "arn:aws:iam::id-da-sua-conta:role/lambda-default-role"
}
} Anexando a role á uma policy Para este passo, basta executar o comando abaixo via terminal aws iam attach-role-policy --role-name lambda-default-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Zipando o arquivo Python Neste passo, vamos compactar o arquivo Python contendo o código que escrevemos nos passos anteriores, para isso, acesse a pasta onde o arquivo se encontra e execute o comando abaixo. zip my-deployment-package.zip lambda_function.py Criando a função Lambda e efetuando o deploy Agora criaremos a função Lambda chamada my-first-lambda e no mesmo comando, vamos anexar o código compactado no passo anterior. Lembre-se que este comando deve ser executado na mesma pasta onde o arquivo foi zipado, ok? aws lambda create-function --function-name my-first-lambda --zip-file fileb://my-deployment-package.zip --handler lambda_function.lambda_handler --runtime python3.8 --role arn:aws:iam::id-da-sua-conta:role/lambda-default-role Entendendo o comando acima aws lambda create-function --function-name my-first-lambda: Comando para criar a função Lambda chamada my-first-lambda --zip-file fileb://my-deployment-package.zip: Anexa o arquivo my-deployment-package.zip na criação da função, onde contém o código fonte. --handler lambda_function.lambda_handler: Esse parâmetro seta a nome da função lambda_handler, ou seja, quando o Lambda for invocado, será está função que será executada. Esta mesma função deve existir no arquivo Python, conforme criamos anteriormente no primeiro passo. --runtime python3.8: Neste parâmetro, passamos como argumento a linguagem em que estamos usando para o Lambda, no nosso caso, Python com a versão 3.8 --role arn:aws:iam::id-da-sua-conta:role/lambda-default-role: Por último, passamos o valor do parâmetro role. O valor deste parâmetro é mostrado no output após a criação da role, no campo Arn, basta copiar e colar. Pronto, agora que você entendeu, basta executar via terminal para a criação definitiva do Lambda. Veja que o Lambda foi criado com sucesso, agora vamos fazer algumas invocações para que o Lambda seja executado. Invocando o Lambda Neste passo, vamos executar o Lambda através de uma invocação manual. Existem diversas outras formas em invocar uma aplicação Lambda. A invocação pode ser feita através de um Event-Bridge que funcionaria como um gatilho, no qual podemos configurar um cron, que poderá invocar o Lambda de tempos pré-determinados, como, minuto a minuto, hora em hora, 1 vez por semana e dentre outras opções. Existem várias outras formas, todas elas são descritas na documentação da AWS, mas para este post, faremos de forma manual, utilizando o CLI. aws lambda invoke --function-name my-first-lambda --payload '{"yourname": "Mônica"}' output.txt Entendendo o comando acima aws lambda invoke --function-name my-first-lambda: Invoca o Lambda my-first-lambda ---payload '{"yourname": "Mônica"}': No payload é onde passamos o valor do input que o Lambda vai processar. Neste caso, estamos passando um JSON com uma informação. O segundo parâmetro, o output.txt, será o arquivo que armazenará informações como, o response do Lambda caso exista ou algum log de erro, caso aconteça. Você pode acessar os logs do Lambda através do Cloudwatch para visualizar melhor os logs que definimos no código Python. Atualizando o código Caso precise alterar o código, refaça o passo em que zipamos o arquivo e em seguida, execute o comando abaixo. aws lambda update-function-code --function-name my-first-lambda --zip-file fileb://my-deployment-package.zip Pronto, agora temos um Lambda! Caso queira ler a documentação da AWS sobre como criar um Lambda, siga esta documentação. É isso, curtiu? Até mais!

Enviando mensagens para fila SQS via Python + boto3

Enviando mensagens para fila SQS via Python + boto3

Tenho experimentado Python e tenho me surpreendido com a facilidade no desenvolvimento e sem muitas configurações de projeto. Para este post, vamos criar um exemplo de como criar uma fila SQS e enviar mensagens utilizando Python e boto3, que possui uma grande variedade de métodos para acesso aos recursos da AWS. Instalando boto3 A instalação é bem simples, assegure-se que já exista o Python instalado em seu ambiente. Caso já exista, via terminal, execute o comando a seguir para a instalação do boto3. pip install boto3 Importando boto3 para o projeto Após instalado, vamos escrever o nosso código em partes, começando pelo import da biblioteca boto3 e atribuindo para uma variável sqs e client, conforme imagem abaixo: import boto3
sqs = boto3.resource('sqs')
client = boto3.client('sqs') Criando fila SQS Criando uma function que criará a fila SQS passando alguns parâmetros como, name e attributes. def create_sqs(name, attributes):
sqs.create_queue(QueueName=name, Attributes=attributes) Envio de mensagens para a fila SQS Criando uma function que enviará as mensagens para a fila SQS def send_messages_to_sqs(name, message):
queue = sqs.get_queue_by_name(QueueName=name)
client.send_message(QueueUrl=queue.url, MessageBody=message) No código acima, existe um método chamado get_queue_by_name que no qual precisamos usar para recuperar a url da fila existente, para que seja repassada para o método send_message Executando o código Após criada as functions, vamos executar o código para a criação da fila SQS e o envio das mensagens. if __name__ == '__main__':

attributes = {
'DelaySeconds': '60',
'MessageRetentionPeriod': '600'
}

create_sqs('sqs_messages', attributes)

send_messages_to_sqs('sqs_messages', "Hello, SQS!") Código completo import boto3

sqs = boto3.resource('sqs')
client = boto3.client('sqs')


def create_sqs(name, attributes):
sqs.create_queue(QueueName=name, Attributes=attributes)

def send_messages_to_sqs(name, message):
queue = sqs.get_queue_by_name(QueueName=name)
client.send_message(QueueUrl=queue.url, MessageBody=message)

if __name__ == '__main__':

attributes = {
'DelaySeconds': '60',
'MessageRetentionPeriod': '600'
}

create_sqs('sqs_messages', attributes)

send_messages_to_sqs('sqs_messages', "Hello, SQS!") Simples, certo? É um código prático que permite acessar recursos da AWS com poucas linhas de código. Sou novo em Python e espero trazer mais posts como este, assim vamos aprendendo juntos =) É isso, curtiu? Até mais!