Buscar Resultados

49 itens encontrados

  • First steps with DBT - Data Build Tool

    DBT has been used by a lot of companies on Data area and I believe that we can extract good insights in this post about it. That's going to be a practical post showing how DBT works it and hope you guys enjoy it. What's DBT? DBT means Data Build Tool and enables teams to transform data already loaded in their warehouse with simple select statements. DBT does the T in ELT processes, in the other words, he doesn't work to extract and load data but he's useful to transform it. Step 1: Creating a DBT Project Now, we're assume that DBT is already installed but if not, I recommend see this link. After DBT installed you can create a new project using CLI or you can clone this project from the DBT Github repository. Here for this post we're going to use CLI mode to create our project and also to complete the next steps. To create a new project, run the command below. dbt init After running this command, you need to type the project's name and which warehouse or database you're going to use like the image below. For this post, we're going to use postgres adapter. It's very important that you have a postgres database already installed or you can up a postgres image using docker. About adapters, DBT supports different of them and you can check here. I created a table structure and also loaded it with data simulating data from a video platform called wetube and we're going to use them to understand how DBT works it. Follow the structure: Step 2: Structure and more about DBT After running dbt init command to create the project, a structure of folders and files below will be created. I won't talk about the whole directories of project but I'd like to focus in two of them. Sources Sources are basically the data already loaded into your warehouse. In DBT process, sources have the same meaning of raw data. There's no folders representing source data for this project but you need to know about this term because we're going to set up tables already created as sources for the next sections. Seeds Seeds is an interesting and useful mechanism to load static data into your warehouse through CSV files. If you want to load these data you need to create a CSV file on this directory and run the command below. dbt seed For each field on CSV file, DBT will infer their types and create a table into warehouse or database. Models DBT works with Model paradigm, the main idea is that you can create models through the transformation using SQL statements based on tables sources or existing models Every SQL file located in your model folder will create a model into your warehouse when the command below runs. dbt run Remember that a model can be created through a source or another model and don't worry about this, I'll show you more details about it. Step 3: Setting up database connection After project already created, we need to set up our database's connection and here at this post, we're going to use postgres as database. After initialize the project a bunch of files are created and one of them is called profiles.yml. profiles.yml file is responsible to control the different profiles to the different database's connection like dev and production environment. If you've noticed, we can't see this file on the image above because this file is created outside of project to avoid sensitive credentials. You can find this file in ~/.dbt/ directory. If you note, we have one profile named dbt_blog and a target called dev, by default the target refer to dev with the database's connection settings. Also, It's possible to create one or more profiles and targets, it enables working with different environments. Another important detail is that dbt_blog profile should be specified on dbt_project.yml file as a default profile. For the next sections, we'll discuss what and how dbt_project.yml file works it. Step 4: Creating dbt_project.yml file Every DBT project has a dbt_project.yml file, you can set up informations like project name, directories, profiles and materialization type. name: 'dbt_blog' version: '1.0.0' config-version: 2 profile: 'dbt_blog' model-paths: ["models"] analysis-paths: ["analyses"] test-paths: ["tests"] seed-paths: ["seeds"] macro-paths: ["macros"] snapshot-paths: ["snapshots"] target-path: "target" # directory which will store compiled SQL files clean-targets: # directories to be removed by `dbt clean` - "target" - "dbt_packages" models: dbt_blog: # Config indicated by + and applies to all files under models/example/ mart: +materialized: table Note that profile field was set up as the same profile specified on profiles.yml file and another important detail is about materialized field. Here was set up as a "table" value but by default, is a "view". Materialized fields allows you to create models as a table or view on each run. There are others type of materialization but we won't discuss here and I recommend see dbt docs. Step 5: Creating our first model Creating first files Let's change a little and let's going to create a sub-folder on model directory called mart and inside this folder we're going to create our .SQL files and also another important file that we don't discuss yet called schema.yml. Creating schema file Schema files are used to map sources and to document models like model's name, columns and more. Now you can create a file called schema.yml e fill up with these informations below. version: 2 sources: - name: wetube tables: - name: account - name: city - name: state - name: channel - name: channel_subs - name: video - name: video_like - name: user_address models: - name: number_of_subs_by_channel description: "Number of subscribers by channel" columns: - name: id_channel description: "Channel's ID" tests: - not_null - name: channel description: "Channel's Name" tests: - not_null - name: num_of_subs description: "Number of Subs" tests: - not_null Sources: At sources field you can include tables from your warehouse or database that's going to be used on model creation. models: At models field you can include the name's model, columns and their description Creating a model This part is where we can create SQL scripts that's going to result in our first model. For the first model, we're going to create a SQL statement to represent a model that we can see the numbers of subscribers by channel. Let's create a file called number_of_subs_by_channel.sql and fill up with these scripts below. with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_channel_subs as ( select * from {{ source('wetube','channel_subs') }} ), number_of_subs_by_channel as ( select source_channel.id_channel, source_channel.name, count(source_channel_subs.id_subscriber) num_subs from source_channel_subs inner join source_channel using (id_channel) group by 1, 2 ) select * from number_of_subs_by_channel Understanding model creation Note that we have multiple scripts separated by common table expression (CTE) that becomes useful to understand the code. DBT enables using Jinja template {{ }} bringing a better flexibility to our code. The usage of keyword source inside Jinja template means that we're referring source tables. To refer a model you need to use ref keyword. The last SELECT statement based on source tables generates the model that will be created as table in the database. Running our first model Run the command below to create our first model dbt run Output: Creating another model Imagine that we need to create a model containing account information and it's channels. Let's get back to schema.yml file to describe this new model. - name: account_information description: "Model containing account information and it's channels" columns: - name: id_account description: "Account ID" tests: - not_null - name: first_name description: "First name of user's account" tests: - not_null - name: last_name description: "Last name of user's account" tests: - not_null - name: email description: "Account's email" tests: - not_null - name: city_name description: "city's name" tests: - not_null - name: state_name description: "state's name" tests: - not_null - name: id_channel description: "channel's Id" tests: - not_null - name: channel_name description: "channel's name" tests: - not_null - name: channel_creation description: "Date of creation name" tests: - not_null Now, let's create a new SQL file and name it as account_information.sql and put scripts below: with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_city as ( select * from {{ source('wetube','city') }} ), source_state as ( select * from {{ source('wetube','state') }} ), source_user_address as ( select * from {{ source('wetube','user_address') }} ), source_account as ( select * from {{ source('wetube','account') }} ), account_info as ( select account.id_user as id_account, account.first_name, account.last_name, account.email, city.name as city_name, state.name as state_name, channel.id_channel, channel.name as channel, channel.creation_date as channel_creation FROM source_account account inner join source_channel channel on (channel.id_account = account.id_user) inner join source_user_address user_address using (id_user) inner join source_state state using (id_state) inner join source_city city using (id_city) ) select * from account_info Creating our last model For our last model, we going to create a model about how many likes has a video. Let's change again the schema.yml to describe and to document our future and last model. - name: total_likes_by_video description: "Model containing total of likes by video" columns: - name: id_channel description: "Channel's Id" tests: - not_null - name: channel description: "Channel's name" tests: - not_null - name: id_video description: "Video's Id" tests: - not_null - name: title description: "Video's Title" tests: - not_null - name: total_likes description: "Total of likes" tests: - not_null Name it a file called total_likes_by_video.sql and put the code below: with source_video as ( select * from {{ source('wetube','video') }} ), source_video_like as ( select * from {{ source('wetube','video_like') }} ), source_account_info as ( select * from {{ ref('account_information') }} ), source_total_like_by_video as ( select source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title, count(*) as total_likes FROM source_video_like inner join source_video using (id_video) inner join source_account_info using (id_channel) GROUP BY source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title ORDER BY total_likes DESC ) select * from source_total_like_by_video Running DBT again After creation of our files, let's run them again to create the models dbt run Output The models were created in the database and you can run select statements directly in your database to check it. Model: account_information Model: number_of_subs_by_channel Model: total_likes_by_video Step 6: DBT Docs Documentation After generated our models, now we're going to generate docs based on these models. DBT generates a complete documentation about models and sources and their columns and also you can see through a web page. Generating docs dbt docs generate Running docs on webserver After docs generated you can run command below to start a webserver on port 8080 and see the documentation locally. dbt docs serve Lineage Another detail about documentation is that you can see through of a Lineage the models and it's dependencies. Github code You can checkout this code through our Github page. Cool? I hope you guys enjoyed it!

  • Differences between FAILFAST, PERMISSIVE and DROPMALFORED modes in Dataframes

    There's a bit differences between them and we're going to find out in this post. The parameter mode is a way to handle with corrupted records and depending of the mode, allows validating Dataframes and keeping data consistent. In this post we'll create a Dataframe with PySpark and comparing the differences between these three types of mode: PERMISSIVE DROPMALFORMED FAILFAST CSV file content This content below simulates some corrupted records. There are String types for the engines column that we'll define as an Integer type in the schema. "type","country","city","engines","first_flight","number_built" "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A220","Canada","Calgary","two",2013-03-02,179 "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A320","France","Lyon","two",1986-06-10,10066 "Airbus A330","France","Lyon","two",1992-01-02,1521 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York",2,1967-08-03,10636 "Airbus A220","Canada","Calgary",2,2013-03-02,179 Let's start creating a simple Dataframe that will load data from a CSV file with the content above, let's supposed that the content above it's from a file called airplanes.csv. To modeling the content, we're also creating a schema that will allows us to Data validate. Creating a Dataframe using PERMISSIVE mode The PERMISSIVE mode sets to null field values when corrupted records are detected. By default, if you don't specify the parameter mode, Spark sets the PERMISSIVE value. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "PERMISSIVE") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of PERMISSIVE mode Creating a Dataframe using DROPMALFORMED mode The DROPMALFORMED mode ignores corrupted records. The meaning that, if you choose this type of mode, the corrupted records won't be list. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "DROPMALFORMED") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of DROPMALFORMED mode After execution it's possible to realize that the corrupted records aren't available at Dataframe. Creating a Dataframe using FAILFAST mode Different of DROPMALFORMED and PERMISSIVE mode, FAILFAST throws an exception when detects corrupted records. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "FAILFAST") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of FAILFAST mode ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. Cool? I hope you enjoyed it!

  • Integração Salesforce com AWS AppFlow e S3

    É muito comum que Devs Salesforce precisem integrar dados do CRM com um banco de dados ou data lake por exemplo de uma empresa. Quando o volume desses dados são altos, fazer essa integração via REST pode não ser a forma mais elegante, até porque o próprio Salesforce possui algumas travas que dificultam por exemplo enviar um payload com uma lista de objetos para integração. O passo a passo a seguir mostra como podemos enviar dados de uma API qualquer para o Salesforce ou do Salesforce para qualquer API utilizando o serviço AppFlow da AWS para a orquestração e S3 para integração. Envio de dados do Salesforce para S3. Vamos criar um objeto simples para simular a integração. No Salesforce vá em Setup e procure por Integrations >> Platform Events e crie um novo evento. Na seção Custom Fields, vamos criar apenas dois campos para exemplificar, ID e Name. Agora, vamos focar em algumas tarefas no painel AWS. Procure pelo serviço AppFlow e crie um novo Fluxo. Dê um nome para o Fluxo e clique em próximo. No segundo passo, você deverá selecionar a origem do evento. Nesse caso, dentre as diversas opções você encontrará “Salesforce”. Após selecionar, você precisará especificar a conexão. Faça login na conta de sua preferencia (Production ou Sandbox) e prossiga. Ainda nesse formulário de origem, selecione o evento criado no Salesforce no primeiro passo deste tutorial. No formulário de destino, selecione um bucket do S3 onde deseja que os arquivos de integração sejam salvos e prossiga. Na próxima tela, faça o mapeamento dos campos do Evento que você deseja salvar no arquivo S3. Clique em próximo no Passo 4 e em Criar Fluxo no passo 5. Será exibida uma mensagem que o Fluxo foi criado. Por fim, clique em Ativar Fluxo. Pronto!!! O Fluxo já está com status ativo. Agora vamos escrever um simples teste no Salesforce para enviar um evento e verificar o funcionamento do nosso fluxo criado acima. Abra o Developer Console do Salesforce, vá em Debug e selecione Open Execute Anonymous Window. Vamos publicar um simples evento como o exemplo abaixo: EventTest__e myEvent = new EventTest__e(); myEvent.ID__c = 1; myEvent.Name__c = 'My First Event'; EventBus.publish(myEvent); E… PRONTO! Agora é só verificar o S3 e você irá encontrar uma pasta com o nome do Fluxo e os arquivos do evento na pasta. Nessa situação você pode conectar qualquer API do seu ambiente externo ao Salesforce e consumir esses dados. Envio de Dados S3 para Salesforce E se podemos usar o S3 para consumir os dados do Salesforce por uma API externa, também podemos usá-lo para enviar dados para o CRM. Nesse caso faremos da seguinte forma. Dentro do mesmo bucket S3 do exemplo anterior, criaremos uma pasta chamada ingest que usaremos para enviar os arquivos com os dados que queremos integrar com o Salesforce. Agora, voltamos ao AppFlow e vamos criar um novo fluxo. No próximo passo, selecionamos o bucket e a pasta criada e ainda o formato de dados JSON. Em seguida, informamos o destino desses dados, selecionamos Salesforce, a sandbox e o objeto onde será inserido os dados. Nesse caso, criamos um objeto customizado no Salesforce chamado S3EventIngest. Nesse primeiro momento, deve ser adicionado a pasta especificada acima “ingest”, um arquivo JSON contendo um mapeamento DE-PARA relacionando os atributos do Json que será consumido pelo fluxo e os campos referentes ao salesforce. No caso, adicionamos o um arquivo.json com o seguinte código: {"ID":"ID__c","Name":"Name__c"} No próximo passo, faça o mapeamento dos atributos JSON que serão enviados no arquivo para o S3 para os campos do objeto Salesforce. Com o mapeamento feito, seguiremos os próximos passos sem fazer nenhuma modificação até que o fluxo seja concluído e criado. É possível configurar triggers para que o fluxo seja executado de forma periódica, mas aqui vamos apenas executar manualmente. Depois de criado, podemos fazer um teste simples. Para isso, vamos EXCLUIR o arquivo da pasta ingest que adicionamos anteriormente e adicionar um arquivo com registros reais de fato. Nesse caso, vamos adicionar a pasta ingest um arquivo com o seguinte json: [ {"ID":779,"Name":"My Event Json 1"}, {"ID":780,"Name":"My Event Json 2"}, {"ID":781,"Name":"My Event Json 3"}, {"ID":782,"Name":"My Event Json 4"} ] Após carregar o arquivo no S3, volte ao fluxo e clique no botão “Executar Fluxo” Espere alguns segundos e você verá uma mensagem de sucesso e também pode consultar o histórico de execução com os últimos processamentos. Além disso, consulte também no Salesforce e você verá os dados inseridos. É isso pessoal, espero ter ajudado!!

  • Working with Schemas in Spark Dataframes using PySpark

    What's a schema in the Dataframes context? Schemas are metadata that allows working with a standardized Data. Well, that was my definition about schemas but we also can understanding schemas as a structure that represents a data context or a business model. Spark enables using schemas with Dataframes and I believe that is a good point to keep data quality, reliability and we also can use these points to understand the data and connect to the business. But if you know a little more about Dataframes, working with schema isn't a rule. Spark provides features that we can infer to a schema without defined schemas and reach to the same result, but depending on the data source, the inference couldn't work as we expect. In this post we're going to create a simple Dataframe example that will read a CSV file without a schema and another one using a defined schema. Through examples we'll can see the advantages and disadvantages. Let's to the work! CSV File content "type","country","engines","first_flight","number_built" "Airbus A220","Canada",2,2013-03-02,179 "Airbus A320","France",2,1986-06-10,10066 "Airbus A330","France",2,1992-01-02,1521 "Boeing 737","USA",2,1967-08-03,10636 "Boeing 747","USA",4,1969-12-12,1562 "Boeing 767","USA",2,1981-03-22,1219 If you noticed in the content above, we have different data types. We have string, numeric and date column types. The content above will be represented by airliners.csv in the code. Writing a Dataframe without Schema from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result It seems that worked fine but if you look with attention, you'll realize that in the schema structure there are some field types that don't match with their values, for example fields like number_built, engines and first_flight. They aren't string types, right? We can try to fix it adding the following parameter called "inferSchema" and setting up to "true". from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result Even inferring the schema, the field first_flight keeping as a string type. Let's try to use Dataframe with a defined schema to see if this details will be fixed. Writing a Dataframe with Schema Now it's possible to see the differences between the codes. We're adding an object that represents the schema. This schema describes the content in CSV file, you can note that we have to describe the column name and type. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType, DateType, StructField if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() StructSchema = StructType([ StructField("type", StringType()), StructField("country", StringType()), StructField("engines", IntegerType()), StructField("first_flight", DateType()), StructField("number_built", IntegerType()) ]) air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .schema(StructSchema) \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result After we defined the schema, all the field types match with their values. This shows how important is to use schemas with Dataframes. Now it's possible to manipulate the data according to the type with no concerns. Cool? I hope you enjoyed it!

  • Differences between External and Internal tables in Hive

    There are two ways to create tables in the Hive context and this post we'll show the differences, advantages and disadvantages. Internal Table Internal tables are known as Managed tables and we'll understand the reason in the following. Now, let's create an internal table using SQL in the Hive context and see the advantages and disadvantages. create table coffee_and_tips_table (name string, age int, address string) stored as textfile; Advantages To be honest I wouldn't say that it's an advantage but Internal tables are managed by Hive Disadvantages Internal tables can't access remote storage services for example in clouds like Amazon AWS, Microsoft Azure and Google Cloud. Dropping Internal tables all the data including metadata and partitions will be lost. External Table External tables has some interesting features compared to Internal tables and it's a good and recommended approach when we need to create tables. In the script below you can see the difference between Internal table creation and External table related to the last section. We just added the reserved word external in the script. create external table coffee_and_tips_external (name string, age int, address string) stored as textfile; Advantages The data and metadata won't be lost if drop table External tables can be accessed and managed by external process External tables allows access to remote storage service as a source location Disadvantages Again, I wouldn't say that it's a disadvantage but if you need to change schema or dropping a table, probably you'll need to run a command to repair the table as shown below. msck repair table Depending on the volume, this operation may take some time to complete. To check out a table type, run the following command below and you'll see at the column table_type the result. hive> describe formatted That's it, I hope you guys enjoy it! References: https://hive.apache.org/

  • How to generate random Data using Datafaker lib

    Sometimes in our projects we have to fill Java objects for unit tests or even to create a database dump with random data to test a specific feature and etc. We need to be creative trying to create names, street names, cities or documents. There's an interesting and helpful Java library called Datafaker that allows to create random data with a large number of providers. Providers are objects based on a context, for example: If you want to generate data about a Person object, there's a specific provider for this context that will generate name, last name and etc. If you need to create a unit test that you need data about address, you'll find it. In this post we'll create some examples using Maven but the library also provides support for Gradle projects. Maven net.datafaker datafaker 1.1.0 Generating Random Data Let's create a simple Java class that contains some properties like name, last name, address, favorite music genre and food. public class RandomPerson { public String firstName; public String lastName; public String favoriteMusicGenre; public String favoriteFood; public String streetAddress; public String city; public String country; @Override public String toString() { return "firstName=" + firstName + "\n" + "lastName=" + lastName + "\n" + "favoriteMusicGenre="+favoriteMusicGenre + "\n" + "favoriteFood=" + favoriteFood + "\n" + "streetAddress=" + streetAddress + "\n" + "city=" + city + "\n" + "country=" + country ; } static void print(RandomPerson randomPerson){ System.out.println( randomPerson ); } } In the next step we'll fill an object using the providers that we quote in the first section. First of all, we create an object called randomData that represents Faker class. This class contains all the providers in the example below. public static void main(String[] args) { Faker randomData = new Faker(); RandomPerson randomPerson = new RandomPerson(); randomPerson.firstName = randomData.name().firstName(); randomPerson.lastName = randomData.name().lastName(); randomPerson.favoriteMusicGenre = randomData.music().genre(); randomPerson.favoriteFood = randomData.food().dish(); randomPerson.streetAddress = randomData.address().streetAddress(); randomPerson.city = randomData.address().city(); randomPerson.country = randomData.address().country(); print(randomPerson); } After the execution, we can see the results like this at the console: Result firstName=Dorthy lastName=Jones favoriteMusicGenre=Electronic favoriteFood=Cauliflower Penne streetAddress=7411 Darin Gateway city=Gutkowskifort country=Greece Every execution will be a new result because of providers are randoms. Another interesting feature is that we can set up the Locale when instantiate an object. Faker randomData = new Faker(Locale.JAPANESE); See the results based on Local.JAPANESE: Result firstName=航 lastName=横山 favoriteMusicGenre=Non Music favoriteFood=French Fries with Sausages streetAddress=418 美桜Square city=南斉藤区 country=Togo Isn't a cool library!? See you!

  • Creating a Java code using Builder pattern

    If you're using a language that supports oriented object in your project, probably there's some lines of codes with Builder pattern. If not, this post will help you to understand about it. What's Builder Pattern? Builder Pattern belongs to an area in Software Engineer called Design Patterns, the idea behind of a pattern is to solve commons problems in your project following best practices. Builder Pattern is very useful when we need to provide a better solution in the creational objects part in our project. Sometimes we need to instantiate an object with a lot of parameters and this could be a problem if you pass a wrong parameter value. Things like this happen every time and results in bugs and you will need to find out where's the issue and maybe, refactoring code to improve it. Let's write some lines of code to see how does Builder Pattern works and when to apply it. The code below is an example of a traditional Class with constructor used to load values when the object instantiated. public class PersonalInfo { private final String firstName; private final String lastName; private final Date birthDate; private final String address; private final String city; private final String zipCode; private final String state; private final int population; public PersonalInfo(String firstName, String lastName, Date birthDate, String address, String city, String zipCode, String state, int population){ this.firstName = firstName; this.lastName = lastName; this.birthDate = birthDate; this.address = address; this.city = city; this.zipCode = zipCode; this.state = state; this.population = population; } } And now, we can instantiate the object simulating the client code. PersonalInfo personalInfo = new BuilderPattern("Mônica", "Avelar", new Date(), "23 Market Street", "San Francisco", "94016", "CA", 800000); If you notice, to instantiate the object we should pass all the values related to each property of our class and there's a big chance to pass a wrong value. Another disadvantage of this approach is the possibility to not scale it. In this example we have a few properties but tomorrow we can add more properties and the disadvantage becomes clearer. Working with Builder Pattern Let's rewrite the code above to the Builder Pattern and see the differences. public class PersonalInfo { private final String firstName; private final String lastName; private final Date birthDate; private final String address; private final String city; private final String zipCode; private final String state; private final int population; public static class Builder { private String firstName; private String lastName; private Date birthDate; private String address; private String city; private String zipCode; private String state; private int population; public Builder firstName(String value) { firstName = value; return this; } public Builder lastName(String value) { lastName = value; return this; } public Builder birthDate(Date value) { birthDate = value; return this; } public Builder address(String value) { address = value; return this; } public Builder city(String value) { city = value; return this; } public Builder zipCode(String value) { zipCode = value; return this; } public Builder state(String value) { state = value; return this; } public Builder population(int value) { population = value; return this; } public BuilderPattern build() { return new BuilderPattern(this); } } public PersonalInfo(Builder builder){ firstName = builder.firstName; lastName = builder.lastName; birthDate = builder.birthDate; address = builder.address; city = builder.city; zipCode = builder.zipCode; state = builder.state; population = builder.population; } } If you compare both codes you will conclude that the first one is smaller and better to understand than the second one and I agree it. The advantage of the usage is going to be clear for the next example when we create an object based on Builder pattern. Simulating client code using Builder Pattern PersonalInfo personalInfo = new Builder() .firstName("Mônica") .lastName("Avelar") .birthDate(new Date()) .address("23 Market Street") .city("San Francisco") .zipCode("94016") .state("CA") .population(80000) .build(); This last example of creation object using Builder Pattern turns an organized code following the best practices and easy to read. Another advantage of Builder is that we can identify each property before passing values. To be honest I've been using Builder Pattern in my projects and I strongly recommend you do it the same in your next projects. There's an easier way to implement Builder pattern in projects nowadays and I'll write a post about it, see you soon!

  • Versionamento de Banco de Dados utilizando Flyway

    O versionamento de dados em banco de dados é uma ferramenta muito poderosa, facilitando a gestão das mudanças do banco, além de ajudar a contornar cenários de crise. No mundo Java existem duas ferramentas bastante utilizadas, o Liquibase e o Flyway, para este Post utilizaremos o Flyway. Sobre o Flyway O Flyway trabalha com um conceito de migrations, basicamente uma migration é uma script SQL que será executado dentro do banco de dados. Caso ocorra algum erro, o próprio flyway se encarregará de fazer o rollback daquele script SQL. A vantagem de utilizar tal sistema é que os desenvolvedores podem ter uma forma única de criação e alteração das estruturas do banco de dados. Sendo assim, caso alguém crie uma nova tabela ou um novo índice de tabela, outras pessoas podem pegar essa alteração e aplicar em seu banco local. Além disso, suas migrations se tornam uma espécie de documentação do seu banco de dados, refletindo tudo que foi acontecendo ao longo do tempo. Para fazer essas mudanças, o flyway utiliza uma tabela de versionamento (por padrão, a tabela flyway_schema_history) onde ele verifica qual foi a última modificação executada e qual foi o hash do script executado. Essa verificação do hash é muito importante porque é a forma que a ferramenta possui para controlar as modificações em um script já executado, avisando a quem esta executando para revisar aquela migration tendo em vista que pode ter ocorrido alguma corrupção de dados ou se o script foi alterado. Instalação Existem várias formas de utilizar o Flyway, podendo ser dentro da aplicação Java, Maven, Gradle ou linha de comando. Para este Post, iremos explorar a linha de comando. Faça a instalação de acordo com o seu sistema operacional acessando este link https://flywaydb.org/download/community Para verificar se foi instalado corretamente, execute o comando abaixo em seu terminal: * Você deve estar dentro da pasta descompactada caso não tenha adicionado a pasta a seu PATH flyway -v Logo após a instalação, basta clonar o projeto base no seguinte repositório no Github . Execução Para executar as migrations é necessário informar ao Flyway alguns parâmetros para que ele possa se conectar ao banco de dados: Via terminal, execute o comando abaixo passando os parâmetros necessários como a URL, usuário e senha. flyway migrate -url=... -user=... -password=... Esse usuário informado precisa de ter acesso e permissão para o comandos listados nas migrations, ok? Também é possível criar um arquivo de configuração flyway.conf dentro do projeto para que não seja necessário informar os dados de: url , user e password. Lembrando que o arquivo deve estar na raiz do projeto, conforme o projeto base indicado no artigo. É isso, curtiu? Até mais!

  • Criando Alarmes com AWS CloudWatch

    A utilização de alarmes é um requisito imprescindível quando se trabalha com diversos recursos na nuvem. É uma das formas mais eficientes em monitorar e entender o comportamento de uma aplicação caso as métricas estejam diferentes do esperado. Neste post, vamos criar um alarme do zero utilizando AWS CloudWatch. Existem diversas outras ferramentas que nos possibilita configurarmos alarmes, mas quando se trabalha com AWS, configurar alarmes utilizando CloudWatch é bem simples e rápido. Let's do this! Primeiramente, precisamos escolher algum recurso no qual vamos monitorar, para simplificar, vamos criar um SQS e utilizar uma de suas métricas para que a gente possa utilizar no nosso alarme. Criando uma fila SQS Vamos criar uma fila SQS simples e escolher alguma métrica para que a gente possa utilizar no nosso alarme. Acesso o console da AWS e na barra de pesquisa, digite "SQS" conforme imagem abaixo e em seguida, acesse o serviço. Após acessar o serviço, clique em Criar Fila Vamos criar uma fila padrão para o exemplo, o nome da fila será sqs-messages conforme mostrado abaixo. Não precise se atentar aos outros detalhes, apenas clique no botão Criar Fila para finalizar a criação. Fila criada, agora o próximo passo é criar o nosso alarme. Criando o Alarme Todo recurso/serviço é composto de uma ou mais métricas, são basicamente características. Imagine um carro, no carro temos características do tipo, KMs rodados, KM/hora, quantidade de passageiros em um certo momento e quantidade de combustível no tanque e entre outros. Em uma fila SQS temos métricas do tipo, número de mensagens enviadas, número de recebimentos vazios, tamanho das mensagens enviadas, número de mensagens recebidas e etc. Para o nosso exemplo, vamos escolher a métrica baseada em número de mensagens enviadas (numberOfMessagesSent). Na prática, poderíamos escolher essa métrica por vários motivos. Imagine em uma aplicação que, em casos de instabilidade, mensagens serão enviadas para uma determinada fila, assim, evitando perdas. É de grande importância sabermos que existem mensagens que foram enviadas para uma determinada fila neste contexto de instabilidade. Dessa forma, o alarme deverá ser acionado. Acesse a AWS via console e procure por Cloudwatch na barra de pesquisa, conforme imagem abaixo. Após acessar o serviço, clique na opção Em alarme no canto esquerdo da tela e em seguida clique no botão Criar alarme. Selecione a métrica conforme a tela abaixo Escolha a opção SQS Em seguida, clique em Métricas da fila Na barra de pesquisa, digite sqs-messages para buscar as métricas relacionadas a SQS criada nos passos anteriores. Após a pesquisa da fila e suas métricas, selecione o item da coluna Nome da métrica identificado como NumberOfMessagesSent, e em seguida, clique em Selecionar métrica. Na próxima tela iremos configurar mais detalhes sobre o alarme como: período, estatística, condição para o acionamento, valor limite e pontos de dados. Configurando as métricas Nome da métrica: é a métrica escolhida nos passos anteriores, ou seja, esta métrica mede o número de mensagens enviadas para a SQS (NumberOfMessagesSent). QueueName: Nome da SQS no qual o alarme será configurado. Estatística: Neste campo podemos escolher opções como Média, Soma, Mínimo e entre outros. Isso vai depender do contexto o qual você vai precisar para configurar o alarme e a métrica. Neste exemplo escolhemos Soma, pois queremos pegar a soma do número de mensagens enviadas em um determinado período. Período: Neste campo definimos o período em que o alarme será acionado caso atinja a condição limite, no qual, será definido nos próximos passos. Configurando as condições Tipo de limite: Para este exemplo vamos utilizar o Estático. Sempre que o NumberOfMessagesSent for...: Vamos selecionar a opção Maior que...: Neste campo vamos configurar a quantidade de NumberOfMessagesSent como condição para acionar o alarme. Vamos colocar 5. Configuração adicional Para a configuração adicional, temos o campo Pontos de dados para o alarme no qual gostaria de detalhar um pouco mais o seu funcionamento. Pontos de dados para o alarme Esta opção adicional, flexibiliza a configuração do alarme combinado as condições definidas anteriormente. Por padrão, esta configuração é: 1 de 1 Como funciona? O primeiro campo refere-se a quantidade de pontos e o segundo campo, refere-se ao período. Mantendo as configurações anteriores mais a adicional significa que, o alarme será acionado caso a métrica NumberOfMessagesSent for maior que a soma de 5 em um período de 5 minutos. Até então, a configuração adicional padrão não altera as configurações definidas anteriormente, nada muda. Agora, vamos alterar esta configuração para entender melhor. Vamos alterar de: 1 de 1 para 2 de 2. Isso nos diz que, quando a condição do alarme for atingida, ou seja, para a métrica NumberOfMessagesSent, a soma for maior do que 5, o alarme será acionado para 2 pontos de dados em 10 minutos. Perceba que o período foi multiplicado devido ao segundo campo com o valor 2. Resumindo de forma mais objetiva, mesmo que a condição seja atingida, o alarme somente será acionado se existir 2 pontos de dados acima do limite em um período de 10 minutos. Isso nos dá uma certa flexibilidade afim de evitar falsos alarmes. Vamos entender melhor ainda quando efetuamos alguns testes de acionamento do alarme. Vamos manter as configurações a seguir e clique em Próximo Configurando as ações Na próxima tela, vamos configurar as ações responsáveis por notificar um destino caso o alarme seja acionado. Nesta tela, vamos manter a configuração Em alarme e em seguida, vamos criar um novo tópico e por último, vamos adicionar um email no qual desejamos receber as notificações de erros. Na prática, existem melhores formas de notificação, mas não entraremos nestes detalhes. Selecione a opção Criar novo tópico e preencha com um nome desejado e em seguida, digite um email valido no campo Endpoints de e-mail que receberão a notificação ... Feito o preenchimento, clique em Criar tópico e em seguida, um email será enviado para confirmar a inscrição no tópico criado. Faça a confirmação no seu email e clique em Próximo na tela do alarme para prosseguir com a criação. Agora, precisamos adicionar o nome do alarme na tela abaixo e em seguida clicar em Próximo. A próxima tela será a de revisão, clique em Criar alarme para finalizar a criação. Pronto, agora temos um alarme criado e é hora de testar. Testando o alarme Para testar o alarme, vamos enviar 6 mensagens para a fila criada anteriormente conforme a tela abaixo. Repita esta ação por 6 vezes clicando em Enviar mensagem, fique a vontade para mudar o conteúdo da mensagem. Após o envio das mensagens, perceba que mesmo que o limite seja ultrapassado, o alarme não foi acionado. Isso se deve a configuração adicional, caso contrário, bastaria ter configurado os pontos de dados para 1 de 1 para que o alarme fosse acionado. Agora, vamos enviar várias outras mensagens que exceda o limite em períodos curtos dentro da janela de 10 minutos. Perceba que na imagem acima o alarme foi acionado, pois além de ter atingido a condição especificada nas configurações, também atingiu os 2 pontos de dados. Verifique no email adicionado nas configurações de notificação pois, provavelmente um email foi enviado com os detalhes do alarme. Após o período de 10 minutos, o alarme passa de Em alarme para OK. É isso, curtiu? Até mais!

  • Configurando S3 Bucket event notification com SQS via Terraform

    O S3 (Simple Storage Service) possibilita notificar através de eventos, quando alguma ação ocorre dentro de um Bucket ou em alguma pasta específica. Em outras palavras, funciona como um listener, ou seja, qualquer ação ocorrida em uma origem, ocorrerá um evento de notificação para um destino. O que seriam estas ações? Qualquer ação que acontece dentro de um Bucket como, a criação de um novo arquivo manual ou através de um upload, criação de uma pasta, remoção de arquivos, restauração de arquivos e etc. Destinos Para cada configuração de notificação de evento, deve existir um destino. Para este destino, será enviado informações sobre cada ação, por exemplo: Um novo arquivo foi criado em uma pasta específica, assim, informações sobre o arquivo será enviada, como a data de criação, tamanho do arquivo, o tipo do evento, nome do arquivo e dentre outros. Lembrando que neste processo, não é enviado o conteúdo do arquivo, GG? Existem 3 tipos de destinos disponíveis: Lambda SNS SQS Entendendo o funcionamento na prática Neste post vamos criar uma configuração de notificação de evento em um Bucket S3, simular uma ação e entender o comportamento final. Todo este conteúdo é baseado na documentação oferecida pela HashiCorp, empresa mantenedora do Terraform. Poderíamos criar esta configuração via console mas por motivos de boas práticas, utilizaremos o Terraform como ferramenta IAC. Para quem não conhece muito sobre Terraform, segue este tutorial sobre Primeiros passos utilizando Terraform na AWS. No próximo passo, criaremos um fluxo simulando a imagem abaixo. Criaremos uma configuração no Bucket S3, que para cada arquivo criado na pasta files/, um evento de notificação será enviado para um fila SQS. Criando os arquivos Terraform Crie no seu projeto, uma pasta chamada terraform/ e a partir de agora, todos os arquivos .tf serão criados dentro dela. Agora, crie um arquivo chamado vars.tf onde armazenaremos as variáveis que serão utilizadas. variable "region" { default = "us-east-1" type = string } variable "bucket" { type = string } Crie um arquivo chamado provider.tf , onde adicionaremos a configuração do provider, no qual será a AWS. Isso significa que, o Terraform utilizará a AWS como nuvem para criar os recursos e fará o download dos plugins necessários na inicialização. Copie o código abaixo para o arquivo. provider "aws" { region = "${var.region}" } Crie um arquivo chamado s3.tf , onde adicionaremos as configurações para a criação de um novo Bucket S3 que será usado para a prática resource "aws_s3_bucket" "s3_bucket_notification" { bucket = "${var.bucket}" } Agora, crie um arquivo chamado sqs.tf , onde adicionaremos as configurações para a criação de uma fila SQS e algumas permissões conforme código abaixo: resource "aws_sqs_queue" "s3-notifications-sqs" { name = "s3-notifications-sqs" policy = <

  • Criando Kinesis Firehose via Terraform e enviando eventos com Java

    Neste post, vamos fazer um passo a passo de como criar um AWS Kinesis Firehose (Delivery Stream) via Terraform e enviar alguns eventos via aplicação Java. Antes de começarmos, precisamos entender o que é o Kinesis Firehose e como é o seu funcionamento. O Kinesis Firehose é um dos recursos do Amazon Kinesis, que se resume ao serviço de streaming de dados da AWS. O Kinesis Firehose possibilita capturar um grande volume de dados através de uma origem, processar em tempo real e entregar os dados para um destino. Para entender melhor, vamos pensar no cenário a seguir: Imagine que você e seu time esteja trabalhando em um processo que será necessário enviar um grande volume de dados em tempo real para o S3. Estes objetos são arquivos no formato JSON representados por dados transacionais de compras e vendas de uma plataforma de e-commerce. A ideia de enviar ao S3 é que, o time está planejando criar um Datalake no futuro, e já pensa em organizar os dados em um repositório único de dados, neste caso, o S3. O time criou uma API que faz o PUT dos objetos no S3, e foram percebendo que aquele processo era limitado, custoso, não muito escalável pelo fato do volume de dados ser alto e o mais importante, não era resiliente. Com base nas limitações, o time entendeu que, o mais certo seria trabalhar em cima de uma ferramenta que pudesse fazer toda a parte de streaming dos dados e que a utilização do Kinesis Firehose seria uma melhor abordagem. Pensando neste novo cenário, arquitetura foi mudada para a seguinte: Certo, mas qual é o benefício de ter adicionado o Kinesis Firehose? O Kinesis possibilita gerenciar todo o volume de dados, independente do volume. Não será mais necessário acessar o S3 de forma contínua e gerenciar todo o fluxo de PUT dos objetos utilizando SDK Client, tornando o processo mais resiliente e simples. O Kinesis Firehose possibilita entregar as mensagens para diferentes destinos, como o S3, Redshift, Elastic Search e dentre outros. É um ótimo recurso para um processo analítico de dados e não necessita de uma administração contínua, pois o recurso já é gerenciado pela AWS. Resumindo, se você e seu time precisa processar grandes volumes de dados em tempo real e entregar os dados para diferentes destinos, o Kinesis Firehose pode ser uma boa solução. O cenário acima descrito, foi apenas para que a gente pudesse entender o contexto em qual cenário aplicar o Kinesis Firehose, existem vários outros cenários em que podemos aplicar. Na próxima etapa, vamos criar um Kinesis utilizando uma ferramenta IAC, neste caso, o Terraform. O Terraform já foi abordado em outros posts e vale a pena dar uma olhada caso você seja novo por aqui, pois o foco não é falar sobre Terraform, combinado? Após a criação do Kinesis Firehose utilizando Terraform, vamos criar um código Java que vai nos permitir enviar alguns eventos para o Kinesis Firehose processar e salvar estes mesmos dados em um Bucket no S3, finalizando o fluxo de streaming. Lembre-se que, para executar os passos seguintes é necessário ter uma conta na AWS e ter as credencias já configuradas, ok? Criando o Terraform Crie um arquivo no seu projeto chamado main.tf e adicione o código abaixo provider "aws" { region = "${var.region}" } Entendendo o código acima provider "aws": É no provider que definimos qual é a cloud que utilizaremos, no nosso caso a AWS. region: É onde definimos a região em que o serviço será executado. Agora, crie o arquivo vars.tf . É neste arquivo que vamos definir a variáveis e os seus valores. variable "region" { default = "us-east-1" type = "string" } variable "bucket" { type = "string" } Entendendo o código acima variable "region": Declaração da variável region. Esta mesma variável é utilizado no arquivo main.tf. default: Definição padrão do valor da variável, neste caso us-east-1. Esta será a região em que o serviço será executado. type: Tipo da variável variable "bucket": Declaração da variável bucket. E por último, crie o arquivo kinesis.tf, este nome é sugestivo, combinado? resource "aws_kinesis_firehose_delivery_stream" "kinesis_firehose" { destination = "extended_s3" name = "kns-delivery-event" extended_s3_configuration { bucket_arn = "${aws_s3_bucket.bucket.arn}" role_arn = "${aws_iam_role.firehose_role.arn}" prefix = "ingest/year=!{timestamp:yyyy}/ month=!{timestamp:MM}/ day=!{timestamp:dd}/ hour=!{timestamp:HH}/" error_output_prefix = "ingest/ !{firehose:error-output-type}/ year=!{timestamp:yyyy} /month=!{timestamp:MM}/ day=!{timestamp:dd}/hour=!{timestamp:HH}/" } } resource "aws_s3_bucket" "bucket" { bucket = "${var.bucket}" acl = "private" } resource "aws_iam_role" "firehose_role" { name = "firehose_test_role" assume_role_policy = < com.amazonaws aws-java-sdk-kinesis 1.12.70 org.projectlombok lombok 1.18.20 provided Criando o modelo Classe Java chamada Event.java que representará o modelo do evento. import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; import java.util.UUID; @Data public class Event { @JsonProperty("event_date") private String eventDate; @JsonProperty("event_id") private UUID eventId; @JsonProperty("provider") private String provider; @JsonProperty("blog") private String blog; @JsonProperty("post_id") private UUID postId; } Criando o Service A classe KinesisService.java será a responsável por parte da lógica e envio dos eventos. public class KinesisService { static String PROVIDER = "AWS KINESIS FIREHOSE"; static String BLOG = "Coffee and Tips"; static String KNS_DELIVERY_NAME = "kns-delivery-event"; static String RECORD_ID = "Record ID "; static String EVENT = "Event "; public static AmazonKinesisFirehose kinesisFirehoseClient(){ AmazonKinesisFirehose amazonKinesisFirehose = AmazonKinesisFirehoseClient.builder() .withRegion(Regions.US_EAST_1.getName()) .build(); return amazonKinesisFirehose; } @SneakyThrows public static void sendDataWithPutRecordBatch(int maxRecords){ PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest(); putRecordBatchRequest .setDeliveryStreamName(KNS_DELIVERY_NAME); String line = ""; List records = new ArrayList<>(); while(maxRecords > 0){ line = getData(); String data = line + "\n"; Record record = new Record() .withData(ByteBuffer.wrap(data.getBytes())); records.add(record); maxRecords --; } putRecordBatchRequest.setRecords(records); PutRecordBatchResult putRecordResult = kinesisFirehoseClient() .putRecordBatch(putRecordBatchRequest); putRecordResult .getRequestResponses() .forEach(result -> System.out .println(RECORD_ID + result.getRecordId())); } @SneakyThrows public static void sendDataWithPutRecord(int maxRecords){ PutRecordRequest PutRecordRequest = new PutRecordRequest(); PutRecordRequest .setDeliveryStreamName(KNS_DELIVERY_NAME); String line = ""; while(maxRecords > 0){ line = getData(); String data = line + "\n"; System.out.println(EVENT + data); Record record = new Record() .withData(ByteBuffer .wrap(data.getBytes())); PutRecordRequest.setRecord(record); PutRecordResult putRecordResult = kinesisFirehoseClient() .putRecord(PutRecordRequest); System.out.println(RECORD_ID + putRecordResult.getRecordId()); maxRecords --; } } @SneakyThrows public static String getData(){ Event event = new Event(); event.setEventId(UUID.randomUUID()); event.setPostId(UUID.randomUUID()); event.setBlog(BLOG); event.setEventDate(LocalDateTime.now().toString()); event.setProvider(PROVIDER); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(event); } } Entendendo o código acima O código acima possui algumas adaptações visando o funcionamento do Kinesis para o Post. Para um ambiente de produção, recomendo utilizar técnicas de padrões de projeto ou adotar melhores práticas visando um código mais organizado, GG? 1. O método kinesisFirehoseClient( ) é o responsável por configurar e manter uma interação entre os serviços do Kinesis Firehose e a nossa aplicação através da interface AmazonKinesisFirehose. Esta interface provê os seguintes contratos: putRecordBatch(PutRecordBatchRequest var1) putRecord(PutRecordRequest var1) Todo o código de envio foi baseado nos contratos acima e falaremos mais sobre eles nos próximos passos. Por último, perceba que estamos utilizando a região US-EAST-1, a mesma região definida no Terraform. 2. O método sendDataWithPutRecordBatch(int maxRecords) será o nosso método responsável por enviar os dados em Batch a partir do contrato putRecordBatch(PutRecordBatchRequest var1). A ideia de usar o envio em Batch, é a possibilidade de, em um único request, enviarmos uma grande quantidade de dados. Este cenário é bem comum para aplicações que trabalham com um grande volume de dados. Um ponto importante a ser lembrado é que, esta operação suporta até 4 MB de dados ou até 500 registros por requisição. Para o método sendDataWithPutRecordBatch(int maxRecords), estamos utilizando o parâmetro maxRecords, que simulará uma quantidade específica de dados dentro do loop para serem enviados via Batch ao fim da interação, ou seja, o seu uso será apenas para a simulação. 3. Para entendermos sobre o contrato putRecord(PutRecordRequest var1) fornecido pela interface AmazonKinesisFirehose, criamos o método sendDataWithPutRecord(int maxRecords). Este cenário difere bastante do envio em Batch, pois o tipo de envio será do tipo single record, ou seja, para cada dado enviado, um novo request deve ser criado e para cada envio, existe um limite de dado de até 1.000 KiB de tamanho. 4. O método getData( ) será o responsável por montar o objeto do tipo Event com valores aleatórios antes do envio e transformar em um JSON através do ObjectMapper. Executando o código Para a execução do código, vamos criar um método main( ) que fará a execução dos métodos acima. Executando o método sendDataWithPutRecord(int maxRecords) public static void main(String[] args) { sendDataWithPutRecord(2000); } No código acima estamos invocando o método sendDataWithPutRecord(int maxRecords) e passando no parâmetro o valor 2000, ou seja, iremos enviar 2000 eventos ou 2000 requests. Resposta do envio no console Para cada envio, o campo putRecordResult.getRecordId() retorna um ID. Executando o método sendDataWithPutRecordBatch(int maxRecords) public static void main(String[] args) { sendDataWithPutRecordBatch(500); } Já no método acima, especificamos o valor 500 para o parâmetro maxRecords, pois é o valor máximo de dados permitidos para envio em cada requisição. Lembrando que este parâmetro simula a quantidade de dados fictícios a serem enviados. Destino final Após a execução dos envios, os dados serão entregues ao Bucket do S3 definido na execução do Terraform e serão organizados conforme a imagem abaixo. Na imagem acima, os dados estão organizados entre ano, mês, dia e hora de envio. Os arquivos vão estar disponíveis dentro das pastas relacionadas a hora (hour=), e perceba a diferença de 5 minutos entre eles na coluna Última modificação, ou seja, seguindo a configuração de Buffer interval conforme falamos nos passos anteriores. Pronto, os dados estão sendo entregues! Destruindo os recursos Caso você esteja acompanhando este Post de forma experimental, creio que seja necessário apagar toda a stack em que criamos para evitar uma conta inesperada da AWS. Podemos executar o seguinte comando via Terraform para destruir os recursos em que criamos. terraform destroy -var 'bucket=digite o nome do seu bucket' Mas antes, apague todos os arquivos e pastas do Bucket, pois caso o contrário, o destroy não vai funcionar, GG? Após a execução do comando acima, você deverá confirmar a destruição, é só digitar yes para confirmar. Caso queria entender mais a fundo sobre Kinesis Firehose, recomendo a leitura na documentação oficial. Código completo no GitHub. É isso, curtiu? Até mais!

  • Criando uma Lambda AWS usando Python + Deploy

    Neste post, iremos criar uma aplicação Lambda AWS usando Python. Será um passo a passo, desde criar o código fonte, que será bem simples, passando pela criação da role e por fim, o deploy da aplicação. Este tutorial será baseado na documentação da AWS, pois, muitas vezes os usuários possuem uma dificuldade em ler uma documentação por talvez ser um pouco verbosa ou até estar em outra língua, por isso, decidi criar este passo a passo baseando-se na própria documentação da AWS e espero que gostem. O que é um Lambda? Lambdas são conhecidos como aplicações serverless, ou seja, são aplicações que não tem a necessidade em configurar um servidor memória, CPU etc. Todo o poder computacional é gerenciado pela própria AWS e isso nos dá a flexibilidade de apenas se preocupar com o código que será executado. Característica de um Lambda O Lambda possui uma característica diferente de uma aplicação tradicional, que tem uma característica de uptime 24/7, para isso, temos que ter em mente a finalidade do seu uso. A característica principal é que um Lambda é executado para uma tarefa específica e após o término, ela basicamente "morre". Isso é muito positivo, pois gera uma boa economia e os recursos são bem mais utilizados. Imagine que você recebeu uma tarefa para criar uma aplicação que enviará emails algumas vezes durante o dia. Faz sentido manter uma aplicação online 100% do tempo para este tipo de tarefa? Você até pode fazer, mas estaria desperdiçando recursos, por na maioria do tempo a aplicação estar ociosa e o tempo de execução da tarefa seria em questão de poucos minutos ou nem isso. Já um Lambda, seria uma abordagem ideal para este cenário. Imagine que os emails devem ser enviados 6 vezes ao dia em um intervalo de 4 horas. A aplicação executaria a tarefa, terminaria e depois de 4 horas executaria novamente. E cada execução seria em torna de 2 minutos. Resumindo, o objetivo da tarefa seria alcançado em pouco tempo e com um baixo custo computacional. Mãos na massa Nos próximos passos, criaremos o nosso código em Python na versão 3.8, em seguida, a role com as permissões necessárias para a execução do Lambda. Vamos anexar a role criada á uma policy, criaremos a aplicação Lambda, e em seguida, vamos compactar o arquivo com o código para o deploy, e por fim, faremos uma invocação para execução da Lambda. Os passos após o código, iremos executar utilizando CLI, para isso faça a instalação do AWS CLI seguindo esta documentação . O CLI é uma das maneiras de interação com a AWS e é bem simples a sua utilização. Existem outras maneiras também de interagir com a AWS para a criação de recursos, o mais comum no mercado é utilizando ferramentas IAC, como Cloudformation e Terraform. Já até criamos um tutorial sobre estes recursos, basta acessar os links abaixo para entender melhor, mas para este tutorial, seguiremos como base a documentação da AWS, utilizando CLI. Criando recursos utilizando Cloudformation Primeiros passos utilizando Terraform na AWS Após a instalação do CLI, configure suas credenciais da AWS executando o comando abaixo via terminal. Caso você não conheça este comando, recomendo a leitura de como utilizar, nesta documentação, aws configure Criando o código da Lambda A seguir, criaremos o código que será executado na aplicação Lambda, será um código muito simples apenas para exemplificar a execução. Utilizaremos Python neste exemplo, mas é possível utilizar várias outras linguagens de programação para a criação da Lambda. Crie um arquivo chamado lambda_function.py e escreva o código abaixo. import logging logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): name = event['yourname'] message = {'Message': 'hello ' + name + ' ,this is your first ' 'Lambda application!' } logger.info(message) Criando a role O comando a seguir criará a role chamada lambda-default-role. A role é uma forma de dar permissão para um recurso específico executar uma ação específica. Execute via terminal. aws iam create-role --role-name lambda-default-role --assume-role-policy-document '{"Version": "2012-10-17","Statement": [{ "Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}' Após a execução do comando acima, um output como este será mostrado. { "Role": { "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" } } ] }, "RoleId": "AROA5WDL5GTDVI43EJEUOV", "CreateDate": "2021-09-12T22:11:04Z", "RoleName": "lambda-default-role", "Path": "/", "Arn": "arn:aws:iam::id-da-sua-conta:role/lambda-default-role" } } Anexando a role á uma policy Para este passo, basta executar o comando abaixo via terminal aws iam attach-role-policy --role-name lambda-default-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Zipando o arquivo Python Neste passo, vamos compactar o arquivo Python contendo o código que escrevemos nos passos anteriores, para isso, acesse a pasta onde o arquivo se encontra e execute o comando abaixo. zip my-deployment-package.zip lambda_function.py Criando a função Lambda e efetuando o deploy Agora criaremos a função Lambda chamada my-first-lambda e no mesmo comando, vamos anexar o código compactado no passo anterior. Lembre-se que este comando deve ser executado na mesma pasta onde o arquivo foi zipado, ok? aws lambda create-function --function-name my-first-lambda --zip-file fileb://my-deployment-package.zip --handler lambda_function.lambda_handler --runtime python3.8 --role arn:aws:iam::id-da-sua-conta:role/lambda-default-role Entendendo o comando acima aws lambda create-function --function-name my-first-lambda: Comando para criar a função Lambda chamada my-first-lambda --zip-file fileb://my-deployment-package.zip: Anexa o arquivo my-deployment-package.zip na criação da função, onde contém o código fonte. --handler lambda_function.lambda_handler: Esse parâmetro seta a nome da função lambda_handler, ou seja, quando o Lambda for invocado, será está função que será executada. Esta mesma função deve existir no arquivo Python, conforme criamos anteriormente no primeiro passo. --runtime python3.8: Neste parâmetro, passamos como argumento a linguagem em que estamos usando para o Lambda, no nosso caso, Python com a versão 3.8 --role arn:aws:iam::id-da-sua-conta:role/lambda-default-role: Por último, passamos o valor do parâmetro role. O valor deste parâmetro é mostrado no output após a criação da role, no campo Arn, basta copiar e colar. Pronto, agora que você entendeu, basta executar via terminal para a criação definitiva do Lambda. Veja que o Lambda foi criado com sucesso, agora vamos fazer algumas invocações para que o Lambda seja executado. Invocando o Lambda Neste passo, vamos executar o Lambda através de uma invocação manual. Existem diversas outras formas em invocar uma aplicação Lambda. A invocação pode ser feita através de um Event-Bridge que funcionaria como um gatilho, no qual podemos configurar um cron, que poderá invocar o Lambda de tempos pré-determinados, como, minuto a minuto, hora em hora, 1 vez por semana e dentre outras opções. Existem várias outras formas, todas elas são descritas na documentação da AWS, mas para este post, faremos de forma manual, utilizando o CLI. aws lambda invoke --function-name my-first-lambda --payload '{"yourname": "Mônica"}' output.txt Entendendo o comando acima aws lambda invoke --function-name my-first-lambda: Invoca o Lambda my-first-lambda ---payload '{"yourname": "Mônica"}': No payload é onde passamos o valor do input que o Lambda vai processar. Neste caso, estamos passando um JSON com uma informação. O segundo parâmetro, o output.txt, será o arquivo que armazenará informações como, o response do Lambda caso exista ou algum log de erro, caso aconteça. Você pode acessar os logs do Lambda através do Cloudwatch para visualizar melhor os logs que definimos no código Python. Atualizando o código Caso precise alterar o código, refaça o passo em que zipamos o arquivo e em seguida, execute o comando abaixo. aws lambda update-function-code --function-name my-first-lambda --zip-file fileb://my-deployment-package.zip Pronto, agora temos um Lambda! Caso queira ler a documentação da AWS sobre como criar um Lambda, siga esta documentação. É isso, curtiu? Até mais!