
  • How to save costs on S3 when running a Data Lake

    Cloud services provide useful resources to scale your business faster, but it isn't always easy to measure cloud costs, whether you're starting a business from scratch or already running a solid one. Cost is always part of the strategy for any company that wants to provide a better service. My teammates and I have worked on an event-based data platform able to process 350 million events every day. We provide data to client applications and to business teams so they can make decisions, and it's always a challenge to deal with the massive data traffic while maintaining this data and saving money on storage at the same time. Storage is expensive, but there are some strategies to save money. In this post I'll describe some strategies we've adopted to save costs on S3 (Simple Storage Service), and I hope it helps.

    Strategies

    Strategy #1: Amazon S3 storage classes

    Amazon S3 provides a way to manage files through lifecycle settings, where you can define rules to move files to different storage classes depending on the file's age and access frequency. This strategy can save your company a lot of money. By default, data is stored in the S3 Standard storage class. This storage type has some benefits for storage and data access, but we realized that after the data was transformed into the Silver layer, the data in the Bronze layer wasn't accessed very often, and it was entirely possible to move it to a cheaper storage class. We decided to move it, using lifecycle settings, to the S3 Intelligent-Tiering storage class. This storage class was a perfect fit for our context because we could save on storage costs and, even if we needed to access these files for some reason, we could keep a fair cost. We're working toward an even better scenario in which we would set a lifecycle rule in the Silver layer to move files that haven't been accessed for a period to a cheaper storage class, but at the moment we need to access historical files with high frequency. If you check the AWS documentation you'll notice there are even cheaper storage classes, but you and your team should analyse each case, because the cheaper it is to store data, the more expensive it becomes to access it. So be careful: try to understand the storage and data-access patterns of your Data Lake architecture before choosing the storage class that best fits your business.

    Strategy #2: Partitioning data

    Apache Spark is the most famous framework for processing large amounts of data and has been adopted by data teams around the world. During data transformation with Spark, you can configure a Dataframe to partition the data by a specific column. This approach is very useful for making SQL queries perform better. Note that partitioning has no direct relation to S3, but using it avoids full scans of S3 objects. A full scan means that, to answer a SQL query, the SQL engine may load gigabytes or even terabytes of data. This can be very expensive for your company, because you can easily be charged depending on the amount of data loaded. So partitioning data plays an important role when we need to save costs.

    Strategy #3: Delta Lake vacuum

    Delta Lake has an interesting feature called vacuum, a mechanism to remove unused files from storage. Teams usually adopt this strategy after restoring versions, because some files remain on disk and are no longer managed by Delta Lake. For example, in the image below we have 5 versions of a Delta table and their partitions. Suppose we need to restore to version 1 because we found some inconsistent data after that version. After this command, Delta will point to version 1 as the current version, but the parquet files related to the other versions will still be there, unused. We can remove these parquet files by running the vacuum command as shown below. Note that the parquet files related to versions after 1 were removed, releasing space in storage. For more details I strongly recommend reading the Delta Lake documentation. Well, that's it, I hope you enjoyed it!
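    As a rough illustration of Strategies #2 and #3, the sketch below (PySpark with the delta-spark package) writes a Dataframe partitioned by a date column and then vacuums unused files from a Delta table. The bucket paths, the partition_date column and the 168-hour retention window are assumptions made up for this example, not values from the platform described above.

    from pyspark.sql import SparkSession
    from delta.tables import DeltaTable

    spark = (SparkSession.builder
             .appName("s3-cost-saving-sketch")
             .getOrCreate())

    # Strategy #2: write data partitioned by a column that queries usually filter on,
    # so SQL engines read only the matching prefixes instead of scanning every object.
    events_df = spark.read.json("s3://my-bucket/bronze/events/")  # hypothetical path
    (events_df.write
        .partitionBy("partition_date")
        .format("delta")
        .mode("overwrite")
        .save("s3://my-bucket/silver/events/"))

    # Strategy #3: remove files no longer referenced by the Delta table.
    delta_table = DeltaTable.forPath(spark, "s3://my-bucket/silver/events/")
    delta_table.vacuum(168)  # retention window in hours (7 days)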

  • Understanding Delta Lake Time Travel in 2 minutes

    Delta Lake provides a way to version data for operations like merge, update and delete. This makes it transparent how the data life cycle inside Delta Lake works. For each operation a version is incremented, and if you have a table with multiple operations, different versions of the table will be created. Delta Lake offers a mechanism to navigate over the different versions called Time Travel. It's a temporary way to access data from the past. In this post we're going to use this feature to see different versions of a table. Below we have a Delta table called people whose versions were all generated through write operations using append mode.

    Current version

    When we perform a simple read on a table, the current version is always the most recent one. So, for this scenario, the current version is 2 (two). Note that we don't need to specify which version we want to use, because we're not using Time Travel yet.

    session.read().format("delta")
            .load("table/people")
            .orderBy("id")
            .show();

    Nothing changes at the moment; let's keep going to the next steps.

    Working with Time Travel

    Here is where we start working with Time Travel. For the next steps, we'll perform reads on the people table specifying different versions to understand how Time Travel works.

    Reading the Delta table - Version 0 (zero)

    Now we're going to work with different versions, starting from version 0 (zero). Let's read the table again, but this time adding a new parameter; take a look at the code below.

    session.read().format("delta")
            .option("versionAsOf", 0)
            .load("table/people")
            .orderBy("id")
            .show();

    Notice that we added a new parameter called versionAsOf. This parameter allows us to configure which version number we want to restore temporarily for a table. For this scenario we configured the read for version zero (0) of the Delta table. This was the first version generated by Delta Lake after a write operation.

    Reading the Delta table - Version 1 (one)

    For this last step we're using version one (1). Note that the data from the previous version has been kept, because append mode was used.

    session.read().format("delta")
            .option("versionAsOf", 1)
            .load("table/people")
            .orderBy("id")
            .show();

    Delta Lake has a lot of benefits, and Time Travel gives us flexibility in a Big Data architecture. For more details I recommend reading the Delta Lake docs. Well, that's it, I hope you enjoyed it.
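    If your team uses PySpark instead of the Java API, the same reads can be written as in the minimal sketch below, assuming a SparkSession already configured with the delta-spark package. Delta also accepts a timestamp through the timestampAsOf option; the timestamp value here is just an illustration, not one taken from this example.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("delta-time-travel").getOrCreate()

    # Equivalent of the Java read above: query the table as of a specific version.
    spark.read.format("delta") \
        .option("versionAsOf", 0) \
        .load("table/people") \
        .orderBy("id") \
        .show()

    # Time travel by timestamp instead of version number.
    spark.read.format("delta") \
        .option("timestampAsOf", "2024-01-15 00:00:00") \
        .load("table/people") \
        .show()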

  • Converting Parquet table to Delta Table

    In this post we're going to walk through examples of how to convert a Parquet table to a Delta table. First we'll create a Parquet table from scratch through a Spark Dataframe, and then we'll convert it to a Delta table. Using a Delta table has some benefits compared to a Parquet table: Delta lets you restore versions of your table through the time travel feature, provides ACID support, and more.

    Creating a Parquet table

    First of all, let's create a Parquet table to be converted to a Delta table later. I prefer to create the Parquet table from scratch to give a better understanding. The following code will be executed once, just to create the Parquet table. We're going to use a Spark Dataframe loaded from a JSON file containing semi-structured records.

    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        conf.setAppName("spark-delta-table");
        conf.setMaster("local[1]");

        SparkSession session = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        Dataset<Row> dataFrame = session.read().json("product.json");
        dataFrame.write().format("parquet").save("table/product");
    }

    In the example above, we start by creating a SparkSession object to create and manage a Spark Dataframe that is loaded from the product.json file content. After the load, the Dataframe creates (or writes) a table in Parquet format in the table/product directory.

    JSON content

    The file product.json contains semi-structured records:

    {"id":1, "name":"rice", "price":12.0, "qty": 2}
    {"id":2, "name":"beans", "price":7.50, "qty": 5}
    {"id":3, "name":"coke", "price":5.50, "qty": 2}
    {"id":4, "name":"juice", "price":3.80, "qty": 1}
    {"id":5, "name":"meat", "price":1.50, "qty": 1}
    {"id":6, "name":"ice-cream", "price":6.0, "qty": 2}
    {"id":7, "name":"potato", "price":3.70, "qty": 10}
    {"id":8, "name":"apple", "price":5.60, "qty": 5}

    After running the code above, Parquet files will be generated in the table/product directory, containing the files below.

    Converting the Parquet table to a Delta table

    Now that we have a Parquet table already created, we can easily convert it to a Delta table. Let's do it.

    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        conf.setAppName("spark-delta-table");
        conf.setMaster("local[1]");

        SparkSession session = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        DeltaTable.convertToDelta(session, "parquet.`table/product`");
    }

    The DeltaTable.convertToDelta method is responsible for converting the Parquet table to a Delta table. Note that we had to pass the SparkSession as a parameter and also specify the path of the Parquet table using the parquet.`<path>` format, as shown in the code. You can see the result of the execution in the picture below. After the conversion runs, Delta creates the famous _delta_log directory containing commit info and checkpoint files. Well, that's it, I hope you enjoyed it!
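    For teams working in Python rather than Java, a minimal equivalent sketch with the delta-spark package could look like the code below. The table/product path comes from the example above; the session configuration lines are assumptions about a typical local Delta setup, so adjust them to your environment.

    from pyspark.sql import SparkSession
    from delta.tables import DeltaTable

    spark = (SparkSession.builder
             .appName("spark-delta-table")
             .master("local[1]")
             # Assumed Delta settings for a plain local session; managed platforms usually set these for you.
             .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
             .config("spark.sql.catalog.spark_catalog",
                     "org.apache.spark.sql.delta.catalog.DeltaCatalog")
             .getOrCreate())

    # Converts the existing Parquet directory in place; Delta only adds the _delta_log metadata.
    DeltaTable.convertToDelta(spark, "parquet.`table/product`")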

  • First steps with DBT - Data Build Tool

    DBT has been used by a lot of companies in the Data area, and I believe we can extract good insights about it in this post. This is going to be a practical post showing how DBT works, and I hope you guys enjoy it.

    What's DBT?

    DBT stands for Data Build Tool and enables teams to transform data already loaded into their warehouse with simple select statements. DBT does the T in ELT processes; in other words, it doesn't extract or load data, but it's useful for transforming it.

    Step 1: Creating a DBT Project

    We're assuming that DBT is already installed, but if not, I recommend following this link. With DBT installed, you can create a new project using the CLI, or you can clone this project from the DBT Github repository. For this post we're going to use CLI mode to create our project and also to complete the next steps. To create a new project, run the command below.

    dbt init

    After running this command, you need to type the project's name and which warehouse or database you're going to use, as in the image below. For this post, we're going to use the postgres adapter. It's very important that you have a postgres database already installed, or you can bring up a postgres image using Docker. About adapters: DBT supports several of them, and you can check them here. I created a table structure and loaded it with data simulating a video platform called wetube, and we're going to use it to understand how DBT works. Here is the structure:

    Step 2: Structure and more about DBT

    After running the dbt init command to create the project, the structure of folders and files below will be created. I won't talk about every directory of the project, but I'd like to focus on a few of them.

    Sources

    Sources are basically the data already loaded into your warehouse. In the DBT process, sources have the same meaning as raw data. There are no folders representing source data in this project, but you need to know about this term, because we're going to set up tables already created as sources in the next sections.

    Seeds

    Seeds are an interesting and useful mechanism to load static data into your warehouse through CSV files. If you want to load such data, you need to create a CSV file in this directory and run the command below.

    dbt seed

    For each field in the CSV file, DBT will infer its type and create a table in the warehouse or database.

    Models

    DBT works with the model paradigm: the main idea is that you create models through transformations written as SQL statements based on source tables or existing models. Every SQL file located in your models folder will create a model in your warehouse when the command below runs.

    dbt run

    Remember that a model can be created from a source or from another model; don't worry about this, I'll show you more details about it later.

    Step 3: Setting up the database connection

    With the project created, we need to set up our database connection, and in this post we're going to use postgres as the database. After initializing the project, a bunch of files are created, and one of them is called profiles.yml. The profiles.yml file is responsible for controlling the different profiles for the different database connections, such as dev and production environments. If you've noticed, we can't see this file in the image above, because it is created outside of the project to avoid exposing sensitive credentials. You can find this file in the ~/.dbt/ directory. Note that we have one profile named dbt_blog and a target called dev; by default the target refers to dev, with the database connection settings.
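    Since the file itself is only shown as an image above, here is a minimal hedged sketch of what a dbt_blog profile with a dev target for postgres typically looks like. The host, port, user, pass, dbname, schema and threads values are placeholders, not this project's credentials.

    dbt_blog:
      target: dev
      outputs:
        dev:
          type: postgres
          host: localhost
          port: 5432
          user: postgres
          pass: postgres
          dbname: dbt_blog
          schema: public
          threads: 4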
    It's also possible to create one or more profiles and targets, which enables working with different environments. Another important detail is that the dbt_blog profile should be specified in the dbt_project.yml file as the default profile. In the next sections, we'll discuss what the dbt_project.yml file is and how it works.

    Step 4: Creating the dbt_project.yml file

    Every DBT project has a dbt_project.yml file, where you can set up information such as project name, directories, profiles and materialization type.

    name: 'dbt_blog'
    version: '1.0.0'
    config-version: 2

    profile: 'dbt_blog'

    model-paths: ["models"]
    analysis-paths: ["analyses"]
    test-paths: ["tests"]
    seed-paths: ["seeds"]
    macro-paths: ["macros"]
    snapshot-paths: ["snapshots"]

    target-path: "target"  # directory which will store compiled SQL files
    clean-targets:         # directories to be removed by `dbt clean`
      - "target"
      - "dbt_packages"

    models:
      dbt_blog:
        # Config indicated by + and applies to all files under models/example/
        mart:
          +materialized: table

    Note that the profile field is set to the same profile specified in the profiles.yml file. Another important detail is the materialized field: here it is set to "table", but by default it is "view". The materialized field allows you to create models as tables or views on each run. There are other types of materialization, but we won't discuss them here; I recommend reading the dbt docs.

    Step 5: Creating our first model

    Creating the first files

    Let's switch things up a bit: we're going to create a sub-folder in the models directory called mart, and inside this folder we're going to create our .sql files along with another important file that we haven't discussed yet, called schema.yml.

    Creating the schema file

    Schema files are used to map sources and to document models, such as the model's name, columns and more. Now create a file called schema.yml and fill it with the information below.

    version: 2

    sources:
      - name: wetube
        tables:
          - name: account
          - name: city
          - name: state
          - name: channel
          - name: channel_subs
          - name: video
          - name: video_like
          - name: user_address

    models:
      - name: number_of_subs_by_channel
        description: "Number of subscribers by channel"
        columns:
          - name: id_channel
            description: "Channel's ID"
            tests:
              - not_null
          - name: channel
            description: "Channel's Name"
            tests:
              - not_null
          - name: num_of_subs
            description: "Number of Subs"
            tests:
              - not_null

    Sources: in the sources field you can include tables from your warehouse or database that are going to be used in model creation.

    Models: in the models field you can include the model's name, its columns and their descriptions.

    Creating a model

    This is the part where we create the SQL scripts that will result in our first model. For the first model, we're going to write a SQL statement representing a model that shows the number of subscribers by channel. Create a file called number_of_subs_by_channel.sql and fill it with the script below.

    with source_channel as (
        select * from {{ source('wetube', 'channel') }}
    ),

    source_channel_subs as (
        select * from {{ source('wetube','channel_subs') }}
    ),

    number_of_subs_by_channel as (
        select source_channel.id_channel,
               source_channel.name,
               count(source_channel_subs.id_subscriber) num_subs
          from source_channel_subs
         inner join source_channel using (id_channel)
         group by 1, 2
    )

    select * from number_of_subs_by_channel

    Understanding the model creation

    Note that we have multiple scripts separated into common table expressions (CTEs), which makes the code easier to understand. DBT lets us use the Jinja template syntax {{ }}, bringing more flexibility to our code. The source keyword inside the Jinja template means we're referring to source tables. To refer to a model, you use the ref keyword. The last SELECT statement, based on the source tables, generates the model that will be created as a table in the database.

    Running our first model

    Run the command below to create our first model:

    dbt run

    Output:

    Creating another model

    Imagine that we need to create a model containing account information and its channels. Let's go back to the schema.yml file to describe this new model.

      - name: account_information
        description: "Model containing account information and its channels"
        columns:
          - name: id_account
            description: "Account ID"
            tests:
              - not_null
          - name: first_name
            description: "First name of user's account"
            tests:
              - not_null
          - name: last_name
            description: "Last name of user's account"
            tests:
              - not_null
          - name: email
            description: "Account's email"
            tests:
              - not_null
          - name: city_name
            description: "City's name"
            tests:
              - not_null
          - name: state_name
            description: "State's name"
            tests:
              - not_null
          - name: id_channel
            description: "Channel's ID"
            tests:
              - not_null
          - name: channel_name
            description: "Channel's name"
            tests:
              - not_null
          - name: channel_creation
            description: "Channel creation date"
            tests:
              - not_null

    Now create a new SQL file, name it account_information.sql, and put the script below in it:

    with source_channel as (
        select * from {{ source('wetube', 'channel') }}
    ),

    source_city as (
        select * from {{ source('wetube','city') }}
    ),

    source_state as (
        select * from {{ source('wetube','state') }}
    ),

    source_user_address as (
        select * from {{ source('wetube','user_address') }}
    ),

    source_account as (
        select * from {{ source('wetube','account') }}
    ),

    account_info as (
        select account.id_user as id_account,
               account.first_name,
               account.last_name,
               account.email,
               city.name as city_name,
               state.name as state_name,
               channel.id_channel,
               channel.name as channel,
               channel.creation_date as channel_creation
          from source_account account
         inner join source_channel channel on (channel.id_account = account.id_user)
         inner join source_user_address user_address using (id_user)
         inner join source_state state using (id_state)
         inner join source_city city using (id_city)
    )

    select * from account_info

    Creating our last model

    For our last model, we're going to create a model showing how many likes each video has. Let's change the schema.yml again to describe and document our future, and last, model.

      - name: total_likes_by_video
        description: "Model containing the total of likes by video"
        columns:
          - name: id_channel
            description: "Channel's ID"
            tests:
              - not_null
          - name: channel
            description: "Channel's name"
            tests:
              - not_null
          - name: id_video
            description: "Video's ID"
            tests:
              - not_null
          - name: title
            description: "Video's title"
            tests:
              - not_null
          - name: total_likes
            description: "Total of likes"
            tests:
              - not_null

    Create a file called total_likes_by_video.sql and put the code below in it:

    with source_video as (
        select * from {{ source('wetube','video') }}
    ),

    source_video_like as (
        select * from {{ source('wetube','video_like') }}
    ),

    source_account_info as (
        select * from {{ ref('account_information') }}
    ),

    source_total_like_by_video as (
        select source_account_info.id_channel,
               source_account_info.channel,
               source_video.id_video,
               source_video.title,
               count(*) as total_likes
          from source_video_like
         inner join source_video using (id_video)
         inner join source_account_info using (id_channel)
         group by source_account_info.id_channel,
                  source_account_info.channel,
                  source_video.id_video,
                  source_video.title
         order by total_likes desc
    )

    select * from source_total_like_by_video

    Running DBT again

    After creating these files, let's run DBT again to create the models:

    dbt run

    Output

    The models were created in the database, and you can run select statements directly in your database to check them.

    Model: account_information
    Model: number_of_subs_by_channel
    Model: total_likes_by_video

    Step 6: DBT Docs

    Documentation

    After generating our models, we're now going to generate docs based on them. DBT generates complete documentation about models and sources and their columns, and you can also browse it through a web page.

    Generating docs

    dbt docs generate

    Running docs on a webserver

    After the docs are generated, you can run the command below to start a webserver on port 8080 and see the documentation locally.

    dbt docs serve

    Lineage

    Another nice detail about the documentation is that you can see, through a lineage graph, the models and their dependencies.

    Github code

    You can check out this code through our Github page.

    Cool? I hope you guys enjoyed it!

  • Differences between FAILFAST, PERMISSIVE and DROPMALFORMED modes in Dataframes

    There are some differences between them, and we're going to find out what they are in this post. The mode parameter is a way to handle corrupted records and, depending on the mode, it allows validating Dataframes and keeping data consistent. In this post we'll create a Dataframe with PySpark and compare the differences between these three modes:

    PERMISSIVE
    DROPMALFORMED
    FAILFAST

    CSV file content

    The content below simulates some corrupted records: there are String values in the engines column, which we'll define as an Integer type in the schema.

    "type","country","city","engines","first_flight","number_built"
    "Airbus A220","Canada","Calgary",2,2013-03-02,179
    "Airbus A220","Canada","Calgary","two",2013-03-02,179
    "Airbus A220","Canada","Calgary",2,2013-03-02,179
    "Airbus A320","France","Lyon","two",1986-06-10,10066
    "Airbus A330","France","Lyon","two",1992-01-02,1521
    "Boeing 737","USA","New York","two",1967-08-03,10636
    "Boeing 737","USA","New York","two",1967-08-03,10636
    "Boeing 737","USA","New York",2,1967-08-03,10636
    "Airbus A220","Canada","Calgary",2,2013-03-02,179

    Let's start by creating a simple Dataframe that loads data from a CSV file with the content above; let's suppose this content comes from a file called airplanes.csv. To model the content, we're also creating a schema that will allow us to validate the data.

    Creating a Dataframe using PERMISSIVE mode

    The PERMISSIVE mode sets field values to null when corrupted records are detected. By default, if you don't specify the mode parameter, Spark uses the PERMISSIVE value.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    if __name__ == "__main__":
        spark = SparkSession.builder \
            .master("local[1]") \
            .appName("spark-app") \
            .getOrCreate()

        schema = StructType([
            StructField("TYPE", StringType()),
            StructField("COUNTRY", StringType()),
            StructField("CITY", StringType()),
            StructField("ENGINES", IntegerType()),
            StructField("FIRST_FLIGHT", StringType()),
            StructField("NUMBER_BUILT", IntegerType())
        ])

        read_df = spark.read \
            .option("header", "true") \
            .option("mode", "PERMISSIVE") \
            .format("csv") \
            .schema(schema) \
            .load("airplanes.csv")

        read_df.show(10)

    Result of PERMISSIVE mode

    Creating a Dataframe using DROPMALFORMED mode

    The DROPMALFORMED mode ignores corrupted records, meaning that if you choose this mode, the corrupted records won't be listed.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    if __name__ == "__main__":
        spark = SparkSession.builder \
            .master("local[1]") \
            .appName("spark-app") \
            .getOrCreate()

        schema = StructType([
            StructField("TYPE", StringType()),
            StructField("COUNTRY", StringType()),
            StructField("CITY", StringType()),
            StructField("ENGINES", IntegerType()),
            StructField("FIRST_FLIGHT", StringType()),
            StructField("NUMBER_BUILT", IntegerType())
        ])

        read_df = spark.read \
            .option("header", "true") \
            .option("mode", "DROPMALFORMED") \
            .format("csv") \
            .schema(schema) \
            .load("airplanes.csv")

        read_df.show(10)

    Result of DROPMALFORMED mode

    After execution, you can see that the corrupted records aren't available in the Dataframe.

    Creating a Dataframe using FAILFAST mode

    Unlike the DROPMALFORMED and PERMISSIVE modes, FAILFAST throws an exception when it detects corrupted records.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    if __name__ == "__main__":
        spark = SparkSession.builder \
            .master("local[1]") \
            .appName("spark-app") \
            .getOrCreate()

        schema = StructType([
            StructField("TYPE", StringType()),
            StructField("COUNTRY", StringType()),
            StructField("CITY", StringType()),
            StructField("ENGINES", IntegerType()),
            StructField("FIRST_FLIGHT", StringType()),
            StructField("NUMBER_BUILT", IntegerType())
        ])

        read_df = spark.read \
            .option("header", "true") \
            .option("mode", "FAILFAST") \
            .format("csv") \
            .schema(schema) \
            .load("airplanes.csv")

        read_df.show(10)

    Result of FAILFAST mode

    ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.

    Cool? I hope you enjoyed it!
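    One extra option worth knowing, shown here as a hedged sketch rather than something covered in the example above: with PERMISSIVE mode, Spark's CSV reader can keep the raw text of the rows it failed to parse in a dedicated column, as long as you add a string field to the schema and point the columnNameOfCorruptRecord option at it. The _corrupt_record column name is a conventional choice, not a requirement.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    spark = SparkSession.builder.master("local[1]").appName("spark-app").getOrCreate()

    schema = StructType([
        StructField("TYPE", StringType()),
        StructField("COUNTRY", StringType()),
        StructField("CITY", StringType()),
        StructField("ENGINES", IntegerType()),
        StructField("FIRST_FLIGHT", StringType()),
        StructField("NUMBER_BUILT", IntegerType()),
        StructField("_corrupt_record", StringType())  # receives the original malformed line
    ])

    read_df = spark.read \
        .option("header", "true") \
        .option("mode", "PERMISSIVE") \
        .option("columnNameOfCorruptRecord", "_corrupt_record") \
        .format("csv") \
        .schema(schema) \
        .load("airplanes.csv")

    # Caching avoids the restriction Spark places on queries that reference
    # only the internal corrupt-record column of a CSV source.
    read_df.cache()
    read_df.filter(read_df["_corrupt_record"].isNotNull()).show(10, truncate=False)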

  • Salesforce integration with AWS AppFlow and S3

    It's very common for Salesforce devs to need to integrate CRM data with a company's database or data lake, for example. When the volume of this data is high, doing the integration via REST may not be the most elegant approach, especially because Salesforce itself has some limits that make it hard, for example, to send a payload with a list of objects for integration. The step-by-step guide below shows how we can send data from any API to Salesforce, or from Salesforce to any API, using the AWS AppFlow service for orchestration and S3 for the integration.

    Sending data from Salesforce to S3

    Let's create a simple object to simulate the integration. In Salesforce, go to Setup, search for Integrations >> Platform Events and create a new event. In the Custom Fields section, we'll create just two fields as an example: ID and Name.

    Now let's focus on a few tasks in the AWS console. Search for the AppFlow service and create a new flow. Give the flow a name and click Next. In the second step, you must select the event source; among the several options, you'll find "Salesforce". After selecting it, you'll need to specify the connection. Log in to the account of your preference (Production or Sandbox) and continue. Still in this source form, select the event created in Salesforce in the first step of this tutorial. In the destination form, select the S3 bucket where you want the integration files to be saved and continue. On the next screen, map the event fields you want to save in the S3 file. Click Next on step 4 and Create Flow on step 5. A message will be displayed saying the flow was created. Finally, click Activate Flow. Done!!! The flow now has an active status.

    Now let's write a simple test in Salesforce to send an event and verify that the flow created above works. Open the Salesforce Developer Console, go to Debug and select Open Execute Anonymous Window. Let's publish a simple event like the example below:

    EventTest__e myEvent = new EventTest__e();
    myEvent.ID__c = 1;
    myEvent.Name__c = 'My First Event';
    EventBus.publish(myEvent);

    And... DONE! Now just check S3 and you'll find a folder with the flow's name and the event files inside it. In this scenario you can connect any API from your external environment to Salesforce and consume this data.

    Sending data from S3 to Salesforce

    If we can use S3 to let an external API consume Salesforce data, we can also use it to send data to the CRM. In this case we'll do it as follows. Inside the same S3 bucket from the previous example, we'll create a folder called ingest, which we'll use to upload the files with the data we want to integrate into Salesforce. Back in AppFlow, let's create a new flow. In the next step, select the bucket, the folder we created, and the JSON data format. Then set the destination of this data: select Salesforce, the sandbox and the object where the data will be inserted. In this case, we created a custom object in Salesforce called S3EventIngest. At this first moment, a JSON file must be added to the ingest folder specified above, containing a FROM-TO mapping that relates the attributes of the JSON that will be consumed by the flow to the corresponding Salesforce fields.

    In our case, we added a .json file with the following content:

    {"ID":"ID__c","Name":"Name__c"}

    In the next step, map the JSON attributes that will be sent in the file to S3 to the Salesforce object fields. With the mapping done, follow the remaining steps without any changes until the flow is completed and created. It's possible to configure triggers so that the flow runs periodically, but here we'll just run it manually.

    Once it's created, we can run a simple test. To do that, DELETE the file we previously added to the ingest folder and add a file with real records. In this case, we'll add a file to the ingest folder with the following JSON:

    [
      {"ID":779,"Name":"My Event Json 1"},
      {"ID":780,"Name":"My Event Json 2"},
      {"ID":781,"Name":"My Event Json 3"},
      {"ID":782,"Name":"My Event Json 4"}
    ]

    After uploading the file to S3, go back to the flow and click the "Run Flow" button. Wait a few seconds and you'll see a success message; you can also check the run history with the latest executions. Also check Salesforce, and you'll see the inserted data.

    That's it folks, I hope it helps!!

  • Working with Schemas in Spark Dataframes using PySpark

    What's a schema in the Dataframe context? A schema is metadata that allows us to work with standardized data. Well, that was my definition of a schema, but we can also understand a schema as a structure that represents a data context or a business model. Spark enables using schemas with Dataframes, and I believe this is a good way to keep data quality and reliability, and we can also use these points to understand the data and connect it to the business. But if you know a little more about Dataframes, you know that working with a schema isn't a rule. Spark provides features to infer a schema without one being defined and reach the same result, but depending on the data source the inference may not work as we expect. In this post we're going to create a simple Dataframe example that reads a CSV file without a schema, and another one using a defined schema. Through the examples we'll see the advantages and disadvantages. Let's get to work!

    CSV file content

    "type","country","engines","first_flight","number_built"
    "Airbus A220","Canada",2,2013-03-02,179
    "Airbus A320","France",2,1986-06-10,10066
    "Airbus A330","France",2,1992-01-02,1521
    "Boeing 737","USA",2,1967-08-03,10636
    "Boeing 747","USA",4,1969-12-12,1562
    "Boeing 767","USA",2,1981-03-22,1219

    As you can see in the content above, we have different data types: string, numeric and date column types. The content above is represented by airliners.csv in the code.

    Creating a Dataframe without a schema

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession.builder \
            .master("local[1]") \
            .appName("schema-app") \
            .getOrCreate()

        air_liners_df = spark.read \
            .option("header", "true") \
            .format("csv") \
            .load("airliners.csv")

        air_liners_df.show()
        air_liners_df.printSchema()

    Dataframe/Print schema result

    It seems to have worked fine, but if you look carefully you'll realize that in the schema structure there are some field types that don't match their values, for example fields like number_built, engines and first_flight. They aren't string types, right? We can try to fix this by adding the parameter "inferSchema" set to "true".

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession.builder \
            .master("local[1]") \
            .appName("schema-app") \
            .getOrCreate()

        air_liners_df = spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .format("csv") \
            .load("airliners.csv")

        air_liners_df.show()
        air_liners_df.printSchema()

    Dataframe/Print schema result

    Even with the schema inferred, the first_flight field is still a string type. Let's try using a Dataframe with a defined schema to see whether this detail gets fixed.

    Creating a Dataframe with a schema

    Now it's possible to see the differences between the two pieces of code. We're adding an object that represents the schema. This schema describes the content of the CSV file; note that we have to describe each column's name and type.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType, DateType, StructField

    if __name__ == "__main__":
        spark = SparkSession.builder \
            .master("local[1]") \
            .appName("schema-app") \
            .getOrCreate()

        struct_schema = StructType([
            StructField("type", StringType()),
            StructField("country", StringType()),
            StructField("engines", IntegerType()),
            StructField("first_flight", DateType()),
            StructField("number_built", IntegerType())
        ])

        air_liners_df = spark.read \
            .option("header", "true") \
            .format("csv") \
            .schema(struct_schema) \
            .load("airliners.csv")

        air_liners_df.show()
        air_liners_df.printSchema()

    Dataframe/Print schema result

    After defining the schema, all the field types match their values. This shows how important it is to use schemas with Dataframes. Now it's possible to manipulate the data according to its type with no concerns. Cool? I hope you enjoyed it!
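    As a side note, and purely as a hedged sketch not covered in the example above, Spark's schema() method also accepts a DDL-formatted string, which can be a more compact way to declare the same structure:

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession.builder \
            .master("local[1]") \
            .appName("schema-app") \
            .getOrCreate()

        # Same fields and types as the StructType above, declared as a DDL string.
        ddl_schema = "type STRING, country STRING, engines INT, first_flight DATE, number_built INT"

        air_liners_df = spark.read \
            .option("header", "true") \
            .format("csv") \
            .schema(ddl_schema) \
            .load("airliners.csv")

        air_liners_df.printSchema()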

  • Differences between External and Internal tables in Hive

    There are two ways to create tables in the Hive context, and in this post we'll show the differences, advantages and disadvantages of each.

    Internal Table

    Internal tables are also known as Managed tables, and we'll understand why below. Now, let's create an internal table using SQL in the Hive context and look at the advantages and disadvantages.

    create table coffee_and_tips_table (name string, age int, address string) stored as textfile;

    Advantages:
    - To be honest I wouldn't call it an advantage, but Internal tables are managed by Hive.

    Disadvantages:
    - Internal tables can't access remote storage services, for example clouds like Amazon AWS, Microsoft Azure and Google Cloud.
    - Dropping an Internal table drops all the data, including metadata and partitions.

    External Table

    External tables have some interesting features compared to Internal tables, and they are a good and recommended approach when we need to create tables. In the script below you can see the difference from the Internal table creation in the last section: we just added the reserved word external to the script.

    create external table coffee_and_tips_external (name string, age int, address string) stored as textfile;

    Advantages:
    - The data and metadata won't be lost if the table is dropped.
    - External tables can be accessed and managed by external processes.
    - External tables allow access to a remote storage service as the source location.

    Disadvantages:
    - Again, I wouldn't call it a disadvantage, but if you need to change the schema or drop a table, you'll probably need to run a command to repair the table, as shown below.

    msck repair table <table_name>;

    Depending on the volume, this operation may take some time to complete. To check a table's type, run the command below and look at the table_type column in the result.

    hive> describe formatted <table_name>;

    That's it, I hope you guys enjoy it! References: https://hive.apache.org/
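    To make the remote-storage point concrete, here is a hedged sketch of an external table backed by an S3 location. The bucket path, the load_date partition column and the s3:// scheme are assumptions for the example and depend on how your Hive installation is configured.

    create external table coffee_and_tips_external_s3 (
        name string,
        age int,
        address string
    )
    partitioned by (load_date string)
    stored as textfile
    location 's3://my-bucket/coffee_and_tips/';

    -- After new partition folders are written directly to the bucket,
    -- register them in the metastore:
    msck repair table coffee_and_tips_external_s3;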

  • How to generate random data using the Datafaker lib

    Sometimes in our projects we have to fill Java objects for unit tests, or even create a database dump with random data to test a specific feature, and so on. We need to be creative when trying to come up with names, street names, cities or documents. There's an interesting and helpful Java library called Datafaker that allows us to create random data with a large number of providers. Providers are objects based on a context; for example, if you want to generate data about a Person object, there's a specific provider for this context that will generate name, last name and so on. If you need to create a unit test that needs address data, you'll find a provider for that as well. In this post we'll create some examples using Maven, but the library also provides support for Gradle projects.

    Maven

    <dependency>
        <groupId>net.datafaker</groupId>
        <artifactId>datafaker</artifactId>
        <version>1.1.0</version>
    </dependency>

    Generating random data

    Let's create a simple Java class that contains some properties like name, last name, address, favorite music genre and food.

    public class RandomPerson {

        public String firstName;
        public String lastName;
        public String favoriteMusicGenre;
        public String favoriteFood;
        public String streetAddress;
        public String city;
        public String country;

        @Override
        public String toString() {
            return "firstName=" + firstName + "\n" +
                   "lastName=" + lastName + "\n" +
                   "favoriteMusicGenre=" + favoriteMusicGenre + "\n" +
                   "favoriteFood=" + favoriteFood + "\n" +
                   "streetAddress=" + streetAddress + "\n" +
                   "city=" + city + "\n" +
                   "country=" + country;
        }

        static void print(RandomPerson randomPerson){
            System.out.println(randomPerson);
        }
    }

    In the next step we'll fill an object using the providers mentioned in the first section. First of all, we create an object called randomData that represents the Faker class. This class contains all the providers used in the example below.

    public static void main(String[] args) {
        Faker randomData = new Faker();

        RandomPerson randomPerson = new RandomPerson();
        randomPerson.firstName = randomData.name().firstName();
        randomPerson.lastName = randomData.name().lastName();
        randomPerson.favoriteMusicGenre = randomData.music().genre();
        randomPerson.favoriteFood = randomData.food().dish();
        randomPerson.streetAddress = randomData.address().streetAddress();
        randomPerson.city = randomData.address().city();
        randomPerson.country = randomData.address().country();

        print(randomPerson);
    }

    After the execution, we can see a result like this in the console:

    Result
    firstName=Dorthy
    lastName=Jones
    favoriteMusicGenre=Electronic
    favoriteFood=Cauliflower Penne
    streetAddress=7411 Darin Gateway
    city=Gutkowskifort
    country=Greece

    Every execution produces a new result, because the providers are random. Another interesting feature is that we can set the Locale when instantiating the object.

    Faker randomData = new Faker(Locale.JAPANESE);

    See the results based on Locale.JAPANESE:

    Result
    firstName=航
    lastName=横山
    favoriteMusicGenre=Non Music
    favoriteFood=French Fries with Sausages
    streetAddress=418 美桜Square
    city=南斉藤区
    country=Togo

    Isn't that a cool library!? See you!

  • Creating Java code using the Builder pattern

    If you're using an object-oriented language in your project, there are probably some lines of code using the Builder pattern. If not, this post will help you understand it.

    What's the Builder Pattern?

    The Builder Pattern belongs to an area of Software Engineering called Design Patterns; the idea behind a pattern is to solve common problems in your project by following best practices. The Builder Pattern is very useful when we need a better solution for the object-creation part of our project. Sometimes we need to instantiate an object with a lot of parameters, and that can be a problem if you pass a wrong parameter value. Things like this happen all the time and result in bugs, and you'll need to find out where the issue is and maybe refactor the code to improve it.

    Let's write some lines of code to see how the Builder Pattern works and when to apply it. The code below is an example of a traditional class with a constructor used to load values when the object is instantiated.

    public class PersonalInfo {

        private final String firstName;
        private final String lastName;
        private final Date birthDate;
        private final String address;
        private final String city;
        private final String zipCode;
        private final String state;
        private final int population;

        public PersonalInfo(String firstName, String lastName, Date birthDate, String address,
                            String city, String zipCode, String state, int population){
            this.firstName = firstName;
            this.lastName = lastName;
            this.birthDate = birthDate;
            this.address = address;
            this.city = city;
            this.zipCode = zipCode;
            this.state = state;
            this.population = population;
        }
    }

    And now we can instantiate the object, simulating the client code.

    PersonalInfo personalInfo = new PersonalInfo("Mônica", "Avelar", new Date(), "23 Market Street", "San Francisco", "94016", "CA", 800000);

    If you notice, to instantiate the object we must pass all the values related to each property of our class, and there's a big chance of passing a wrong value. Another disadvantage of this approach is that it doesn't scale well: in this example we have only a few properties, but tomorrow we could add more properties and the disadvantage would become clearer.

    Working with the Builder Pattern

    Let's rewrite the code above using the Builder Pattern and see the differences.

    public class PersonalInfo {

        private final String firstName;
        private final String lastName;
        private final Date birthDate;
        private final String address;
        private final String city;
        private final String zipCode;
        private final String state;
        private final int population;

        public static class Builder {

            private String firstName;
            private String lastName;
            private Date birthDate;
            private String address;
            private String city;
            private String zipCode;
            private String state;
            private int population;

            public Builder firstName(String value) {
                firstName = value;
                return this;
            }

            public Builder lastName(String value) {
                lastName = value;
                return this;
            }

            public Builder birthDate(Date value) {
                birthDate = value;
                return this;
            }

            public Builder address(String value) {
                address = value;
                return this;
            }

            public Builder city(String value) {
                city = value;
                return this;
            }

            public Builder zipCode(String value) {
                zipCode = value;
                return this;
            }

            public Builder state(String value) {
                state = value;
                return this;
            }

            public Builder population(int value) {
                population = value;
                return this;
            }

            public PersonalInfo build() {
                return new PersonalInfo(this);
            }
        }

        public PersonalInfo(Builder builder){
            firstName = builder.firstName;
            lastName = builder.lastName;
            birthDate = builder.birthDate;
            address = builder.address;
            city = builder.city;
            zipCode = builder.zipCode;
            state = builder.state;
            population = builder.population;
        }
    }

    If you compare both versions, you might conclude that the first one is smaller and easier to understand than the second one, and I agree. The advantage of the Builder becomes clear in the next example, when we create an object based on it.

    Simulating client code using the Builder Pattern

    PersonalInfo personalInfo = new PersonalInfo.Builder()
        .firstName("Mônica")
        .lastName("Avelar")
        .birthDate(new Date())
        .address("23 Market Street")
        .city("San Francisco")
        .zipCode("94016")
        .state("CA")
        .population(800000)
        .build();

    This last example of object creation using the Builder Pattern results in organized code that follows best practices and is easy to read. Another advantage of the Builder is that we can identify each property before passing a value. To be honest, I've been using the Builder Pattern in my projects and I strongly recommend you do the same in your next projects. There's an easier way to implement the Builder pattern in projects nowadays and I'll write a post about it, see you soon!

  • Database versioning with Flyway

    Database versioning is a very powerful tool: it makes managing database changes easier and helps you work around crisis scenarios. In the Java world there are two widely used tools, Liquibase and Flyway; for this post we'll use Flyway.

    About Flyway

    Flyway works with the concept of migrations. A migration is basically a SQL script that will be executed in the database. If an error occurs, Flyway itself takes care of rolling back that SQL script. The advantage of using such a system is that developers get a single, consistent way to create and change database structures. So, if someone creates a new table or a new table index, other people can pick up that change and apply it to their local database. Besides that, your migrations become a kind of documentation of your database, reflecting everything that happened over time. To apply these changes, Flyway uses a versioning table (by default, the flyway_schema_history table), where it checks which modification was executed last and what the hash of the executed script was. This hash check is very important, because it's how the tool controls modifications to a script that has already been executed, warning whoever is running it to review that migration, since there may have been data corruption or the script may have been changed.

    Installation

    There are several ways to use Flyway: inside a Java application, via Maven, Gradle or the command line. For this post, we'll explore the command line. Install it according to your operating system by accessing this link: https://flywaydb.org/download/community

    To verify that it was installed correctly, run the command below in your terminal (you must be inside the unpacked folder if you haven't added it to your PATH):

    flyway -v

    Right after the installation, just clone the base project from the following repository on Github.

    Execution

    To execute the migrations, you need to give Flyway some parameters so it can connect to the database. In the terminal, run the command below, passing the required parameters such as the URL, user and password:

    flyway migrate -url=... -user=... -password=...

    The user you provide needs access and permission for the commands listed in the migrations, ok? It's also possible to create a flyway.conf configuration file inside the project so that you don't need to pass the url, user and password values. Remember that the file must be in the project root, as in the base project mentioned in this article.

    That's it. Did you like it? See you!
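    For context, Flyway picks up versioned migrations that follow the V<version>__<description>.sql naming convention from its configured locations (the sql folder by default for the command-line tool). The migration below is a hypothetical example; the person table and its columns are not part of the base project.

    -- sql/V1__create_person_table.sql
    create table person (
        id         bigint primary key,
        first_name varchar(100) not null,
        last_name  varchar(100) not null,
        created_at timestamp default current_timestamp
    );

    Running flyway migrate again after adding a file like this applies it once and records its version and checksum in the flyway_schema_history table.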

  • Creating alarms with AWS CloudWatch

    Using alarms is an essential requirement when working with many resources in the cloud. It's one of the most efficient ways to monitor and understand an application's behavior when its metrics differ from what you expect. In this post we're going to create an alarm from scratch using AWS CloudWatch. There are several other tools that let us configure alarms, but when working with AWS, configuring alarms with CloudWatch is quite simple and fast. Let's do this!

    First, we need to choose a resource to monitor. To keep it simple, we'll create an SQS queue and use one of its metrics in our alarm.

    Creating an SQS queue

    Let's create a simple SQS queue and pick a metric that we can use in our alarm. Access the AWS console and, in the search bar, type "SQS" as in the image below, then open the service. After opening the service, click Create queue. We'll create a standard queue for this example; the queue name will be sqs-messages, as shown below. You don't need to worry about the other details, just click the Create queue button to finish the creation. Queue created; the next step is to create our alarm.

    Creating the alarm

    Every resource/service is composed of one or more metrics, which are basically its characteristics. Picture a car: a car has characteristics such as kilometers driven, km/hour, number of passengers at a given moment, amount of fuel in the tank and so on. An SQS queue has metrics such as number of messages sent, number of empty receives, size of messages sent, number of messages received, etc. For our example, we'll choose the metric based on the number of messages sent (NumberOfMessagesSent).

    In practice, we could choose this metric for several reasons. Imagine an application where, in the event of instability, messages are sent to a specific queue to avoid losses. It's very important to know that messages were sent to that queue in this instability context, so the alarm should be triggered.

    Access the AWS console and search for CloudWatch in the search bar, as in the image below. After opening the service, click the In alarm option on the left side of the screen and then click the Create alarm button. Select the metric as in the screen below: choose the SQS option, then click Queue metrics. In the search bar, type sqs-messages to find the metrics related to the SQS queue created in the previous steps. After searching for the queue and its metrics, select the item in the Metric name column identified as NumberOfMessagesSent, and then click Select metric. On the next screen we'll configure more details about the alarm, such as period, statistic, trigger condition, threshold value and data points.

    Configuring the metric

    Metric name: the metric chosen in the previous steps, i.e. the metric that measures the number of messages sent to the SQS queue (NumberOfMessagesSent).

    QueueName: the name of the SQS queue on which the alarm will be configured.

    Statistic: in this field we can choose options such as Average, Sum, Minimum and others. This depends on the context you need for the alarm and the metric. For this example we chose Sum, because we want the sum of the number of messages sent in a given period.

    Period: in this field we define the period over which the alarm evaluates the threshold condition, which will be defined in the next steps.

    Configuring the conditions

    Threshold type: for this example we'll use Static.

    Whenever NumberOfMessagesSent is...: select the Greater option.

    than...: in this field we configure the amount of NumberOfMessagesSent that will trigger the alarm. Let's set it to 5.

    Additional configuration

    In the additional configuration, we have the Datapoints to alarm field, which I'd like to explain in a bit more detail.

    Datapoints to alarm

    This additional option gives flexibility to the alarm configuration, combined with the conditions defined earlier. By default it is set to 1 out of 1. How does it work? The first field refers to the number of data points and the second field refers to the period. Keeping the previous settings plus the default additional configuration means the alarm will be triggered if the NumberOfMessagesSent metric has a sum greater than 5 within a 5-minute period. Up to this point, the default additional configuration doesn't change the settings defined earlier; nothing changes.

    Now, let's change this configuration to understand it better. Let's change 1 out of 1 to 2 out of 2. This tells us that, when the alarm condition is met, i.e. the sum of the NumberOfMessagesSent metric is greater than 5, the alarm will only be triggered after 2 data points within 10 minutes. Notice that the period was multiplied because of the value 2 in the second field. Summarizing more objectively: even if the condition is met, the alarm will only be triggered if there are 2 data points above the threshold within a 10-minute window. This gives us some flexibility and helps avoid false alarms. We'll understand it even better when we run some tests to trigger the alarm. Keep the following settings and click Next.

    Configuring the actions

    On the next screen, we'll configure the actions responsible for notifying a destination if the alarm is triggered. On this screen, keep the In alarm configuration, then create a new topic and, lastly, add an email address where we want to receive the error notifications. In practice there are better notification methods, but we won't get into those details. Select the Create new topic option, fill in a topic name, and then enter a valid email address in the Email endpoints that will receive the notification field. Once that's filled in, click Create topic; an email will then be sent to confirm the subscription to the created topic. Confirm it in your email and click Next on the alarm screen to continue.

    Now we need to add the alarm name on the screen below and then click Next. The next screen is the review screen; click Create alarm to finish. Done, now we have an alarm created and it's time to test it.

    Testing the alarm

    To test the alarm, let's send 6 messages to the queue created earlier, as in the screen below. Repeat this action 6 times by clicking Send message; feel free to change the message content. After sending the messages, notice that even though the threshold was exceeded, the alarm wasn't triggered. That's because of the additional configuration; otherwise, setting the data points to 1 out of 1 would have been enough for the alarm to go off. Now, let's send several more messages that exceed the threshold in short intervals within the 10-minute window.

    Notice in the image above that the alarm was triggered, because besides meeting the condition specified in the settings, it also reached the 2 data points. Check the email added in the notification settings, because an email was probably sent with the alarm details. After the 10-minute period, the alarm goes from In alarm back to OK.

    That's it. Did you like it? See you!
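    For anyone who prefers to script this instead of clicking through the console, here is a hedged sketch of the same alarm using the AWS CLI. The alarm name and the SNS topic ARN are placeholders; the other values mirror the walkthrough above (Sum of NumberOfMessagesSent greater than 5, 2 out of 2 data points over 5-minute periods).

    aws cloudwatch put-metric-alarm \
      --alarm-name sqs-messages-sent-alarm \
      --namespace AWS/SQS \
      --metric-name NumberOfMessagesSent \
      --dimensions Name=QueueName,Value=sqs-messages \
      --statistic Sum \
      --period 300 \
      --threshold 5 \
      --comparison-operator GreaterThanThreshold \
      --evaluation-periods 2 \
      --datapoints-to-alarm 2 \
      --alarm-actions arn:aws:sns:us-east-1:123456789012:my-alarm-topic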
