top of page

Search

81 itens encontrados para ""

  • Diferenças entre os modes FAILFAST, PERMISSIVE e DROPMALFORMED em Spark Dataframes

    Existem algumas diferenças entre eles e vamos descobrir neste post. O parâmetro mode permite passar como valor, formas de validar Dataframes visando a qualidade dos dados. Neste post vamos criar um Dataframe utilizando PySpark e comparar as diferenças entre esses três tipos de mode, são eles: PERMISSIVE DROPMALFORMED FAILFAST Conteúdo do arquivo CSV Para este post, usaremos como exemplo um arquivo CSV contendo alguns registros variados, dentre eles, registros corrompidos. A definição de registros corrompidos para este contexto é que, para a coluna engines adicionamos alguns registros com tipos diferentes do que serão definidos no esquema. Neste caso, no esquema, a coluna engines será definida como Integer mas no conteúdo abaixo, temos alguns registro com valores do tipo String. "type","country","city","engines","first_flight","number_built" "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A220","Canada","Calgary","two",2013-03-02,179 "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A320","France","Lyon","two",1986-06-10,10066 "Airbus A330","France","Lyon","two",1992-01-02,1521 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York",2,1967-08-03,10636 "Airbus A220","Canada","Calgary",2,2013-03-02,179 Vamos começar criando um Dataframe simples que irá carregar dados do arquivo CSV contendo os registros acima, vamos supor que o conteúdo acima seja de um arquivo chamado avioes.csv. Para modelar o conteúdo, também estamos criando um esquema que nos permitirá validar os dados. Criando um Dataframe usando o mode PERMISSIVE O mode PERMISSIVE define valores de campos como nulos quando registros corrompidos são detectados para aquela coluna. Por padrão, se você não especificar nenhum valor para o parâmetro mode, Spark definirá o valor PERMISSIVE. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "PERMISSIVE") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Resultado utilizando PERMISSIVE mode Perceba que para a colune ENGINES que possui registros que não seguem o padrão de tipagem definido, foram convertidas para o valor null. Criando um Dataframe usando o mode DROPMALFORMED O mode DROPMALFORMED ignora registros corrompidos. O que significa que, se você escolher este tipo de mode, os registros corrompidos não serão listados. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "DROPMALFORMED") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Resultado utilizando DROPMALFORMED mode Após definirmos o valor do mode como DROPMALFORMED, perceba que ao listar o registros do Dataframe, os registros corrompidos foram ignorados, não sendo apresentados no Dataframe. Criando um Dataframe usando o mode FAILFAST Diferente do mode DROPMALFORMED e PERMISSIVE, FAILFAST lança uma exceção quando detecta registros corrompidos. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "FAILFAST") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Resultado utilizando FAILFAST mode ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Spark: The Definitive Guide: Big Data Processing Made Simple (Versão Inglês) é uma referência completa para quem quer aprender o Spark e sobre as suas principais funcionalidades. Lendo esse livro, você vai aprender sobre DataFrames, Spark SQL através de exemplos práticos. O autor mergulha nas APIs de baixo nível do Spark, RDDs e também sobre como o Spark é executado em um cluster e como depurar e monitorar os aplicativos de clusters do Spark. Os exemplos práticos estão em Scala e Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library (Versão Inglês) com a nova versão do Spark, este livro explora os principais recursos do Spark, como o uso de Dataframes, Spark SQL no qual você pode usar SQL para manipular dados e Structured Streaming para processar dados em tempo real. Este livro contém exemplos práticos e trechos de código para facilitar a leitura. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark (Versão Inglês) é um livro que explora as melhores práticas usando a linguagem Spark e Scala para lidar com aplicações de dados em larga escala, técnicas para transformações utilizando RDD, e também mostra como as novas interfaces do Spark SQL melhoram o desempenho sobre a estrutura de dados RDD do SQL, exemplos de Spark MLlib e o uso de bibliotecas de aprendizado de máquina de ML e muito mais. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming (Versão Inglês) abrange os conceitos básicos do Python por meio de exemplos interativos e práticas recomendadas. Learning Scala: Practical Functional Programming for the Jvm (Versão Inglês) é um excelente livro que aborda a linguagem Scala através de exemplos e exercícios práticos. Lendo este livro, você aprenderá sobre os principais tipos de dados, literais, valores e variáveis. Construir classes que compõem uma ou mais características para total reutilização, criar novas funcionalidades misturando-as na instanciação e muito mais. Scala é uma das principais linguagens em projetos de Big Data em todo o mundo, com grande uso em grandes empresas de tecnologia como o Twitter e também a linguagem principal do Spark. Bom é isso, espero que tenham gostado!

  • Como gerar massa de testes utilizando biblioteca Java Datafaker

    Às vezes em nossos projetos temos que preencher objetos Java para testes unitários ou até mesmo criar um dump de banco de dados com dados aleatórios para testar uma característica específica e etc. Precisamos ser criativos tentando criar nomes, nomes de ruas, cidades ou documentos. Existe uma biblioteca Java interessante e útil chamada Datafaker que permite criar dados aleatórios com um grande número de provedores. Provedores são objetos baseados em um contexto, por exemplo: Se você deseja gerar dados para o objeto person, existe um provedor específico para este contexto que irá gerar nome, sobrenome e etc. Neste post vamos criar alguns exemplos usando Maven como gerenciador de dependências Java, mas a biblioteca também fornece suporte para projetos Gradle. Maven net.datafaker datafaker 1.1.0 Gerando Dados aleatórios Vamos criar uma classe Java simples que contém algumas propriedades como nome, sobrenome, endereço, gênero musical favorito e comida. public class RandomPerson { public String firstName; public String lastName; public String favoriteMusicGenre; public String favoriteFood; public String streetAddress; public String city; public String country; @Override public String toString() { return "firstName=" + firstName + "\n" + "lastName=" + lastName + "\n" + "favoriteMusicGenre="+favoriteMusicGenre + "\n" + "favoriteFood=" + favoriteFood + "\n" + "streetAddress=" + streetAddress + "\n" + "city=" + city + "\n" + "country=" + country ; } static void print(RandomPerson randomPerson){ System.out.println( randomPerson ); } } No próximo passo, vamos preencher um objeto usando os provedores que citamos na primeira seção. Primeiramente, criamos um objeto chamado randomData do tipo Faker, pois é nesta classe que contém todos os provedores no exemplo abaixo. public static void main(String[] args) { Faker randomData = new Faker(); RandomPerson randomPerson = new RandomPerson(); randomPerson.firstName = randomData.name().firstName(); randomPerson.lastName = randomData.name().lastName(); randomPerson.favoriteMusicGenre = randomData.music().genre(); randomPerson.favoriteFood = randomData.food().dish(); randomPerson.streetAddress = randomData.address().streetAddress(); randomPerson.city = randomData.address().city(); randomPerson.country = randomData.address().country(); print(randomPerson); } Após a execução, podemos ver os resultados como este no console: Resultado firstName=Dorthy lastName=Jones favoriteMusicGenre=Electronic favoriteFood=Cauliflower Penne streetAddress=7411 Darin Gateway city=Gutkowskifort country=Greece Cada execução será um novo resultado porque os provedores são aleatórios. Outra funcionalidade interessante é que podemos configurar o Locale ao instanciar um objeto. Faker randomData = new Faker(Locale.JAPANESE); Veja os resultados com base no Local.JAPANESE: Resultado firstName=航 lastName=横山 favoriteMusicGenre=Non Music favoriteFood=French Fries with Sausages streetAddress=418 美桜Square city=南斉藤区 country=Togo Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Unit Testing Principles, Practices, and Patterns: Effective Testing Styles, Patterns, and Reliable Automation for Unit Testing, Mocking, and Integration Testing with Examples in C# (Versão Inglês) é um livro que cobre Princípios, Padrões e Práticas de Testes unitários. Ensina a projetar e escrever testes que visam áreas-chave do seu código, incluindo o modelo de domínio. Neste guia escrito de forma clara, você aprenderá a desenvolver testes e conjuntos de testes de qualidade profissional e a integrar testes em todo o ciclo de vida do aplicativo. Junit em Ação é um livro de testes popular que cobre técnicas como teste prático de seu código, usando técnicas de testes unitários e muito mais. É através da prática de testes que garantimos uma entrega de qualidade do software ao cliente final, garantindo a execução do Build e Deploy. Mastering Unit Testing Using Mockito and JUnit (Versão Inglês) é um livro que cobre as práticas do JUnit usando uma das mais famosas bibliotecas de teste chamada Mockito. Este livro ensina como criar e manter testes unitários automatizados utilizando recursos avançados do JUnit com o framework Mockito, práticas de integração contínua (famoso CI) utilizando ferramentas de mercado como Jenkins junto com um dos maiores gerenciadores de dependências em projetos Java, o Maven. Para você que está começando nesse mundo, é uma excelente escolha. Bom é isso, espero que tenham gostado!

  • Diferenças entre tabelas externas e internas usando Hive

    Existem duas formas de criar tabelas no contexto Hive e neste post vamos mostrar as diferenças, vantagens e desvantagens. Tabelas Internas Para entender melhor, vamos criar uma tabela interna usando SQL no contexto Hive e falaremos sobre as suas vantagens e desvantagens. create table coffee_and_tips_table (name string, age int, address string) stored as textfile; Vantagens Tabelas internas são gerenciadas pelo Hive. Desvantagens Tabelas internas não podem acessar serviços de armazenamento remoto, por exemplo, em nuvens como Amazon AWS, Microsoft Azure e Google Cloud. Ao apagar (drop table) as tabelas internas, todos os dados, incluindo metadados e partições, serão perdidos. Tabelas externas As tabelas externas possuem recursos interessantes se comparado as tabelas internas, e é uma abordagem recomendada quando precisamos criar tabelas para nossos projetos usando Hive. No script abaixo, você pode ver a diferença entre a criação da tabela interna e a tabela externa relacionada à última seção. Acabamos de adicionar a palavra reservada external como forma de identificar uma tabela externa para a criação. create external table coffee_and_tips_external (name string, age int, address string) stored as textfile; Vantagens Os dados e metadados não serão perdidos caso a tabela seja apagada (drop table). Tabelas externas podem ser acessadas e gerenciadas por processos externos. Tabelas externas permitem acesso ao serviço de armazenamento remoto como um local de origem. Desvantagens Não seria bem uma desvantagem, mas caso seja necessário alterar o esquema ou apagar uma tabela, provavelmente será necessário executar um comando de reparação da tabela, conforme mostrado abaixo. msck repair table Dependendo do volume, esta operação pode levar algum tempo para ser concluída. Para verificar se uma tabela é interna ou externa, execute o seguinte comando abaixo e você verá na coluna table_type o resultado. hive> describe formatted Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Programming Hive (Versão Inglês) este guia introduz você ao mundo do Apache Hive, a infraestrutura de data warehouse do Hadoop. Você aprenderá rapidamente como usar o dialeto SQL do Hive (HiveQL), consultar e analisar grandes conjuntos de dados armazenados no sistema de arquivos distribuído do Hadoop além de ensinar a como configurar um ambiente Hive. Spark: The Definitive Guide: Big Data Processing Made Simple (Versão Inglês) é uma referência completa para quem quer aprender o Spark e sobre as suas principais funcionalidades. Lendo esse livro, você vai aprender sobre DataFrames, Spark SQL através de exemplos práticos. O autor mergulha nas APIs de baixo nível do Spark, RDDs e também sobre como o Spark é executado em um cluster e como depurar e monitorar os aplicativos de clusters do Spark. Os exemplos práticos estão em Scala e Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library (Versão Inglês) com a nova versão do Spark, este livro explora os principais recursos do Spark, como o uso de Dataframes, Spark SQL no qual você pode usar SQL para manipular dados e Structured Streaming para processar dados em tempo real. Este livro contém exemplos práticos e trechos de código para facilitar a leitura. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark (Versão Inglês) é um livro que explora as melhores práticas usando a linguagem Spark e Scala para lidar com aplicações de dados em larga escala, técnicas para transformações utilizando RDD, e também mostra como as novas interfaces do Spark SQL melhoram o desempenho sobre a estrutura de dados RDD do SQL, exemplos de Spark MLlib e o uso de bibliotecas de aprendizado de máquina de ML e muito mais. Bom é isso, espero que tenham gostado!

  • Como economizar custos no S3 executando um Data Lake

    Os serviços em nuvem ou Cloud Computing fornecem recursos úteis para escalar seu negócio mais rapidamente, mas nem sempre podemos medir os custos da nuvem quando estamos começando um negócio do zero ou mesmo sendo um negócio sólido. Custos sempre fazem parte da estratégia de qualquer empresa que deseja fornecer um melhor serviço Eu e meus colegas temos trabalhado em uma arquitetura de dados baseada em eventos que permite processar cerca de 350 milhões de eventos diariamente. Fornecemos dados para os consumidores finais e para as equipes estratégicas para tomadas de decisões. É sempre um grande desafio lidar com o tráfego massivo dos dados (Big Data), armazenamento e ao mesmo tempo, pensar em economia de custos com Cloud. O armazenamento é muito caro e existem algumas estratégias para economizar dinheiro. Neste post irei descrever algumas estratégias que temos adotado para economizar custos de S3 (Simple Storage Service) e espero contribuir através destas experiências. Estratégias Estratégia #1 Classe de armazenamento S3 (Amazon S3 Storage Class) O Amazon S3 fornece uma maneira de gerenciar arquivos por meio de configurações de ciclo de vida, onde você pode definir maneiras de mover arquivos para diferentes classes de armazenamento, dependendo da idade do arquivo e da frequência de acesso. Essa estratégia pode economizar muito dinheiro para o seu negócio. Trabalhar com classe de armazenamento nos permite economizar custos. Por padrão, os dados são armazenados na classe de armazenamento S3 Standard. Este tipo de armazenamento tem alguns benefícios de armazenamento e acesso aos dados, mas percebemos que após os dados transformados na camada Silver, os dados na camada Bronze não eram acessados ​​com muita frequência e devido a isso, percebemos que era possível movê-los para uma classe de armazenamento mais barata. Decidimos movê-lo usando as configurações de ciclo de vida para a classe de armazenamento S3 Intelligent Tiering. Essa classe de armazenamento se encaixou perfeitamente em nosso contexto, pois pudemos economizar custos com armazenamento e mesmo no caso de acessar esses arquivos por algum motivo, poderíamos manter um custo viável. Até poderíamos mover estes dados para a classe de armazenamento Glacier, fazendo que o custo fique ainda mais barato, porém, o contexto atual impossibilita, pois como precisamos acessar estes dados de forma regular, a camada Glacier poderia aumentar os custos mas não pelo armazenamento, mas sim pelo acesso, no qual se torna caro dependendo da "profundidade" da camada. Se você verificar a documentação da AWS, notará que existem algumas classes de armazenamento mais baratas, mas você e sua equipe devem analisar cada caso, porque quanto mais barato for armazenar dados, mais caro será acessá-los. Portanto, tenha cuidado, tente entender os padrões de armazenamento e acesso aos dados em sua arquitetura Data Lake antes de escolher uma classe de armazenamento que possa se adequar melhor ao seu contexto. Estratégia #2 Particionamento de Dados O Apache Spark é o framework mais famoso por processar grandes volumes de dados e tem sido adotado por equipes de dados por todo o mundo. Durante as transformações de dados usando o Spark, é possível definir colunas em seu Dataframe nas quais serão utilizadas como partição, oferecendo melhor desempenho ao executar consultas SQL. Observe que a abordagem de particionamento de dados não possui relação direta com o Amazon S3, mas a adoção desta estratégia visa boas práticas ao acesso aos objetos no Amazon S3. Uma das formas de cobrança utilizados no S3 é pelo acesso/leitura dos dados, ou seja, quanto mais dados serem carregados maior a cobrança. Isso é inevitável, porém a abordagem de particionar dados, faz com que as leituras sejam mais controladas, evitando grandes carregamentos de dados desnecessários e consequentemente gerando menor custo. Estratégia #3 Delta Lake vacuum Delta Lake possui um recurso interessante chamado vacuum, que é um mecanismo para remover arquivos do disco sem utilidade, liberando espaço de armazenamento. Normalmente as equipes adotam essa estratégia após a restauração de versões em que alguns arquivos remanescente de versões anteriores e não serão mais gerenciados pelo Delta Lake. Por exemplo, na imagem abaixo temos 5 versões de tabelas Delta e suas partições. Suponha que precisamos restaurar a versão porque encontramos alguns dados inconsistentes após a versão 1. Após este comando, Delta apontará seu gerenciamento para a versão 1 como a versão atual, mas os arquivos de parquet relacionados a outras versões permanecerão lá sem uso. Podemos remover esses parquets executando o comando de vacuum, conforme mostrado abaixo. Observe que os parquets relacionados às versões posteriores à 1 foram removidos, liberando espaço de armazenamento. Para obter mais detalhes, recomendo ver a documentação do Delta Lake. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): AWS Cookbook (Versão Inglês) é um guia prático contendo 70 receitas detalhadas sobre os recursos da AWS e como resolver diferentes desafios. É um livro bem escrito e de fácil entendimento cobrindo os principais serviços da AWS através de exemplos práticos. A AWS ou Amazon Web Services é o serviço de nuvem mais utilizando atualmente em todo o mundo, caso queira entender mais sobre o tema para ficar bem posicionado no mercado, recomendo fortemente o estudo. Bom é isso, espero que tenham gostado!

  • How to save costs on S3 running Data Lake

    Cloud services provides useful resources to scale your business faster but not always we can measure cloud costs when we’re starting a business from the scratch or even being a solid business, costs always makes part of the strategy for any company who want to provide a better service. Me and my teammates have worked in a Data platform based on events enable to process 350 million events every day. We provide data to the client applications and to the businesses teams to make decisions and it always a challenge do deal with the massive data traffic and how we can maintain these data and saving money with storage at the same time. Storage is too expensive and there are some strategies to save money. For this post I’ll describe some strategies that we’ve adopted to save costs on S3 (Simple Storage Service) and I hope we can help it. Strategies Strategy #1 Amazon S3 storage classes Amazon S3 provides a way to manage files through life cycle settings, out there you can set ways to move files to different storage classes depending on the file’s age and access frequency. This strategy can save a lot of money to your company. Working with storage class enable us saving costs. By default, data are stored on S3 Standard storage class. This storage type has some benefits of storage and data access but we realized that after data transformed in the Silver layer, data in the Bronze layer it wasn’t accessed very often and it was totally possible to move them to a cheaper storage class. We decided to move it using life cycle settings to S3 Intelligent Tiering storage class. This storage class it was a perfect fit to our context because we could save costs with storage and even in case to access these files for a reason we could keeping a fair cost. We’re working on for a better scenario which we could set it a life cycle in the Silver layer to move files that hasn’t been accessed for a period to a cheaper storage class but at the moment we need to access historical files with high frequency. If you check AWS documentation you’ll note that there’s some cheapest storage classes but you and your team should to analyse each case because how cheapest is to store data more expensive will be to access them. So, be careful, try to understand the patterns about storage and data access in your Data Lake architecture before choosing a storage class that could fit better to your business. Strategy #2 Partitioning Data Apache Spark is the most famous framework to process a large amount of data and has been adopted by data teams around the world. During the data transformation using Spark you can set it a Dataframe to partition data through a specific column. This approach is too useful to perform SQL queries better. Note that partitioning approach has no relation to S3 directly but the usage avoids full scans in S3 objects. Full scans means that after SQL queries, the SQL engine can load gigabytes even terabytes of data. This could be very expensive to your company, because you can be charged easily depending on amount of loaded data. So, partitioning data has an important role when we need to save costs. Strategy #3 Delta Lake vacuum Delta Lake has an interesting feature called vacuum that’s a mechanism to remove files from the disk with no usage. Usually teams adopt this strategy after restoring versions that some files will be remain and they won’t be managed by Delta Lake. For example, in the image below we have 5 versions of Delta tables and their partitions. Suppose that we need to restore to version because we found some inconsistent data after version 1. After this command, Delta will point his management to version 1 as the current version but the parquet files related to others version will be there with no usage. We can remove these parquets running vacuum command as shown below. Note that parquets related to versions after 1 were removed releasing space in the storage. For more details I strongly recommend seeing Delta Lake documentation. Well that’s it, I hope you enjoyed it!

  • Primeiros passos com DBT - Data Build Tool

    O DBT tem sido utilizado por muitas empresas na área de Dados e acredito que podemos extrair bons insights neste post sobre ele. Esse vai ser um post prático mostrando como o DBT funciona e espero que vocês gostem. O que é DBT? DBT significa Data Build Tool e permite que equipes transformem os dados já carregados em seu warehouse através de operações de DML como um simples Select. DBT representa o T no processo de ELT, ou seja, ele não trabalha para extrair e carregar dados mas sim, para transformá-los. Passo 1: Criando o projeto DBT Agora, assumimos que o DBT já esteja instalado, mas se não estiver, recomendo consultar este link para mais informações. Após a instalado, você pode criar um novo projeto usando CLI ou pode clonar este projeto do repositório DBT no Github. Aqui para este post, vamos usar o modo CLI para criar nosso projeto e também para concluir as próximas etapas. Para criar um novo projeto, execute o comando abaixo no seu terminal. dbt init Depois de executar o comando acima, você precisa digitar o nome do projeto e qual warehouse ou banco de dados você vai usar conforme a imagem abaixo. Para este post, vamos usar o adaptador do postgres. É muito importante que você tenha o banco de dados postgres já instalado ou você pode criar uma imagem postgres usando o docker. Sobre os adaptadores, o DBT suporta vários deles e você pode conferir aqui. Criei uma estrutura de tabela e também carreguei os dados simulando dados de uma plataforma de vídeo chamada wetube e vamos utilizá-los para entender como o DBT funciona. Acompanhe a estrutura: Passo 2: Estrutura e mais sobre DBT Após executar o comando dbt init para criar o projeto, uma estrutura de pastas e arquivos abaixo será criada. Não vou falar sobre todos os diretórios do projeto, mas gostaria de focar em dois deles. Sources Antes de falarmos sobre os dois diretórios, vamos falar sobre os Sources, são basicamente os dados já carregados em seu warehouse. No processo DBT, as fontes têm o mesmo significado de dados brutos. Não há pastas que representem dados Sources para este projeto, mas você precisa saber sobre este termo pois vamos configurar tabelas já criadas como Sources para as próximas seções. Seeds Seeds é um diretório que oferece um mecanismo interessante e útil para carregar dados estáticos em seu warehouse por meio de arquivos CSV. Se você deseja carregar esses dados, você precisa criar um arquivo CSV neste diretório e executar o comando abaixo. dbt seed Para cada campo no arquivo CSV, o DBT irá inferir os tipos e criará tabelas e suas colunas no warehouse ou banco de dados. Models O DBT funciona com o paradigma de Model, a ideia principal é que você pode criar modelos através da transformações utilizando instruções SQL baseadas em fontes de tabelas ou modelos existentes Cada arquivo SQL localizado na pasta de model criará um modelo em seu warehouse ou banco de dados quando o comando abaixo for executado. dbt run Lembre-se que um modelo pode ser criado através de uma fonte ou outro modelo e não se preocupe com isso, vou mostrar mais detalhes sobre isso. Passo 3: Configurando a configuração com o banco de dados Com o projeto já criado, precisamos configurar a conexão com o banco de dados e aqui neste post vamos usar o postgres como banco de dados. Depois de inicializar o projeto, vários arquivos são criados e um deles é chamado de profiles.yml. profiles.yml é o arquivo é responsável por controlar os diferentes perfis/profiles para as diferentes conexões com os bancos de dados, como ambiente de desenvolvimento e produção. Se você notou, não podemos ver este arquivo na imagem acima porque este arquivo é criado fora do projeto para evitar credenciais que sejam confidenciais. Você pode encontrar esse arquivo no diretório ~/.dbt/. Se você observar, temos um perfil chamado dbt_blog e um destino chamado dev, por padrão, o destino refere-se a dev com as configurações de conexão do banco de dados. Além disso, é possível criar um ou mais perfis e alvos(target), permitindo trabalhar com diferentes ambientes. Outro detalhe importante é que o perfil dbt_blog deve ser especificado no arquivo dbt_project.yml como um perfil padrão. Nas próximas seções, discutiremos o que é e como o arquivo dbt_project.yml funciona. Passo 4: Criando o arquivo dbt_project.yml Todo projeto DBT possui um arquivo dbt_project.yml, você pode configurar informações como nome do projeto, diretórios, perfis e tipo de materialização. name: 'dbt_blog' version: '1.0.0' config-version: 2 profile: 'dbt_blog' model-paths: ["models"] analysis-paths: ["analyses"] test-paths: ["tests"] seed-paths: ["seeds"] macro-paths: ["macros"] snapshot-paths: ["snapshots"] target-path: "target" # directory which will store compiled SQL files clean-targets: # directories to be removed by `dbt clean` - "target" - "dbt_packages" models: dbt_blog: # Config indicated by + and applies to all files under models/example/ mart: +materialized: table Observe que o campo de profile foi configurado como o mesmo profile especificado no arquivo profiles.yml e outro detalhe importante é sobre o campo materialized. Aqui foi configurado como um valor table, mas por padrão, é uma view. O campo materialized permite que você crie modelos como uma tabela ou view em cada execução. Existem outros tipos de materialização, mas não vamos discutir aqui e eu recomendo ver a documentação do dbt. Passo 5: Criando nosso primeiro modelo Criando os primeiros arquivos Vamos mudar um pouco e vamos criar uma subpasta no diretório do model chamada mart e dentro desta pasta vamos criar nossos arquivos .SQL e também outro arquivo importante que ainda não discutimos chamado schema.yml. Criando o arquivo schema Os arquivos de schema são usados ​​para mapear fontes e documentar modelos como o nome do modelo, colunas e muito mais. Agora você pode criar um arquivo chamado schema.yml e preencher com as informações abaixo. version: 2 sources: - name: wetube tables: - name: account - name: city - name: state - name: channel - name: channel_subs - name: video - name: video_like - name: user_address models: - name: number_of_subs_by_channel description: "Number of subscribers by channel" columns: - name: id_channel description: "Channel's ID" tests: - not_null - name: channel description: "Channel's Name" tests: - not_null - name: num_of_subs description: "Number of Subs" tests: - not_null Sources: No campo source você pode incluir tabelas do seu warehouse ou banco de dados que serão utilizadas na criação do modelo. models: No campo models você pode incluir o nome do modelo, colunas e suas descrições Criando um modelo Esta parte é onde podemos criar scripts .SQL que resultarão em nosso primeiro modelo. Para o primeiro modelo, vamos criar uma instrução SQL para representar um modelo que podemos ver os números de inscritos do canal. Vamos criar um arquivo chamado number_of_subs_by_channel.sql e preenchê-lo com os scripts abaixo. with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_channel_subs as ( select * from {{ source('wetube','channel_subs') }} ), number_of_subs_by_channel as ( select source_channel.id_channel, source_channel.name, count(source_channel_subs.id_subscriber) num_subs from source_channel_subs inner join source_channel using (id_channel) group by 1, 2 ) select * from number_of_subs_by_channel Entendendo o modelo Observe que temos vários scripts separados por expressão de tabela comum (CTE) que se torna útil para entender o código. O DBT permite usar o template Jinja {{ }} trazendo uma maior flexibilidade ao nosso código. O uso da palavra-chave source dentro do modelo Jinja significa que estamos nos referindo a tabelas de origem. Para referenciar um modelo, você precisa usar a palavra-chave ref. A última instrução SELECT baseada nas tabelas de origem (source) irá gerar o modelo (model) como tabela no banco de dados. Executando o nosso primeiro modelo Execute o comando abaixo para criar nosso primeiro modelo baseado nos arquivos anteriores. dbt run Saída Criando um novo modelo Imagine que precisamos criar um novo modelo contendo as informações da conta e seus canais. Vamos voltar ao arquivo schema.yml para adicionar esse novo modelo. - name: account_information description: "Model containing account information and it's channels" columns: - name: id_account description: "Account ID" tests: - not_null - name: first_name description: "First name of user's account" tests: - not_null - name: last_name description: "Last name of user's account" tests: - not_null - name: email description: "Account's email" tests: - not_null - name: city_name description: "city's name" tests: - not_null - name: state_name description: "state's name" tests: - not_null - name: id_channel description: "channel's Id" tests: - not_null - name: channel_name description: "channel's name" tests: - not_null - name: channel_creation description: "Date of creation name" tests: - not_null Agora, vamos criar um novo arquivo SQL e nomeá-lo como account_information.sql e adicionar os scripts abaixo: with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_city as ( select * from {{ source('wetube','city') }} ), source_state as ( select * from {{ source('wetube','state') }} ), source_user_address as ( select * from {{ source('wetube','user_address') }} ), source_account as ( select * from {{ source('wetube','account') }} ), account_info as ( select account.id_user as id_account, account.first_name, account.last_name, account.email, city.name as city_name, state.name as state_name, channel.id_channel, channel.name as channel, channel.creation_date as channel_creation FROM source_account account inner join source_channel channel on (channel.id_account = account.id_user) inner join source_user_address user_address using (id_user) inner join source_state state using (id_state) inner join source_city city using (id_city) ) select * from account_info Criando nosso último modelo Para o nosso último modelo, vamos criar um modelo sobre quantas curtidas tem um vídeo. Vamos alterar novamente o schema.yml para descrever e documentar nosso futuro e último modelo. - name: total_likes_by_video description: "Model containing total of likes by video" columns: - name: id_channel description: "Channel's Id" tests: - not_null - name: channel description: "Channel's name" tests: - not_null - name: id_video description: "Video's Id" tests: - not_null - name: title description: "Video's Title" tests: - not_null - name: total_likes description: "Total of likes" tests: - not_null Crie um arquivo chamado total_likes_by_video.sql e coloque o código abaixo: with source_video as ( select * from {{ source('wetube','video') }} ), source_video_like as ( select * from {{ source('wetube','video_like') }} ), source_account_info as ( select * from {{ ref('account_information') }} ), source_total_like_by_video as ( select source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title, count(*) as total_likes FROM source_video_like inner join source_video using (id_video) inner join source_account_info using (id_channel) GROUP BY source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title ORDER BY total_likes DESC ) select * from source_total_like_by_video Executando novamente Após a criação dos arquivos, vamos executar DBT novamente para criar os novos modelos dbt run Saída Os modelos foram criados no banco de dados e você pode executar instruções select diretamente em seu banco de dados para verificá-lo. Perceba que além dos modelos criados, você pode notar as demais tabelas que foram mapeadas no arquivo schema.yml e que já existiam na estrutura do banco inicial. Lembre-se do mecanismo de criar tabelas estáticas através do diretório Seeds, pode ser uma boa escolha para uma carga inicial. Modelo: account_information Modelo: number_of_subs_by_channel Modelo: total_likes_by_video Passo 6: DBT Docs Documentação Depois de gerados nossos modelos, agora vamos gerar documentos com base nestes. O DBT gera uma documentação completa sobre modelos (models), sources e suas colunas, através de uma página da web. Gerando as docs dbt docs generate Disponibilizando as docs no servidor Web Após a geração dos documentos, você pode executar o comando abaixo no seu terminal para iniciar um servidor da Web na porta 8080 e consultar a documentação localmente utilizando o seu navegador. Caso o navegador não abra automaticamente, digite o seguinte endereço localhost:8080 no seu navegador. dbt docs serve Lineage Outro detalhe sobre a documentação é que você pode ver através de um Lineage os modelos e suas dependências. Código no Github Você pode conferir esse código na nossa página do Github. Curtiu? Eu espero que tenha gostado!

  • Understanding Delta Lake Time Travel in 2 minutes

    Delta Lake provides a way to version data for operations like merge, update and delete. This makes transparent how data life cycle inside Delta Lake works it. For each operation a version will be incremented and if you have a table with multiple operations, different versions of table will be created. Delta Lake offers a mechanism to navigate over the different versions called Time Travel. It's a temporary way to access data from the past. For this post we're going to use this feature to see different versions of table. Below we have a Delta Table called people that all versions were generated through write operations using append mode. Current version When we perform a simple read on a table, the current version is always the must recent one. So, for this scenario, the current version is 2 (two). Note that we don't need to specify which version we want to use because we're not using Time Travel yet. session.read().format("delta").load("table/people") .orderBy("id").show(); Nothing changes at the moment, let's keep for the next steps. Working with Time Travel Here begins how we can work with Time Travel, for the next steps, we'll perform readings on the people table specifying different versions to understand how Time travel works. Reading Delta table - Version 0 (zero) Now we're going to work with different versions starting from the 0 (zero) version, let's read the table again but now adding a new parameter, take a look at the code below. session.read().format("delta") .option("versionAsOf", 0) .load("table/people") .orderBy("id").show(); Notice that we added a new parameter called versionAsOf , this parameter allows us to configure the number of version you want to restore temporarily for a table. For this scenario we configure the reading for the Delta Table version zero (0). This was the first version generated by Delta Lake after write operation. Reading Delta table - Version 1 (one) For this last step we're using the version one (1), note that the data from the previous version has been maintained because an append mode was executed. session.read().format("delta") .option("versionAsOf", 1) .load("table/people") .orderBy("id").show(); Delta lake has a lot of benefits and Time travels allows us flexibility in a Big Data architecture, for more details I recommend see the Delta Lake docs . Well that's it, I hope you enjoyed it.

  • Converting Parquet table to Delta Table

    For this post we're going to create examples to how convert parquet table to Delta table. First, we'll create a parquet table from scratch through a Spark Dataframe and then converting to Delta table. Using Delta table has some benefits comparing to a Parquet table. Delta enables to restore versions of your table through time travel function, ACID supports and more. Creating a Parquet table First of all, let's create a parquet table to be converted later to Delta Table. I'll prefer create a parquet table from scratch to bring a better understanding. The following code will be executed once, just to create a parquet table. We're going to use a Spark Dataframe that will be loaded from a JSON file containing semi-structured records. public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setAppName("spark-delta-table"); conf.setMaster("local[1]"); SparkSession session = SparkSession.builder() .config(conf) .getOrCreate(); Dataset dataFrame = session.read().json("product.json"); dataframe.write().format("parquet").save("table/product"); } The above example, we start creating a SparkSession object to create and manage a Spark Dataframe that was loaded from the product.json file content. Alter load, the Dataframe creates (or write) a table in parquet format in the table/product directory. JSON content File represented by product.json file that contains semi-structured records. {"id":1, "name":"rice", "price":12.0, "qty": 2} {"id":2, "name":"beans", "price":7.50, "qty": 5} {"id":3, "name":"coke", "price":5.50, "qty": 2} {"id":4, "name":"juice", "price":3.80, "qty": 1} {"id":5, "name":"meat", "price":1.50, "qty": 1} {"id":6, "name":"ice-cream", "price":6.0, "qty": 2} {"id":7, "name":"potato", "price":3.70, "qty": 10} {"id":8, "name":"apple", "price":5.60, "qty": 5} After running the code above, parquet files will be generated in the table/product directory containing the files below. Converting Parquet table to Delta Table Now that we have a Parquet table already created, we can convert easily to Delta Table, let's do this. public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setAppName("spark-delta-table"); conf.setMaster("local[1]"); SparkSession session = SparkSession.builder() .config(conf) .getOrCreate(); DeltaTable.convertToDelta(session, "parquet.`table/product`"); } DeltaTable.convertToDelta method is responsible to convert parquet table to Delta table. Note that we had to use SparkSession as a parameter and also specify the path of parquet table using this format "parquet.``" . The result after execution you can see in the picture below. After conversion running, Delta creates the famous _delta_log directory containing commit info and checkpoint files. Well that's it, I hope you enjoyed it!

  • Primeiros passos utilizando Terraform na AWS

    O Terraform é uma ferramenta do tipo IaC (Infrastructure as code) que possibilita provisionar infra-estrutura nos serviços de nuvem, ou seja, ao invés de criar manualmente recursos na nuvem, o Terraform facilita a criação e o controle deste serviços através de gerenciamento de estado em poucas linhas código. O Terraform tem sua linguagem própria e pode ser utilizada de forma independente com outras linguagens de forma isolada. Para este tutorial, iremos criar um Bucket S3 e uma SQS utilizando Terraform na AWS. Instalação Terraform Para a instalação, faça o download do instalador neste link https://www.terraform.io/downloads.html e escolha o seu sistema operacional. Provider AWS Utilizaremos a AWS como provider, ou seja, quando selecionamos a AWS como provider, o Terraform fará o download dos pacotes que possibilitará a criação de recursos específicos para a AWS. Para seguir nos próximo passos, estamos levando em conta que você já possui: Credenciais da AWS O seu usuário já possui permissões necessárias para criar recursos na AWS Autenticação Como nós estamos utilizando a AWS como provider, precisamos configurar o Terraform para autenticar e em seguida criar os recursos. Existem algumas maneiras de autenticação. Pare este tutorial, escolhi utilizar um dos mecanismos da AWS que permite alocar as credencias em um arquivo na pasta $HOME/.aws e utilizar como fonte de autenticação única. Para criar esta pasta com as credenciais, precisamos instalar o AWS CLI, acesse este link e siga os passos de instalação. Este mecanismo evita a utilização das credenciais diretamente no código, dessa forma, caso precise executar algum comando ou SDK que conecte a AWS localmente, estas credencias serão carregadas a partir deste arquivo. Configuração das credenciais Após instalar o AWS CLI, abra o terminal e execute o comando a seguir: aws configure No próprio terminal, preencha os campos utilizando os dados das credencias do seu usuário: Após o preenchimento, 2 arquivos textos serão criados no diretório $HOME/.aws config: contendo o profile, neste caso o profile default foi criado credentials: contendo as credenciais Vamos alterar os arquivos para adequar a este tutorial, altere o arquivo config conforme abaixo: [profile staging] output = json region = us-east-1 [default] output = json region = us-east-1 No caso, temos 2 perfis configurados, o default e um perfil de staging. Altere o arquivo credentials conforme abaixo, substituindo pelas suas credenciais. [staging] aws_access_key_id = [Access key ID] aws_secret_access_key = [Secret access key] [default] aws_access_key_id = [Access key ID] aws_secret_access_key = [Secret access key] Criando os arquivos Terraform base Após todas estas configurações, iremos começar a trabalhar de fato com o Terraform. Para isso precisamos criar alguns arquivos base que vai nos auxiliar na criação dos recursos na AWS. 1º Passo: No diretório root do seu projeto, crie um pasta chamado terraform/ 2º Passo: Dentro da pasta terraform/, crie os arquivos: main.tf vars.tf 3º Passo: Crie uma pasta chamada staging dentro de terraform/ 4º Passo: Dentro da pasta terraform/staging/ crie o arquivo: vars.tfvars Pronto, agora temos a estrutura de pasta que vamos utilizar nos próximos passos. Configurando os arquivos Terraform Vamos começar pela declaração das variáveis utilizando o arquivo vars.tf. vars.tf Neste arquivo é onde vamos criar a variáveis em que vamos utilizar em nosso contexto, podemos criar variáveis com um valor default ou simplesmente vazias, onde estas serão preenchidas de acordo com o ambiente de execução, onde será explicado mais a frente. variable "region" { default = "us-east-1" type = "string" } variable "environment" { } Criamos 2 variáveis: region: Variável do tipo string e seu valor default é a região da AWS em que vamos criar os recursos environment: Variável que vai representar o ambiente de execução staging/vars.tfvars Neste arquivo estamos definindo o valor da variável environment criada anteriormente sem valor default. environment = "staging" Essa separação é bem útil quando temos mais de um ambiente, caso tivéssemos um ambiente de produção, poderíamos ter criado outro arquivo vars.tfvars em uma pasta chamada production. Dessa forma, podemos escolher em qual ambiente vamos executar o Terraform. Vamos entender esta parte, quando executamos mais a frente. main.tf Este será o principal arquivo onde iremos declarar os recursos para que sejam criados na AWS. Nesta etapa vamos declarar os recursos para que seja criado um Bucket S3 e uma SQS. Vamos entendendo o arquivo em partes. Nesta primeira parte estamos declarando a AWS como provider e setando a região utilizando a variável que criamos anteriormente através de interpolação ${..}. Provider provider "aws" { region = "${var.region}" } Criando o Bucket S3 Para criar um recurso via Terraform, sempre começamos com a palavra chave resource e em seguida o nome do recurso e por fim um identificador. resource "nome do recurso" "identificador" {} Neste trecho estamos criando um Bucket chamado bucket.blog.data, lembre-se que nomes de Buckets devem ser únicos. O campo acl define as restrições do Bucket, neste caso, private. O campo tags é utilizado para passar informações extras ao recurso, neste caso será passando o valor da variável environment. Mais campos são descritos na documentação. resource "aws_s3_bucket" "s3_bucket" { bucket = "bucket.blog.data" acl = "private" tags = { Environment = "${var.environment}" } } Criando a SQS No próximo trecho, vamos criar uma SQS chamada sqs-posts. A criação do recurso segue as mesmas regras que descrevemos anteriormente. Para este cenário configuramos os campos delay_seconds que define o tempo de espera de uma mensagem ser entregue. Mais campos são descritos na documentação. resource "aws_sqs_queue" "sqs-blog" { name = "sqs-posts" delay_seconds = 90 tags = { Environment = "${var.environment}" } } Executando o Terraform 1º Passo : Inicializar o Terraform Dentro do diretório /terraform execute o comando: terraform init Mensagens no console após a execução: 2º Passo: No Terraform existem workspaces. São ambientes de execução em que o Terraform provê para executar os recursos e separar os estados entre eles. Após inicializado, um workspace default é criado. terraform workspace list Para este tutorial vamos simular um ambiente de desenvolvimento. Lembra que criamos uma pasta chamada /staging ? Sim, esta pasta simula um ambiente de desenvolvimento. Para isso, vamos criar um workspace no Terraform chamado staging também. Se tivéssemos um ambiente de produção, um workspace de produção poderia ser criado. terraform workspace new "staging" Pronto, criamos um novo workspace e já estamos utilizando. 3º Passo: Neste passo, vamos listar todos os recursos existentes ou os que serão criados, neste caso, a última opção. terraform plan -var-file=staging/vars.tfvars O argumento plan possibilita visualizar os recursos que serão criados ou atualizados, é uma boa opção para entender o comportamento antes que o recurso seja criado definitivamente. O segundo argumento -var-file possibilita escolher um caminho específico contendo os valores das variáveis que serão utilizadas de acordo com o ambiente de execução. Neste caso o arquivo /staging/vars.tfvars contém valores referentes ao ambiente de staging. Caso existisse um workspace de produção, a execução seria a mesma, porém para uma pasta diferente. Mensagens no console após a execução: 4º Passo: Neste passo, vamos criar os recursos definitivamente. terraform apply -var-file=staging/vars.tfvars Basta substituir o plan por apply, em seguida uma mensagem de confirmação será mostrada no console: Digite yes para aplicar os recursos e aguarde o fim da execução. Pronto, o Bucket S3 e a SQS form criados! Agora você pode conferir direto no console da AWS. Escolha de workspace Caso necessite mudar de workspace, execute o comando selecionando o workspace em que deseja utilizar: terraform workspace select "[workspace]" Destruindo os recursos Esta parte do tutorial requer muita atenção. O próximo comando possibilita remover todos os recursos que foram criados sem a necessidade em remover um por um. terraform destroy -var-file=staging/vars.tfvars Digite yes, caso deseja que todos os recursos criados sejam destruídos. Não recomendo utilizar este comando em um ambiente profissional, mas para este tutorial é útil para que você não esqueça de apagar e a AWS te cobrar no futuro. Conclusão Terraform possibilita criar infra-estruturas de forma bem simples através de código e também oferece bastante segurança mantendo os recursos através do uso de estados. Para este tutorial utilizamos a AWS como provider, mas é possível utilizar Google Cloud, Azure e entre outros. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Terraform: Up & Running: Writing Infrastructure as Code (Versão Inglês) é um livro focado em como utilizar Terraform e seus benefícios. O autor buscou fazer comparações com diversas outras ferramentas IAC (Infrastructure as code) como Ansible e Cloudformation (IAC nativa da AWS) e principalmente de como criar e provisionar diferentes recursos para múltiplos serviços na nuvem. Atualmente, Terraform é a ferramenta mais utilizada em projetos de Software para a criação e gerenciamento de recursos nos serviços de nuvem como AWS, Azure, Google cloud e vários outros. Se você quer ser um engenheiro completo ou atuar na área de Devops, recomendo fortemente aprender sobre o tema. Amazon AWS: Descomplicando a computação na nuvem é um livro para aqueles que estão começando na AWS e querem entender o funcionamento e a dinâmica dos serviços como S3, EC2, ElasticCache, Route 53, SNS, Cloudwatch e muito mais. AWS Cookbook (Versão Inglês) é um guia prático contendo 70 receitas detalhadas sobre os recursos da AWS e como resolver diferentes desafios. É um livro bem escrito e de fácil entendimento cobrindo os principais serviços da AWS através de exemplos práticos. A AWS ou Amazon Web Services é o serviço de nuvem mais utilizando atualmente em todo o mundo, caso queira entender mais sobre o tema para ficar mais bem posicionado no mercado, recomendo fortemente o estudo.

  • Entendendo o AWS SNS - Simple Notification Service

    O SNS (Simple Notification Service), provê um serviço de notificação utilizando o paradigma Pub/Sub. É uma forma de publicar mensagens destinadas a um ou mais inscritos na forma de endpoints. Confuso? Vamos aprofundar um pouco mais sobre o assunto. O termo Pub/Sub é um tema bastante relacionado em arquiteturas guiada a eventos, conhecida tecnicamente como event-driven architecture. Nesta arquitetura a publicação de mensagens podem ser feitas através de notificações para um ou mais destinos já conhecidos, criando uma arquitetura mais assíncrona. Para que um destino se torna conhecido, deve haver uma forma de sinalizar que aquele destino seja um candidato a receber qualquer mensagem da origem, ou seja, o destino é um subscriber (sub) ou inscrito. Mas inscrito aonde? Todo subscriber pode ser inscrito em um ou mais publicadores, no contexto do SNS, seria Tópicos, no qual falaremos mais adiante. Dessa forma, para cada publicação feita, um inscrito naquela publicação, receberá uma mensagem. Um exemplo, é quando recebemos notificações de algum aplicativo instalado no nosso Smartphone via push, ou seja, na instalação daquele aplicativo nos tornamos um inscrito (sub ou assinante), ou seja, para que qualquer publicação feita pelo aplicativo, seremos notificados. Provavelmente este serviço pode utilizar SNS como solução. O exemplo anterior é uma visão de mais alto nível como forma de introdução. O tema é um pouco mais amplo e será abordado a seguir. O SNS é dividido em Tópicos e Assinaturas, ambos trabalham de forma conjunta e oferecem diversos recursos através do próprio console da AWS ou de APIs. 1. Tópicos Os Tópicos são pontos de acesso que funciona como interface entre o Publisher (publicador) e o Subscriber (inscrito). Todo aplicativo deve estar inscrito a um Tópico para que receba notificações, ou seja, é o único ponto de acesso para a comunicação. Um Tópico é dividido entre o tipo Fifo e o Padrão: Fifo: O tipo Fifo permite um controle mais rigoroso de ordenação das mensagens (first in/first out), possui um limite de throughput de até 300 publicações por segundo, garante a entrega da mensagem uma única vez e por fim, fornece suporte somente ao protocolo de assinatura SQS. Padrão: O tipo padrão possui algumas diferenças que o torna mais flexível, porém menos rigoroso se comparado ao Fifo. Começando pela ordenação de mensagens. Este padrão visa uma ordenação de mensagens da maneira mais apropriada, ou seja, não possui uma regra que visa ordenar as mensagens por chegada. O throughput de publicações/segundo é maior que a do tipo Fifo e fornece suporte de protocolos de assinaturas para SQS, Lambda, HTTP, SMS, E-mail e endpoints de aplicativos móveis. Limite de tópicos Por conta da AWS, é permitido criar até 100.000 tópicos 2. Assinaturas A Assinatura é a forma de conectar ou inscrever um endpoint para um Tópico específico. Ou seja, cada Assinatura deve-se especificar um Tópico (existente) e o endpoint em que deseja receber as notificações publicadas pelo Tópico que será assinado. O endpoint é representado por diferentes tipos: AWS SQS HTTP HTTPS AWS Kinesis Data Firehose E-mail SMS AWS Lambda Resumindo, cada endpoint acima, são formatos de entrega/transporte para recebimento de notificações. Limite de Assinaturas A AWS permite até 10 milhões de assinaturas por tópico. 3. Limite de tamanho da mensagem O SNS possui um limite de tamanho de mensagem de até 256 KB. Já as mensagens para SMS são de 140 bytes. 4. Tipos de mensagens O SNS possui suporte para diferentes tipos de mensagens, como por exemplo texto, XML, JSON e texto sem formato. 5. SNS X SQS O SNS e o SQS são coisas diferentes, mas que possuem relação. Como falamos anteriormente, o SQS pode ser utlizado como endpoint, ou seja, um protocolo SQS que assina um Tópico SNS passa a receber qualquer mensagem publicada no Tópico tornando um processo de integração assíncrona. Na imagem acima descreve o contexto do SNS junto aos Tópicos e algumas SQS (subscribers) simulando as assinaturas. Após assinadas, todas estas SQS receberão mensagens publicadas do(s) Tópico(s). A SQS 1 receberá notificações dos Tópicos 1 e 2, a SQS 2 receberá notificações dos Tópicos 2 e 3 e por fim, a SQS 3 receberá somente do Tópico 3. Em breve será liberado um post com alguns exemplos de códigos que te ajudará entender ainda mais sobre o SNS mais a fundo. Caso queira saber mais detalhes, recomendo ler a documentação oficial através deste link. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Amazon AWS: Descomplicando a computação na nuvem é um livro para aqueles que estão começando na AWS e querem entender o funcionamento e a dinâmicas dos serviços como S3, EC2, ElasticCache, Route 53, SNS, Cloudwatch e muito mais. AWS Cookbook (Versão Inglês) é um guia prático contendo 70 receitas detalhadas sobre os recursos da AWS e como resolver diferentes desafios. É um livro bem escrito e de fácil entendimento cobrindo os principais serviços da AWS através de exemplos práticos. A AWS ou Amazon Web Services é o serviço de nuvem mais utilizando atualmente em todo o mundo, caso queira entender mais sobre o tema para ficar bem posicionado no mercado, recomendo fortemente o estudo. É isso, curtiu? Até mais!

  • First steps with DBT - Data Build Tool

    DBT has been used by a lot of companies on Data area and I believe that we can extract good insights in this post about it. That's going to be a practical post showing how DBT works it and hope you guys enjoy it. What's DBT? DBT means Data Build Tool and enables teams to transform data already loaded in their warehouse with simple select statements. DBT does the T in ELT processes, in the other words, he doesn't work to extract and load data but he's useful to transform it. Step 1: Creating a DBT Project Now, we're assume that DBT is already installed but if not, I recommend see this link. After DBT installed you can create a new project using CLI or you can clone this project from the DBT Github repository. Here for this post we're going to use CLI mode to create our project and also to complete the next steps. To create a new project, run the command below. dbt init After running this command, you need to type the project's name and which warehouse or database you're going to use like the image below. For this post, we're going to use postgres adapter. It's very important that you have a postgres database already installed or you can up a postgres image using docker. About adapters, DBT supports different of them and you can check here. I created a table structure and also loaded it with data simulating data from a video platform called wetube and we're going to use them to understand how DBT works it. Follow the structure: Step 2: Structure and more about DBT After running dbt init command to create the project, a structure of folders and files below will be created. I won't talk about the whole directories of project but I'd like to focus in two of them. Sources Sources are basically the data already loaded into your warehouse. In DBT process, sources have the same meaning of raw data. There's no folders representing source data for this project but you need to know about this term because we're going to set up tables already created as sources for the next sections. Seeds Seeds is an interesting and useful mechanism to load static data into your warehouse through CSV files. If you want to load these data you need to create a CSV file on this directory and run the command below. dbt seed For each field on CSV file, DBT will infer their types and create a table into warehouse or database. Models DBT works with Model paradigm, the main idea is that you can create models through the transformation using SQL statements based on tables sources or existing models Every SQL file located in your model folder will create a model into your warehouse when the command below runs. dbt run Remember that a model can be created through a source or another model and don't worry about this, I'll show you more details about it. Step 3: Setting up database connection After project already created, we need to set up our database's connection and here at this post, we're going to use postgres as database. After initialize the project a bunch of files are created and one of them is called profiles.yml. profiles.yml file is responsible to control the different profiles to the different database's connection like dev and production environment. If you've noticed, we can't see this file on the image above because this file is created outside of project to avoid sensitive credentials. You can find this file in ~/.dbt/ directory. If you note, we have one profile named dbt_blog and a target called dev, by default the target refer to dev with the database's connection settings. Also, It's possible to create one or more profiles and targets, it enables working with different environments. Another important detail is that dbt_blog profile should be specified on dbt_project.yml file as a default profile. For the next sections, we'll discuss what and how dbt_project.yml file works it. Step 4: Creating dbt_project.yml file Every DBT project has a dbt_project.yml file, you can set up informations like project name, directories, profiles and materialization type. name: 'dbt_blog' version: '1.0.0' config-version: 2 profile: 'dbt_blog' model-paths: ["models"] analysis-paths: ["analyses"] test-paths: ["tests"] seed-paths: ["seeds"] macro-paths: ["macros"] snapshot-paths: ["snapshots"] target-path: "target" # directory which will store compiled SQL files clean-targets: # directories to be removed by `dbt clean` - "target" - "dbt_packages" models: dbt_blog: # Config indicated by + and applies to all files under models/example/ mart: +materialized: table Note that profile field was set up as the same profile specified on profiles.yml file and another important detail is about materialized field. Here was set up as a "table" value but by default, is a "view". Materialized fields allows you to create models as a table or view on each run. There are others type of materialization but we won't discuss here and I recommend see dbt docs. Step 5: Creating our first model Creating first files Let's change a little and let's going to create a sub-folder on model directory called mart and inside this folder we're going to create our .SQL files and also another important file that we don't discuss yet called schema.yml. Creating schema file Schema files are used to map sources and to document models like model's name, columns and more. Now you can create a file called schema.yml e fill up with these informations below. version: 2 sources: - name: wetube tables: - name: account - name: city - name: state - name: channel - name: channel_subs - name: video - name: video_like - name: user_address models: - name: number_of_subs_by_channel description: "Number of subscribers by channel" columns: - name: id_channel description: "Channel's ID" tests: - not_null - name: channel description: "Channel's Name" tests: - not_null - name: num_of_subs description: "Number of Subs" tests: - not_null Sources: At sources field you can include tables from your warehouse or database that's going to be used on model creation. models: At models field you can include the name's model, columns and their description Creating a model This part is where we can create SQL scripts that's going to result in our first model. For the first model, we're going to create a SQL statement to represent a model that we can see the numbers of subscribers by channel. Let's create a file called number_of_subs_by_channel.sql and fill up with these scripts below. with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_channel_subs as ( select * from {{ source('wetube','channel_subs') }} ), number_of_subs_by_channel as ( select source_channel.id_channel, source_channel.name, count(source_channel_subs.id_subscriber) num_subs from source_channel_subs inner join source_channel using (id_channel) group by 1, 2 ) select * from number_of_subs_by_channel Understanding model creation Note that we have multiple scripts separated by common table expression (CTE) that becomes useful to understand the code. DBT enables using Jinja template {{ }} bringing a better flexibility to our code. The usage of keyword source inside Jinja template means that we're referring source tables. To refer a model you need to use ref keyword. The last SELECT statement based on source tables generates the model that will be created as table in the database. Running our first model Run the command below to create our first model dbt run Output: Creating another model Imagine that we need to create a model containing account information and it's channels. Let's get back to schema.yml file to describe this new model. - name: account_information description: "Model containing account information and it's channels" columns: - name: id_account description: "Account ID" tests: - not_null - name: first_name description: "First name of user's account" tests: - not_null - name: last_name description: "Last name of user's account" tests: - not_null - name: email description: "Account's email" tests: - not_null - name: city_name description: "city's name" tests: - not_null - name: state_name description: "state's name" tests: - not_null - name: id_channel description: "channel's Id" tests: - not_null - name: channel_name description: "channel's name" tests: - not_null - name: channel_creation description: "Date of creation name" tests: - not_null Now, let's create a new SQL file and name it as account_information.sql and put scripts below: with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_city as ( select * from {{ source('wetube','city') }} ), source_state as ( select * from {{ source('wetube','state') }} ), source_user_address as ( select * from {{ source('wetube','user_address') }} ), source_account as ( select * from {{ source('wetube','account') }} ), account_info as ( select account.id_user as id_account, account.first_name, account.last_name, account.email, city.name as city_name, state.name as state_name, channel.id_channel, channel.name as channel, channel.creation_date as channel_creation FROM source_account account inner join source_channel channel on (channel.id_account = account.id_user) inner join source_user_address user_address using (id_user) inner join source_state state using (id_state) inner join source_city city using (id_city) ) select * from account_info Creating our last model For our last model, we going to create a model about how many likes has a video. Let's change again the schema.yml to describe and to document our future and last model. - name: total_likes_by_video description: "Model containing total of likes by video" columns: - name: id_channel description: "Channel's Id" tests: - not_null - name: channel description: "Channel's name" tests: - not_null - name: id_video description: "Video's Id" tests: - not_null - name: title description: "Video's Title" tests: - not_null - name: total_likes description: "Total of likes" tests: - not_null Name it a file called total_likes_by_video.sql and put the code below: with source_video as ( select * from {{ source('wetube','video') }} ), source_video_like as ( select * from {{ source('wetube','video_like') }} ), source_account_info as ( select * from {{ ref('account_information') }} ), source_total_like_by_video as ( select source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title, count(*) as total_likes FROM source_video_like inner join source_video using (id_video) inner join source_account_info using (id_channel) GROUP BY source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title ORDER BY total_likes DESC ) select * from source_total_like_by_video Running DBT again After creation of our files, let's run them again to create the models dbt run Output The models were created in the database and you can run select statements directly in your database to check it. Model: account_information Model: number_of_subs_by_channel Model: total_likes_by_video Step 6: DBT Docs Documentation After generated our models, now we're going to generate docs based on these models. DBT generates a complete documentation about models and sources and their columns and also you can see through a web page. Generating docs dbt docs generate Running docs on webserver After docs generated you can run command below to start a webserver on port 8080 and see the documentation locally. dbt docs serve Lineage Another detail about documentation is that you can see through of a Lineage the models and it's dependencies. Github code You can checkout this code through our Github page. Cool? I hope you guys enjoyed it!

  • Differences between FAILFAST, PERMISSIVE and DROPMALFORED modes in Dataframes

    There's a bit differences between them and we're going to find out in this post. The parameter mode is a way to handle with corrupted records and depending of the mode, allows validating Dataframes and keeping data consistent. In this post we'll create a Dataframe with PySpark and comparing the differences between these three types of mode: PERMISSIVE DROPMALFORMED FAILFAST CSV file content This content below simulates some corrupted records. There are String types for the engines column that we'll define as an Integer type in the schema. "type","country","city","engines","first_flight","number_built" "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A220","Canada","Calgary","two",2013-03-02,179 "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A320","France","Lyon","two",1986-06-10,10066 "Airbus A330","France","Lyon","two",1992-01-02,1521 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York",2,1967-08-03,10636 "Airbus A220","Canada","Calgary",2,2013-03-02,179 Let's start creating a simple Dataframe that will load data from a CSV file with the content above, let's supposed that the content above it's from a file called airplanes.csv. To modeling the content, we're also creating a schema that will allows us to Data validate. Creating a Dataframe using PERMISSIVE mode The PERMISSIVE mode sets to null field values when corrupted records are detected. By default, if you don't specify the parameter mode, Spark sets the PERMISSIVE value. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "PERMISSIVE") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of PERMISSIVE mode Creating a Dataframe using DROPMALFORMED mode The DROPMALFORMED mode ignores corrupted records. The meaning that, if you choose this type of mode, the corrupted records won't be list. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "DROPMALFORMED") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of DROPMALFORMED mode After execution it's possible to realize that the corrupted records aren't available at Dataframe. Creating a Dataframe using FAILFAST mode Different of DROPMALFORMED and PERMISSIVE mode, FAILFAST throws an exception when detects corrupted records. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "FAILFAST") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of FAILFAST mode ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. Books to study and read If you want to learn more about and reach a high level of knowledge, I strongly recommend reading the following book(s): Spark: The Definitive Guide: Big Data Processing Made Simple is a complete reference for those who want to learn Spark and about the main Spark's feature. Reading this book you will understand about DataFrames, Spark SQL through practical examples. The author dives into Spark low-level APIs, RDDs and also about how Spark runs on a cluster and how to debug and monitor Spark clusters applications. The practical examples are in Scala and Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library with the new version of Spark, this book explores the main Spark's features like Dataframes usage, Spark SQL that you can uses SQL to manipulate data and Structured Streaming to process data in real time. This book contains practical examples and code snippets to facilitate the reading. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark is a book that explores best practices using Spark and Scala language to handle large-scale data applications, techniques for getting the most out of standard RDD transformations, how Spark SQL's new interfaces improve performance over SQL's RDD data structure, examples of Spark MLlib and Spark ML machine learning libraries usage and more. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming covers the basic concepts of Python through interactive examples and best practices. Learning Scala: Practical Functional Programming for the Jvm is an excellent book that covers Scala through examples and exercises. Reading this bool you will learn about the core data types, literals, values and variables. Building classes that compose one or more traits for full reusability, create new functionality by mixing them in at instantiation and more. Scala is one the main languages in Big Data projects around the world with a huge usage in big tech companies like Twitter and also the Spark's core language. Cool? I hope you enjoyed it!

bottom of page