Search
115 resultados encontrados com uma busca vazia
- Java: Streams API - Sorted
Desde o Java 8 lançado em 2014 foram adicionados dezenas de novas funcionalidades dentre melhorias na JVM e funções para facilitar a vida do desenvolvedor, pois ele merece. Dentre estas features, estão as Expression Lambda (EL) que foi o ponta pé inicial para a entrada do Java no mundo da programação funcional, melhoria na API de Data e a não necessidade de criar implementações de Interfaces já existentes com a utilização de Default methods. E a outra novidade é a API de Streams, o foco desse post. A Stream API é uma nova abordagem para se trabalhar com Collections deixando o código menos verboso e mais inteligente. A Stream API trabalha com o processamento de dados sob demanda e fornece dezenas de funcionalidades para manipular Collections diminuindo o código e simplificando o desenvolvimento em uma espécie de pipeline que será explicado mais a frente. Vamos criar um Classe representando a entidade Cidade no qual terá como atributos: nome, estado e população. E por fim um método chamado listaCidades que carrega uma lista de objetos do tipo Cidade. public class Cidade { String nome; String estado; long populacao; public Cidade(){} public Cidade(String nome, String estado, long populacao){ this.nome = nome; this.estado = estado; this.populacao = populacao; } public List listaCidades(){ List cidades = new ArrayList(); cidades.add(new Cidade("Hollywood", "CA", 30L)); cidades.add(new Cidade("Venice", "CA", 10L)); cidades.add(new Cidade("Houston", "TX", 14L)); cidades.add(new Cidade("New York", "NY", 21L)); cidades.add(new Cidade("Albany", "NY", 11L)); cidades.add(new Cidade("Rio de Janeiro", "RJ", 14L)); cidades.add(new Cidade("São Paulo", "SP", 90L)); return cidades; } @Override public String toString() { return "Cidade: " + nome + " /Estado: " + estado + " /População: " + populacao; } } Sorted O método Sorted() possibilita a ordenação de uma Collection através da passagem do campo em que deseja ser ordenado. cidadesComPopulacaoMaiorQue20 = cidade.listaCidades() .stream() .sorted(Comparator.comparing(c -> c.nome)) .collect(Collectors.toList()); cidadesComPopulacaoMaiorQue20.forEach( c -> System.out.println(c) ); No método sorted() é utilizado o método comparing da interface Comparator que aceita uma função como parâmetro e em seguida extrai retornando uma chave de ordenação. No exemplo anterior a ordenação é feita pelo campo Cidade.nome em ordem alfabética. Saída: Cidade: Albany /Estado: NY /População: 11 Cidade: Hollywood /Estado: CA /População: 30 Cidade: Houston /Estado: TX /População: 14 Cidade: New York /Estado: NY /População: 21 Cidade: Rio de Janeiro /Estado: RJ /População: 14 Cidade: São Paulo /Estado: SP /População: 90 Cidade: Venice /Estado: CA /População: 10 Como funciona a pipeline? Seguindo o exemplo anterior, a pipeline é um processo sequencial que se diferencia entre operações intermediárias e finais. No exemplo, a Stream é invocada a partir de uma fonte de dados (lista de objetos do tipo Cidade) que trabalha sob demanda, o método sorted é uma operação intermediária, ou seja, ela processa os dados até que o método collect é invocado, originando uma operação final. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Use a cabeça: Java é um clássico onde qualquer iniciante no assunto deve ler e estudar. Neste livro você aprenderá sobre os principais pilares de Java como, orientação a objetos, criação de classes, interfaces e seu funcionamento, controlar exceções, objetos genéricos e muito mais. Se quer entrar de cabeça neste mundo, é uma excelente opção! Padrões de Projeto é um livro no qual recomendamos após alcançar uma certa maturidade em programação em geral. Escrito por grandes nomes do assunto e traduzido para o português, Padrões de Projeto ou Design patterns como também são conhecidos, são padrões e estratégias que te ajudará a resolver diferentes desafios arquiteturais, aplicando soluções de forma elegante. São estratégias em que desenvolvedores de todo o mundo tem adotado em seus projetos. Imagina se você precisa atuar em um projeto internacional, caso este projetos tem adotado estratégias de Padrões de projeto, você não terá nenhuma grande dificuldade em entender a dinâmica e a arquitetura do código ou caso contrário, você mesmo poderá aplicar os conhecimentos adquiridos através deste livro. O mais interessante que Padrões de Projeto independem de linguagem de programação, é possível aplicar em qualquer outra linguagem, ou seja, é um conteúdo 100% reutilizável. Java Efetivo escrito por Joshua Bloch e traduzido para o português, é um livro excelente para aqueles que possuem um pouco mais de experiência com Java mas que precisam aprofundar em detalhes mais específicos da linguagem como, quando e como usar específicos Padrões de Projeto ou Design Patterns, utilização de Lambdas e Streams, concorrências e serializações. Para aqueles que possuem mais de experiência, é um excelente material de consulta. E aí, curtiu? Até mais!
- Convertendo tabela Parquet para Delta Table
Para este post, vamos criar exemplos de como converter tabelas parquet em Delta table. Primeiro, criaremos uma tabela parquet do zero por meio de um Spark Dataframe e depois a converteremos em um Delta Table. O uso de Delta Table fornece benefícios em comparação com uma tabela em Parquet. O Delta permite restaurar versões de sua tabela por meio da função Time Travel, suporta transações ACID e várias outras features interessantes. Criando Tabela Parquet Em primeiro lugar, vamos criar uma tabela parquet para ser convertida posteriormente em uma Delta Table. Vamos criar uma tabela Parquet do início para um melhor entendimento. O código Java e Spark a seguir, será executado uma vez, apenas para criar uma tabela parquet. Vamos usar um Spark Dataframe que será carregado de um arquivo JSON contendo registros semi-estruturados. public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setAppName("spark-delta-table"); conf.setMaster("local[1]"); SparkSession session = SparkSession.builder() .config(conf) .getOrCreate(); Dataset dataFrame = session.read().json("product.json"); dataframe.write().format("parquet").save("table/product"); } No exemplo acima, começamos a criar um objeto SparkSession para criar e gerenciar um Spark Dataframe que foi carregado a partir do conteúdo do arquivo product.json. Após a carga, o Dataframe cria (ou escreve) uma tabela em formato parquet no diretório table/product. Conteúdo JSON Conteúdo representado pelo arquivo product.json que contém registros semi-estruturados. {"id":1, "name":"rice", "price":12.0, "qty": 2} {"id":2, "name":"beans", "price":7.50, "qty": 5} {"id":3, "name":"coke", "price":5.50, "qty": 2} {"id":4, "name":"juice", "price":3.80, "qty": 1} {"id":5, "name":"meat", "price":1.50, "qty": 1} {"id":6, "name":"ice-cream", "price":6.0, "qty": 2} {"id":7, "name":"potato", "price":3.70, "qty": 10} {"id":8, "name":"apple", "price":5.60, "qty": 5} Após executar o código acima, os arquivos parquets serão gerados no diretório table/product contendo o arquivo abaixo. Convertendo tabela Parquet em Delta Table Agora que temos uma tabela Parquet já criada, podemos converter facilmente para Delta Table, veja o código a seguir. public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setAppName("spark-delta-table"); conf.setMaster("local[1]"); SparkSession session = SparkSession.builder() .config(conf) .getOrCreate(); DeltaTable.convertToDelta(session, "parquet.`table/product`"); } O método DeltaTable.convertToDelta é responsável por converter a tabela parquet em Delta Table. Observe que tivemos que usar SparkSession como parâmetro e também especificar o caminho da tabela parquet usando este formato"parquet.``" . O resultado após a execução você pode ver na imagem abaixo. Após a execução da conversão, Delta cria o famoso diretório _delta_log contendo informações de commit e arquivos de checkpoint. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Spark: The Definitive Guide: Big Data Processing Made Simple (Versão Inglês) é uma referência completa para quem quer aprender o Spark e sobre as suas principais funcionalidades. Lendo esse livro, você vai aprender sobre DataFrames, Spark SQL através de exemplos práticos. O autor mergulha nas APIs de baixo nível do Spark, RDDs e também sobre como o Spark é executado em um cluster e como depurar e monitorar os aplicativos de clusters do Spark. Os exemplos práticos estão em Scala e Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library (Versão Inglês) com a nova versão do Spark, este livro explora os principais recursos do Spark, como o uso de Dataframes, Spark SQL no qual você pode usar SQL para manipular dados e Structured Streaming para processar dados em tempo real. Este livro contém exemplos práticos e trechos de código para facilitar a leitura. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark (Versão Inglês) é um livro que explora as melhores práticas usando a linguagem Spark e Scala para lidar com aplicações de dados em larga escala, técnicas para transformações utilizando RDD, e também mostra como as novas interfaces do Spark SQL melhoram o desempenho sobre a estrutura de dados RDD do SQL, exemplos de Spark MLlib e o uso de bibliotecas de aprendizado de máquina de ML e muito mais. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming (Versão Inglês) abrange os conceitos básicos do Python por meio de exemplos interativos e práticas recomendadas. Learning Scala: Practical Functional Programming for the Jvm (Versão Inglês) é um excelente livro que aborda a linguagem Scala através de exemplos e exercícios práticos. Lendo este livro, você aprenderá sobre os principais tipos de dados, literais, valores e variáveis. Construir classes que compõem uma ou mais características para total reutilização, criar novas funcionalidades misturando-as na instanciação e muito mais. Scala é uma das principais linguagens em projetos de Big Data em todo o mundo, com grande uso em grandes empresas de tecnologia como o Twitter e também a linguagem principal do Spark. Bom é isso, espero que tenham gostado!
- Como economizar custos no S3 executando um Data Lake
Os serviços em nuvem ou Cloud Computing fornecem recursos úteis para escalar seu negócio mais rapidamente, mas nem sempre podemos medir os custos da nuvem quando estamos começando um negócio do zero ou mesmo sendo um negócio sólido. Custos sempre fazem parte da estratégia de qualquer empresa que deseja fornecer um melhor serviço Eu e meus colegas temos trabalhado em uma arquitetura de dados baseada em eventos que permite processar cerca de 350 milhões de eventos diariamente. Fornecemos dados para os consumidores finais e para as equipes estratégicas para tomadas de decisões. É sempre um grande desafio lidar com o tráfego massivo dos dados (Big Data), armazenamento e ao mesmo tempo, pensar em economia de custos com Cloud. O armazenamento é muito caro e existem algumas estratégias para economizar dinheiro. Neste post irei descrever algumas estratégias que temos adotado para economizar custos de S3 (Simple Storage Service) e espero contribuir através destas experiências. Estratégias Estratégia #1 Classe de armazenamento S3 (Amazon S3 Storage Class) O Amazon S3 fornece uma maneira de gerenciar arquivos por meio de configurações de ciclo de vida, onde você pode definir maneiras de mover arquivos para diferentes classes de armazenamento, dependendo da idade do arquivo e da frequência de acesso. Essa estratégia pode economizar muito dinheiro para o seu negócio. Trabalhar com classe de armazenamento nos permite economizar custos. Por padrão, os dados são armazenados na classe de armazenamento S3 Standard. Este tipo de armazenamento tem alguns benefícios de armazenamento e acesso aos dados, mas percebemos que após os dados transformados na camada Silver, os dados na camada Bronze não eram acessados com muita frequência e devido a isso, percebemos que era possível movê-los para uma classe de armazenamento mais barata. Decidimos movê-lo usando as configurações de ciclo de vida para a classe de armazenamento S3 Intelligent Tiering. Essa classe de armazenamento se encaixou perfeitamente em nosso contexto, pois pudemos economizar custos com armazenamento e mesmo no caso de acessar esses arquivos por algum motivo, poderíamos manter um custo viável. Até poderíamos mover estes dados para a classe de armazenamento Glacier, fazendo que o custo fique ainda mais barato, porém, o contexto atual impossibilita, pois como precisamos acessar estes dados de forma regular, a camada Glacier poderia aumentar os custos mas não pelo armazenamento, mas sim pelo acesso, no qual se torna caro dependendo da "profundidade" da camada. Se você verificar a documentação da AWS, notará que existem algumas classes de armazenamento mais baratas, mas você e sua equipe devem analisar cada caso, porque quanto mais barato for armazenar dados, mais caro será acessá-los. Portanto, tenha cuidado, tente entender os padrões de armazenamento e acesso aos dados em sua arquitetura Data Lake antes de escolher uma classe de armazenamento que possa se adequar melhor ao seu contexto. Estratégia #2 Particionamento de Dados O Apache Spark é o framework mais famoso por processar grandes volumes de dados e tem sido adotado por equipes de dados por todo o mundo. Durante as transformações de dados usando o Spark, é possível definir colunas em seu Dataframe nas quais serão utilizadas como partição, oferecendo melhor desempenho ao executar consultas SQL. Observe que a abordagem de particionamento de dados não possui relação direta com o Amazon S3, mas a adoção desta estratégia visa boas práticas ao acesso aos objetos no Amazon S3. Uma das formas de cobrança utilizados no S3 é pelo acesso/leitura dos dados, ou seja, quanto mais dados serem carregados maior a cobrança. Isso é inevitável, porém a abordagem de particionar dados, faz com que as leituras sejam mais controladas, evitando grandes carregamentos de dados desnecessários e consequentemente gerando menor custo. Estratégia #3 Delta Lake vacuum Delta Lake possui um recurso interessante chamado vacuum, que é um mecanismo para remover arquivos do disco sem utilidade, liberando espaço de armazenamento. Normalmente as equipes adotam essa estratégia após a restauração de versões em que alguns arquivos remanescente de versões anteriores e não serão mais gerenciados pelo Delta Lake. Por exemplo, na imagem abaixo temos 5 versões de tabelas Delta e suas partições. Suponha que precisamos restaurar a versão porque encontramos alguns dados inconsistentes após a versão 1. Após este comando, Delta apontará seu gerenciamento para a versão 1 como a versão atual, mas os arquivos de parquet relacionados a outras versões permanecerão lá sem uso. Podemos remover esses parquets executando o comando de vacuum, conforme mostrado abaixo. Observe que os parquets relacionados às versões posteriores à 1 foram removidos, liberando espaço de armazenamento. Para obter mais detalhes, recomendo ver a documentação do Delta Lake. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): AWS Cookbook (Versão Inglês) é um guia prático contendo 70 receitas detalhadas sobre os recursos da AWS e como resolver diferentes desafios. É um livro bem escrito e de fácil entendimento cobrindo os principais serviços da AWS através de exemplos práticos. A AWS ou Amazon Web Services é o serviço de nuvem mais utilizando atualmente em todo o mundo, caso queira entender mais sobre o tema para ficar bem posicionado no mercado, recomendo fortemente o estudo. Bom é isso, espero que tenham gostado!
- How to save costs on S3 running Data Lake
Cloud services provides useful resources to scale your business faster but not always we can measure cloud costs when we’re starting a business from the scratch or even being a solid business, costs always makes part of the strategy for any company who want to provide a better service. Me and my teammates have worked in a Data platform based on events enable to process 350 million events every day. We provide data to the client applications and to the businesses teams to make decisions and it always a challenge do deal with the massive data traffic and how we can maintain these data and saving money with storage at the same time. Storage is too expensive and there are some strategies to save money. For this post I’ll describe some strategies that we’ve adopted to save costs on S3 (Simple Storage Service) and I hope we can help it. Strategies Strategy #1 Amazon S3 storage classes Amazon S3 provides a way to manage files through life cycle settings, out there you can set ways to move files to different storage classes depending on the file’s age and access frequency. This strategy can save a lot of money to your company. Working with storage class enable us saving costs. By default, data are stored on S3 Standard storage class. This storage type has some benefits of storage and data access but we realized that after data transformed in the Silver layer, data in the Bronze layer it wasn’t accessed very often and it was totally possible to move them to a cheaper storage class. We decided to move it using life cycle settings to S3 Intelligent Tiering storage class. This storage class it was a perfect fit to our context because we could save costs with storage and even in case to access these files for a reason we could keeping a fair cost. We’re working on for a better scenario which we could set it a life cycle in the Silver layer to move files that hasn’t been accessed for a period to a cheaper storage class but at the moment we need to access historical files with high frequency. If you check AWS documentation you’ll note that there’s some cheapest storage classes but you and your team should to analyse each case because how cheapest is to store data more expensive will be to access them. So, be careful, try to understand the patterns about storage and data access in your Data Lake architecture before choosing a storage class that could fit better to your business. Strategy #2 Partitioning Data Apache Spark is the most famous framework to process a large amount of data and has been adopted by data teams around the world. During the data transformation using Spark you can set it a Dataframe to partition data through a specific column. This approach is too useful to perform SQL queries better. Note that partitioning approach has no relation to S3 directly but the usage avoids full scans in S3 objects. Full scans means that after SQL queries, the SQL engine can load gigabytes even terabytes of data. This could be very expensive to your company, because you can be charged easily depending on amount of loaded data. So, partitioning data has an important role when we need to save costs. Strategy #3 Delta Lake vacuum Delta Lake has an interesting feature called vacuum that’s a mechanism to remove files from the disk with no usage. Usually teams adopt this strategy after restoring versions that some files will be remain and they won’t be managed by Delta Lake. For example, in the image below we have 5 versions of Delta tables and their partitions. Suppose that we need to restore to version because we found some inconsistent data after version 1. After this command, Delta will point his management to version 1 as the current version but the parquet files related to others version will be there with no usage. We can remove these parquets running vacuum command as shown below. Note that parquets related to versions after 1 were removed releasing space in the storage. For more details I strongly recommend seeing Delta Lake documentation. Well that’s it, I hope you enjoyed it!
- Understanding Delta Lake Time Travel in 2 minutes
Delta Lake provides a way to version data for operations like merge, update and delete. This makes transparent how data life cycle inside Delta Lake works it. For each operation a version will be incremented and if you have a table with multiple operations, different versions of table will be created. Delta Lake offers a mechanism to navigate over the different versions called Time Travel. It's a temporary way to access data from the past. For this post we're going to use this feature to see different versions of table. Below we have a Delta Table called people that all versions were generated through write operations using append mode. Current version When we perform a simple read on a table, the current version is always the must recent one. So, for this scenario, the current version is 2 (two). Note that we don't need to specify which version we want to use because we're not using Time Travel yet. session.read().format("delta").load("table/people") .orderBy("id").show(); Nothing changes at the moment, let's keep for the next steps. Working with Time Travel Here begins how we can work with Time Travel, for the next steps, we'll perform readings on the people table specifying different versions to understand how Time travel works. Reading Delta table - Version 0 (zero) Now we're going to work with different versions starting from the 0 (zero) version, let's read the table again but now adding a new parameter, take a look at the code below. session.read().format("delta") .option("versionAsOf", 0) .load("table/people") .orderBy("id").show(); Notice that we added a new parameter called versionAsOf , this parameter allows us to configure the number of version you want to restore temporarily for a table. For this scenario we configure the reading for the Delta Table version zero (0). This was the first version generated by Delta Lake after write operation. Reading Delta table - Version 1 (one) For this last step we're using the version one (1), note that the data from the previous version has been maintained because an append mode was executed. session.read().format("delta") .option("versionAsOf", 1) .load("table/people") .orderBy("id").show(); Delta lake has a lot of benefits and Time travels allows us flexibility in a Big Data architecture, for more details I recommend see the Delta Lake docs . Well that's it, I hope you enjoyed it.
- Converting Parquet table to Delta Table
For this post we're going to create examples to how convert parquet table to Delta table. First, we'll create a parquet table from scratch through a Spark Dataframe and then converting to Delta table. Using Delta table has some benefits comparing to a Parquet table. Delta enables to restore versions of your table through time travel function, ACID supports and more. Creating a Parquet table First of all, let's create a parquet table to be converted later to Delta Table. I'll prefer create a parquet table from scratch to bring a better understanding. The following code will be executed once, just to create a parquet table. We're going to use a Spark Dataframe that will be loaded from a JSON file containing semi-structured records. public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setAppName("spark-delta-table"); conf.setMaster("local[1]"); SparkSession session = SparkSession.builder() .config(conf) .getOrCreate(); Dataset dataFrame = session.read().json("product.json"); dataframe.write().format("parquet").save("table/product"); } The above example, we start creating a SparkSession object to create and manage a Spark Dataframe that was loaded from the product.json file content. Alter load, the Dataframe creates (or write) a table in parquet format in the table/product directory. JSON content File represented by product.json file that contains semi-structured records. {"id":1, "name":"rice", "price":12.0, "qty": 2} {"id":2, "name":"beans", "price":7.50, "qty": 5} {"id":3, "name":"coke", "price":5.50, "qty": 2} {"id":4, "name":"juice", "price":3.80, "qty": 1} {"id":5, "name":"meat", "price":1.50, "qty": 1} {"id":6, "name":"ice-cream", "price":6.0, "qty": 2} {"id":7, "name":"potato", "price":3.70, "qty": 10} {"id":8, "name":"apple", "price":5.60, "qty": 5} After running the code above, parquet files will be generated in the table/product directory containing the files below. Converting Parquet table to Delta Table Now that we have a Parquet table already created, we can convert easily to Delta Table, let's do this. public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setAppName("spark-delta-table"); conf.setMaster("local[1]"); SparkSession session = SparkSession.builder() .config(conf) .getOrCreate(); DeltaTable.convertToDelta(session, "parquet.`table/product`"); } DeltaTable.convertToDelta method is responsible to convert parquet table to Delta table. Note that we had to use SparkSession as a parameter and also specify the path of parquet table using this format "parquet.``" . The result after execution you can see in the picture below. After conversion running, Delta creates the famous _delta_log directory containing commit info and checkpoint files. Well that's it, I hope you enjoyed it!
- Entendendo o AWS SNS - Simple Notification Service
O SNS (Simple Notification Service), provê um serviço de notificação utilizando o paradigma Pub/Sub. É uma forma de publicar mensagens destinadas a um ou mais inscritos na forma de endpoints. Confuso? Vamos aprofundar um pouco mais sobre o assunto. O termo Pub/Sub é um tema bastante relacionado em arquiteturas guiada a eventos, conhecida tecnicamente como event-driven architecture. Nesta arquitetura a publicação de mensagens podem ser feitas através de notificações para um ou mais destinos já conhecidos, criando uma arquitetura mais assíncrona. Para que um destino se torna conhecido, deve haver uma forma de sinalizar que aquele destino seja um candidato a receber qualquer mensagem da origem, ou seja, o destino é um subscriber (sub) ou inscrito. Mas inscrito aonde? Todo subscriber pode ser inscrito em um ou mais publicadores, no contexto do SNS, seria Tópicos, no qual falaremos mais adiante. Dessa forma, para cada publicação feita, um inscrito naquela publicação, receberá uma mensagem. Um exemplo, é quando recebemos notificações de algum aplicativo instalado no nosso Smartphone via push, ou seja, na instalação daquele aplicativo nos tornamos um inscrito (sub ou assinante), ou seja, para que qualquer publicação feita pelo aplicativo, seremos notificados. Provavelmente este serviço pode utilizar SNS como solução. O exemplo anterior é uma visão de mais alto nível como forma de introdução. O tema é um pouco mais amplo e será abordado a seguir. O SNS é dividido em Tópicos e Assinaturas, ambos trabalham de forma conjunta e oferecem diversos recursos através do próprio console da AWS ou de APIs. 1. Tópicos Os Tópicos são pontos de acesso que funciona como interface entre o Publisher (publicador) e o Subscriber (inscrito). Todo aplicativo deve estar inscrito a um Tópico para que receba notificações, ou seja, é o único ponto de acesso para a comunicação. Um Tópico é dividido entre o tipo Fifo e o Padrão: Fifo: O tipo Fifo permite um controle mais rigoroso de ordenação das mensagens (first in/first out), possui um limite de throughput de até 300 publicações por segundo, garante a entrega da mensagem uma única vez e por fim, fornece suporte somente ao protocolo de assinatura SQS. Padrão: O tipo padrão possui algumas diferenças que o torna mais flexível, porém menos rigoroso se comparado ao Fifo. Começando pela ordenação de mensagens. Este padrão visa uma ordenação de mensagens da maneira mais apropriada, ou seja, não possui uma regra que visa ordenar as mensagens por chegada. O throughput de publicações/segundo é maior que a do tipo Fifo e fornece suporte de protocolos de assinaturas para SQS, Lambda, HTTP, SMS, E-mail e endpoints de aplicativos móveis. Limite de tópicos Por conta da AWS, é permitido criar até 100.000 tópicos 2. Assinaturas A Assinatura é a forma de conectar ou inscrever um endpoint para um Tópico específico. Ou seja, cada Assinatura deve-se especificar um Tópico (existente) e o endpoint em que deseja receber as notificações publicadas pelo Tópico que será assinado. O endpoint é representado por diferentes tipos: AWS SQS HTTP HTTPS AWS Kinesis Data Firehose E-mail SMS AWS Lambda Resumindo, cada endpoint acima, são formatos de entrega/transporte para recebimento de notificações. Limite de Assinaturas A AWS permite até 10 milhões de assinaturas por tópico. 3. Limite de tamanho da mensagem O SNS possui um limite de tamanho de mensagem de até 256 KB. Já as mensagens para SMS são de 140 bytes. 4. Tipos de mensagens O SNS possui suporte para diferentes tipos de mensagens, como por exemplo texto, XML, JSON e texto sem formato. 5. SNS X SQS O SNS e o SQS são coisas diferentes, mas que possuem relação. Como falamos anteriormente, o SQS pode ser utlizado como endpoint, ou seja, um protocolo SQS que assina um Tópico SNS passa a receber qualquer mensagem publicada no Tópico tornando um processo de integração assíncrona. Na imagem acima descreve o contexto do SNS junto aos Tópicos e algumas SQS (subscribers) simulando as assinaturas. Após assinadas, todas estas SQS receberão mensagens publicadas do(s) Tópico(s). A SQS 1 receberá notificações dos Tópicos 1 e 2, a SQS 2 receberá notificações dos Tópicos 2 e 3 e por fim, a SQS 3 receberá somente do Tópico 3. Em breve será liberado um post com alguns exemplos de códigos que te ajudará entender ainda mais sobre o SNS mais a fundo. Caso queira saber mais detalhes, recomendo ler a documentação oficial através deste link. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Amazon AWS: Descomplicando a computação na nuvem é um livro para aqueles que estão começando na AWS e querem entender o funcionamento e a dinâmicas dos serviços como S3, EC2, ElasticCache, Route 53, SNS, Cloudwatch e muito mais. AWS Cookbook (Versão Inglês) é um guia prático contendo 70 receitas detalhadas sobre os recursos da AWS e como resolver diferentes desafios. É um livro bem escrito e de fácil entendimento cobrindo os principais serviços da AWS através de exemplos práticos. A AWS ou Amazon Web Services é o serviço de nuvem mais utilizando atualmente em todo o mundo, caso queira entender mais sobre o tema para ficar bem posicionado no mercado, recomendo fortemente o estudo. É isso, curtiu? Até mais!
- First steps with DBT - Data Build Tool
DBT has been used by a lot of companies on Data area and I believe that we can extract good insights in this post about it. That's going to be a practical post showing how DBT works it and hope you guys enjoy it. What's DBT? DBT means Data Build Tool and enables teams to transform data already loaded in their warehouse with simple select statements. DBT does the T in ELT processes, in the other words, he doesn't work to extract and load data but he's useful to transform it. Step 1: Creating a DBT Project Now, we're assume that DBT is already installed but if not, I recommend see this link. After DBT installed you can create a new project using CLI or you can clone this project from the DBT Github repository. Here for this post we're going to use CLI mode to create our project and also to complete the next steps. To create a new project, run the command below. dbt init After running this command, you need to type the project's name and which warehouse or database you're going to use like the image below. For this post, we're going to use postgres adapter. It's very important that you have a postgres database already installed or you can up a postgres image using docker. About adapters, DBT supports different of them and you can check here. I created a table structure and also loaded it with data simulating data from a video platform called wetube and we're going to use them to understand how DBT works it. Follow the structure: Step 2: Structure and more about DBT After running dbt init command to create the project, a structure of folders and files below will be created. I won't talk about the whole directories of project but I'd like to focus in two of them. Sources Sources are basically the data already loaded into your warehouse. In DBT process, sources have the same meaning of raw data. There's no folders representing source data for this project but you need to know about this term because we're going to set up tables already created as sources for the next sections. Seeds Seeds is an interesting and useful mechanism to load static data into your warehouse through CSV files. If you want to load these data you need to create a CSV file on this directory and run the command below. dbt seed For each field on CSV file, DBT will infer their types and create a table into warehouse or database. Models DBT works with Model paradigm, the main idea is that you can create models through the transformation using SQL statements based on tables sources or existing models Every SQL file located in your model folder will create a model into your warehouse when the command below runs. dbt run Remember that a model can be created through a source or another model and don't worry about this, I'll show you more details about it. Step 3: Setting up database connection After project already created, we need to set up our database's connection and here at this post, we're going to use postgres as database. After initialize the project a bunch of files are created and one of them is called profiles.yml. profiles.yml file is responsible to control the different profiles to the different database's connection like dev and production environment. If you've noticed, we can't see this file on the image above because this file is created outside of project to avoid sensitive credentials. You can find this file in ~/.dbt/ directory. If you note, we have one profile named dbt_blog and a target called dev, by default the target refer to dev with the database's connection settings. Also, It's possible to create one or more profiles and targets, it enables working with different environments. Another important detail is that dbt_blog profile should be specified on dbt_project.yml file as a default profile. For the next sections, we'll discuss what and how dbt_project.yml file works it. Step 4: Creating dbt_project.yml file Every DBT project has a dbt_project.yml file, you can set up informations like project name, directories, profiles and materialization type. name: 'dbt_blog' version: '1.0.0' config-version: 2 profile: 'dbt_blog' model-paths: ["models"] analysis-paths: ["analyses"] test-paths: ["tests"] seed-paths: ["seeds"] macro-paths: ["macros"] snapshot-paths: ["snapshots"] target-path: "target" # directory which will store compiled SQL files clean-targets: # directories to be removed by `dbt clean` - "target" - "dbt_packages" models: dbt_blog: # Config indicated by + and applies to all files under models/example/ mart: +materialized: table Note that profile field was set up as the same profile specified on profiles.yml file and another important detail is about materialized field. Here was set up as a "table" value but by default, is a "view". Materialized fields allows you to create models as a table or view on each run. There are others type of materialization but we won't discuss here and I recommend see dbt docs. Step 5: Creating our first model Creating first files Let's change a little and let's going to create a sub-folder on model directory called mart and inside this folder we're going to create our .SQL files and also another important file that we don't discuss yet called schema.yml. Creating schema file Schema files are used to map sources and to document models like model's name, columns and more. Now you can create a file called schema.yml e fill up with these informations below. version: 2 sources: - name: wetube tables: - name: account - name: city - name: state - name: channel - name: channel_subs - name: video - name: video_like - name: user_address models: - name: number_of_subs_by_channel description: "Number of subscribers by channel" columns: - name: id_channel description: "Channel's ID" tests: - not_null - name: channel description: "Channel's Name" tests: - not_null - name: num_of_subs description: "Number of Subs" tests: - not_null Sources: At sources field you can include tables from your warehouse or database that's going to be used on model creation. models: At models field you can include the name's model, columns and their description Creating a model This part is where we can create SQL scripts that's going to result in our first model. For the first model, we're going to create a SQL statement to represent a model that we can see the numbers of subscribers by channel. Let's create a file called number_of_subs_by_channel.sql and fill up with these scripts below. with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_channel_subs as ( select * from {{ source('wetube','channel_subs') }} ), number_of_subs_by_channel as ( select source_channel.id_channel, source_channel.name, count(source_channel_subs.id_subscriber) num_subs from source_channel_subs inner join source_channel using (id_channel) group by 1, 2 ) select * from number_of_subs_by_channel Understanding model creation Note that we have multiple scripts separated by common table expression (CTE) that becomes useful to understand the code. DBT enables using Jinja template {{ }} bringing a better flexibility to our code. The usage of keyword source inside Jinja template means that we're referring source tables. To refer a model you need to use ref keyword. The last SELECT statement based on source tables generates the model that will be created as table in the database. Running our first model Run the command below to create our first model dbt run Output: Creating another model Imagine that we need to create a model containing account information and it's channels. Let's get back to schema.yml file to describe this new model. - name: account_information description: "Model containing account information and it's channels" columns: - name: id_account description: "Account ID" tests: - not_null - name: first_name description: "First name of user's account" tests: - not_null - name: last_name description: "Last name of user's account" tests: - not_null - name: email description: "Account's email" tests: - not_null - name: city_name description: "city's name" tests: - not_null - name: state_name description: "state's name" tests: - not_null - name: id_channel description: "channel's Id" tests: - not_null - name: channel_name description: "channel's name" tests: - not_null - name: channel_creation description: "Date of creation name" tests: - not_null Now, let's create a new SQL file and name it as account_information.sql and put scripts below: with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_city as ( select * from {{ source('wetube','city') }} ), source_state as ( select * from {{ source('wetube','state') }} ), source_user_address as ( select * from {{ source('wetube','user_address') }} ), source_account as ( select * from {{ source('wetube','account') }} ), account_info as ( select account.id_user as id_account, account.first_name, account.last_name, account.email, city.name as city_name, state.name as state_name, channel.id_channel, channel.name as channel, channel.creation_date as channel_creation FROM source_account account inner join source_channel channel on (channel.id_account = account.id_user) inner join source_user_address user_address using (id_user) inner join source_state state using (id_state) inner join source_city city using (id_city) ) select * from account_info Creating our last model For our last model, we going to create a model about how many likes has a video. Let's change again the schema.yml to describe and to document our future and last model. - name: total_likes_by_video description: "Model containing total of likes by video" columns: - name: id_channel description: "Channel's Id" tests: - not_null - name: channel description: "Channel's name" tests: - not_null - name: id_video description: "Video's Id" tests: - not_null - name: title description: "Video's Title" tests: - not_null - name: total_likes description: "Total of likes" tests: - not_null Name it a file called total_likes_by_video.sql and put the code below: with source_video as ( select * from {{ source('wetube','video') }} ), source_video_like as ( select * from {{ source('wetube','video_like') }} ), source_account_info as ( select * from {{ ref('account_information') }} ), source_total_like_by_video as ( select source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title, count(*) as total_likes FROM source_video_like inner join source_video using (id_video) inner join source_account_info using (id_channel) GROUP BY source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title ORDER BY total_likes DESC ) select * from source_total_like_by_video Running DBT again After creation of our files, let's run them again to create the models dbt run Output The models were created in the database and you can run select statements directly in your database to check it. Model: account_information Model: number_of_subs_by_channel Model: total_likes_by_video Step 6: DBT Docs Documentation After generated our models, now we're going to generate docs based on these models. DBT generates a complete documentation about models and sources and their columns and also you can see through a web page. Generating docs dbt docs generate Running docs on webserver After docs generated you can run command below to start a webserver on port 8080 and see the documentation locally. dbt docs serve Lineage Another detail about documentation is that you can see through of a Lineage the models and it's dependencies. Github code You can checkout this code through our Github page. Cool? I hope you guys enjoyed it!
- Differences between FAILFAST, PERMISSIVE and DROPMALFORED modes in Dataframes
There's a bit differences between them and we're going to find out in this post. The parameter mode is a way to handle with corrupted records and depending of the mode, allows validating Dataframes and keeping data consistent. In this post we'll create a Dataframe with PySpark and comparing the differences between these three types of mode: PERMISSIVE DROPMALFORMED FAILFAST CSV file content This content below simulates some corrupted records. There are String types for the engines column that we'll define as an Integer type in the schema. "type","country","city","engines","first_flight","number_built" "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A220","Canada","Calgary","two",2013-03-02,179 "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A320","France","Lyon","two",1986-06-10,10066 "Airbus A330","France","Lyon","two",1992-01-02,1521 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York",2,1967-08-03,10636 "Airbus A220","Canada","Calgary",2,2013-03-02,179 Let's start creating a simple Dataframe that will load data from a CSV file with the content above, let's supposed that the content above it's from a file called airplanes.csv. To modeling the content, we're also creating a schema that will allows us to Data validate. Creating a Dataframe using PERMISSIVE mode The PERMISSIVE mode sets to null field values when corrupted records are detected. By default, if you don't specify the parameter mode, Spark sets the PERMISSIVE value. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "PERMISSIVE") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of PERMISSIVE mode Creating a Dataframe using DROPMALFORMED mode The DROPMALFORMED mode ignores corrupted records. The meaning that, if you choose this type of mode, the corrupted records won't be list. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "DROPMALFORMED") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of DROPMALFORMED mode After execution it's possible to realize that the corrupted records aren't available at Dataframe. Creating a Dataframe using FAILFAST mode Different of DROPMALFORMED and PERMISSIVE mode, FAILFAST throws an exception when detects corrupted records. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "FAILFAST") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of FAILFAST mode ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. Books to study and read If you want to learn more about and reach a high level of knowledge, I strongly recommend reading the following book(s): Spark: The Definitive Guide: Big Data Processing Made Simple is a complete reference for those who want to learn Spark and about the main Spark's feature. Reading this book you will understand about DataFrames, Spark SQL through practical examples. The author dives into Spark low-level APIs, RDDs and also about how Spark runs on a cluster and how to debug and monitor Spark clusters applications. The practical examples are in Scala and Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library with the new version of Spark, this book explores the main Spark's features like Dataframes usage, Spark SQL that you can uses SQL to manipulate data and Structured Streaming to process data in real time. This book contains practical examples and code snippets to facilitate the reading. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark is a book that explores best practices using Spark and Scala language to handle large-scale data applications, techniques for getting the most out of standard RDD transformations, how Spark SQL's new interfaces improve performance over SQL's RDD data structure, examples of Spark MLlib and Spark ML machine learning libraries usage and more. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming covers the basic concepts of Python through interactive examples and best practices. Learning Scala: Practical Functional Programming for the Jvm is an excellent book that covers Scala through examples and exercises. Reading this bool you will learn about the core data types, literals, values and variables. Building classes that compose one or more traits for full reusability, create new functionality by mixing them in at instantiation and more. Scala is one the main languages in Big Data projects around the world with a huge usage in big tech companies like Twitter and also the Spark's core language. Cool? I hope you enjoyed it!
- Working with Schemas in Spark Dataframes using PySpark
What's a schema in the Dataframes context? Schemas are metadata that allows working with a standardized Data. Well, that was my definition about schemas but we also can understanding schemas as a structure that represents a data context or a business model. Spark enables using schemas with Dataframes and I believe that is a good point to keep data quality, reliability and we also can use these points to understand the data and connect to the business. But if you know a little more about Dataframes, working with schema isn't a rule. Spark provides features that we can infer to a schema without defined schemas and reach to the same result, but depending on the data source, the inference couldn't work as we expect. In this post we're going to create a simple Dataframe example that will read a CSV file without a schema and another one using a defined schema. Through examples we'll can see the advantages and disadvantages. Let's to the work! CSV File content "type","country","engines","first_flight","number_built" "Airbus A220","Canada",2,2013-03-02,179 "Airbus A320","France",2,1986-06-10,10066 "Airbus A330","France",2,1992-01-02,1521 "Boeing 737","USA",2,1967-08-03,10636 "Boeing 747","USA",4,1969-12-12,1562 "Boeing 767","USA",2,1981-03-22,1219 If you noticed in the content above, we have different data types. We have string, numeric and date column types. The content above will be represented by airliners.csv in the code. Writing a Dataframe without Schema from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result It seems that worked fine but if you look with attention, you'll realize that in the schema structure there are some field types that don't match with their values, for example fields like number_built, engines and first_flight. They aren't string types, right? We can try to fix it adding the following parameter called "inferSchema" and setting up to "true". from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result Even inferring the schema, the field first_flight keeping as a string type. Let's try to use Dataframe with a defined schema to see if this details will be fixed. Writing a Dataframe with Schema Now it's possible to see the differences between the codes. We're adding an object that represents the schema. This schema describes the content in CSV file, you can note that we have to describe the column name and type. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType, DateType, StructField if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() StructSchema = StructType([ StructField("type", StringType()), StructField("country", StringType()), StructField("engines", IntegerType()), StructField("first_flight", DateType()), StructField("number_built", IntegerType()) ]) air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .schema(StructSchema) \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result After we defined the schema, all the field types match with their values. This shows how important is to use schemas with Dataframes. Now it's possible to manipulate the data according to the type with no concerns. Books to study and read If you want to learn more about and reach a high level of knowledge, I strongly recommend reading the following book(s): Spark: The Definitive Guide: Big Data Processing Made Simple is a complete reference for those who want to learn Spark and about the main Spark's feature. Reading this book you will understand about DataFrames, Spark SQL through practical examples. The author dives into Spark low-level APIs, RDDs and also about how Spark runs on a cluster and how to debug and monitor Spark clusters applications. The practical examples are in Scala and Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library with the new version of Spark, this book explores the main Spark's features like Dataframes usage, Spark SQL that you can uses SQL to manipulate data and Structured Streaming to process data in real time. This book contains practical examples and code snippets to facilitate the reading. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark is a book that explores best practices using Spark and Scala language to handle large-scale data applications, techniques for getting the most out of standard RDD transformations, how Spark SQL's new interfaces improve performance over SQL's RDD data structure, examples of Spark MLlib and Spark ML machine learning libraries usage and more. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming covers the basic concepts of Python through interactive examples and best practices. Learning Scala: Practical Functional Programming for the Jvm is an excellent book that covers Scala through examples and exercises. Reading this bool you will learn about the core data types, literals, values and variables. Building classes that compose one or more traits for full reusability, create new functionality by mixing them in at instantiation and more. Scala is one the main languages in Big Data projects around the world with a huge usage in big tech companies like Twitter and also the Spark's core language. Cool? I hope you enjoyed it!
- Criando Schemas com Spark Dataframes usando PySpark
O que é um schema no contexto de Dataframes? Os schemas são metadados que permitem trabalhar com dados padronizados. Bem, essa foi minha definição sobre esquemas, mas também podemos entender os schemas como uma estrutura que representa um contexto de dados ou um modelo de negócio. O Spark possibilita a definição de schemas em Dataframes visando manter a qualidade dos dados, confiabilidade e manter um catalogo de metadados. Por mais que seja possível não utilizar schemas, somente utilizando métodos do Spark para inferir o Dataframe e extrair o metadado, a utilização de schemas é uma boa prática quando se trabalha com governança de dados. Neste post vamos criar um exemplo de Dataframe simples que irá ler um arquivo CSV sem um schema e outro exemplo usando um schema definido. Através de exemplos veremos as vantagens e desvantagens. Conteúdo do arquivo CSV "type","country","engines","first_flight","number_built" "Airbus A220","Canada",2,2013-03-02,179 "Airbus A320","France",2,1986-06-10,10066 "Airbus A330","France",2,1992-01-02,1521 "Boeing 737","USA",2,1967-08-03,10636 "Boeing 747","USA",4,1969-12-12,1562 "Boeing 767","USA",2,1981-03-22,1219 Se você percebeu no conteúdo acima, temos diferentes tipos de dados. Temos tipos de coluna string, numérica e date. O conteúdo acima será representado pelo arquivo airliners.csv no código. Criando um Dataframe sem Schema definido from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Resultado Dataframe Parece que funcionou bem, mas se você olhar com atenção, perceberá que na estrutura do esquema existem alguns tipos de campos que não correspondem aos seus valores, por exemplo, campos como number_built, engines e first_flight. Eles não são tipos de string, certo? Podemos tentar corrigir essa inconsistência adicionando no Dataframe o seguinte parâmetro chamado inferSchema e adicionando o valor para true. from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Resultado Dataframe utilizando o parâmetro inferSchema Mesmo inferindo o schema, o campo first_flight manteve-se como tipo string. Agora, vamos tentar definir um schema para o Dataframe, para que tenhamos certeza de que todos os tipos sejam padronizados de acordo com o seu valor. Criando um Dataframe com Schema definido No código abaixo temos um complemento do código anterior, neste caso, estamos de fato criando um schema para ser utilizado no Dataframe. Perceba que é possível utilizar a classes StructSchema para definição do schema e StructField para os campos e seus tipos. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType, DateType, StructField if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() StructSchema = StructType([ StructField("type", StringType()), StructField("country", StringType()), StructField("engines", IntegerType()), StructField("first_flight", DateType()), StructField("number_built", IntegerType()) ]) air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .schema(StructSchema) \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Resultado Dataframe utilizando o Schema definido Depois de definirmos o schema, todos os tipos de campos correspondem aos seus valores. Isso mostra o quão importante é usar schemas com Dataframes. Agora é possível manipular os dados de acordo com o tipo e sem preocupações de se ter dados inconsistente e visando a qualidade dos dados para o consumo. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Spark: The Definitive Guide: Big Data Processing Made Simple (Versão Inglês) é uma referência completa para quem quer aprender o Spark e sobre as suas principais funcionalidades. Lendo esse livro, você vai aprender sobre DataFrames, Spark SQL através de exemplos práticos. O autor mergulha nas APIs de baixo nível do Spark, RDDs e também sobre como o Spark é executado em um cluster e como depurar e monitorar os aplicativos de clusters do Spark. Os exemplos práticos estão em Scala e Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library (Versão Inglês) com a nova versão do Spark, este livro explora os principais recursos do Spark, como o uso de Dataframes, Spark SQL no qual você pode usar SQL para manipular dados e Structured Streaming para processar dados em tempo real. Este livro contém exemplos práticos e trechos de código para facilitar a leitura. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark (Versão Inglês) é um livro que explora as melhores práticas usando a linguagem Spark e Scala para lidar com aplicações de dados em larga escala, técnicas para transformações utilizando RDD, e também mostra como as novas interfaces do Spark SQL melhoram o desempenho sobre a estrutura de dados RDD do SQL, exemplos de Spark MLlib e o uso de bibliotecas de aprendizado de máquina de ML e muito mais. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming (Versão Inglês) abrange os conceitos básicos do Python por meio de exemplos interativos e práticas recomendadas. Learning Scala: Practical Functional Programming for the Jvm (Versão Inglês) é um excelente livro que aborda a linguagem Scala através de exemplos e exercícios práticos. Lendo este livro, você aprenderá sobre os principais tipos de dados, literais, valores e variáveis. Construir classes que compõem uma ou mais características para total reutilização, criar novas funcionalidades misturando-as na instanciação e muito mais. Scala é uma das principais linguagens em projetos de Big Data em todo o mundo, com grande uso em grandes empresas de tecnologia como o Twitter e também a linguagem principal do Spark. Bom é isso, espero que tenham gostado!
- Differences between External and Internal tables in Hive
There are two ways to create tables in the Hive context and this post we'll show the differences, advantages and disadvantages. Internal Table Internal tables are known as Managed tables and we'll understand the reason in the following. Now, let's create an internal table using SQL in the Hive context and see the advantages and disadvantages. create table coffee_and_tips_table (name string, age int, address string) stored as textfile; Advantages To be honest I wouldn't say that it's an advantage but Internal tables are managed by Hive Disadvantages Internal tables can't access remote storage services for example in clouds like Amazon AWS, Microsoft Azure and Google Cloud. Dropping Internal tables all the data including metadata and partitions will be lost. External Table External tables has some interesting features compared to Internal tables and it's a good and recommended approach when we need to create tables. In the script below you can see the difference between Internal table creation and External table related to the last section. We just added the reserved word external in the script. create external table coffee_and_tips_external (name string, age int, address string) stored as textfile; Advantages The data and metadata won't be lost if drop table External tables can be accessed and managed by external process External tables allows access to remote storage service as a source location Disadvantages Again, I wouldn't say that it's a disadvantage but if you need to change schema or dropping a table, probably you'll need to run a command to repair the table as shown below. msck repair table Depending on the volume, this operation may take some time to complete. To check out a table type, run the following command below and you'll see at the column table_type the result. hive> describe formatted That's it, I hope you guys enjoy it! References: https://hive.apache.org/