top of page

Search

87 itens encontrados para ""

  • Introdução ao Apache Hive com Spark e Java

    O Hive é um software de Data Warehouse que possibilita a leitura, escrita e o gerenciamento de dados distribuídos e permite a utilização de SQL em consultas estruturadas. Vamos utilizar o contexto do Spark para a configuração inicial, mas é possível fazer de outras formas sem a utilização do Spark. Maven org.apache.spark spark-core_2.12 2.4.5 org.apache.spark spark-hive_2.12 2.4.5 O primeiro passo é criar a configuração do contexto inicial: String dwDir = new File("warehouse-dir").getAbsolutePath(); SparkConf sparkConf = new SparkConf() .set("spark.sql.warehouse.dir", wareHouseDir); SparkSession sparkSession = SparkSession.builder() .config(sparkConf) .enableHiveSupport() .master("local[1]") .getOrCreate(); Entendendo as configurações acima: 1. Neste trecho a variável dwDir recebe o caminho da pasta warehouse-dir que será utilizada nas configurações do Spark. Até então essa pasta não foi criada. String dwDir = new File("warehouse-dir").getAbsolutePath(); 2. No próximo trecho é setado a parâmetro spark.sql.warehouse.dir no contexto do Spark com o caminho do diretório referenciado no primeiro trecho. Dessa forma o Spark usará este diretório como o repositório dos bancos que serão criados. SparkConf sparkConf = new SparkConf() .set("spark.sql.warehouse.dir", wareHouseDir); 3. E por fim, a criação do SparkSession onde estão as configurações criadas anteriormente, o local onde o master será executado (localmente - local[1]) e a ativação do Hive. SparkSession sparkSession = SparkSession.builder() .config(sparkConf) .enableHiveSupport() .master("local[1]") .getOrCreate(); 4. Agora podemos executar algumas operações de DML e DDL 4.1. Criando um banco de dados sparkSession.sql("CREATE DATABASE IF NOT EXISTS hive_tutorial"); 4.2. Criando os Parquets Neste passo iremos criar uma tabela já apontando para o DataSource. Neste caso o DataSource será um Parquet. O Parquet é um arquivo de formato colunar que provê um melhor desempenho nas consultas. Para isso vamos criar um Parquet a partir de um JSON com seguinte o conteúdo: Arquivo data/pessoa.json {"id":1,"nome":"Joao","idade":12} {"id":2,"nome":"Kelly","idade":21} {"id":3,"nome":"Monica","idade":29} {"id":4,"nome":"Laura","idade":32} {"id":5,"nome":"Kiko","idade":23} {"id":6,"nome":"Miguel","idade":55} {"id":7,"nome":"Junior","idade":25} {"id":8,"nome":"Luis","idade":36} Executando a leitura do arquivo para um DataFrame. Dataset df = sparkSession.read().json("data/pessoa.json"); Criando os arquivos Parquet com base no DataFrame dentro do diretório data/parquet do seu projeto df.write().parquet("data/parquet/") Veja que os arquivos foram criados Pronto, agora temos um Data Source criado. 4.3. Criando a Tabela Segue os passos: sparkSession.sql("USE hive_tutorial"); Após selecionar o banco HIVE_TUTORIAL. O comando CREATE TABLE possui alguns argumentos extras, segue: STORED AS PARQUET : É um argumento que o Hive utilizará para saber que tipo de arquivo será usado na conversão, neste caso o Parquet. LOCATION: Diretório do Data Source criado anteriormente. sparkSession.sql("CREATE TABLE IF NOT EXISTS pessoa " + "(id BIGINT, nome STRING, idade BIGINT) " + "STORED AS PARQUET " + "LOCATION 'data/parquet/'"); É possível verificar a tabela criada executando o trecho abaixo: sparkSession.sql("SHOW TABLES").show(); Independente do fim da execução do programa, a tabela será mantida. Diferente de uma view criada com SparkSQL que é somente mantida em memória. 5. Exemplos de consultas Selecione o banco de dados sparkSession.sql("USE hive_tutorial"); Exemplo 1 sparkSession.sql("SELECT id, nome, idade " + "FROM hive_tutorial.pessoa " + "WHERE idade between 10 and 30 " + "ORDER BY nome desc ").show(); Resultado Exemplo 2 sparkSession.sql("SELECT count(nome) " + "FROM hive_tutorial.pessoa " + "WHERE idade > 45 ").show(); Resultado 6. Exemplos de consultas mais complexas Agora vamos criar duas novas tabelas para explorar melhor os recursos do Hive. Crie o arquivo JSON data/produto.json {"id":1,"desc":"video game","preco":1800.0,"tipo":"eletronico"} {"id":2,"desc":"geladeira","preco":1600.0,"tipo":"eletronico"} {"id":3,"desc":"cama","preco":2000.0,"tipo":"quarto"} {"id":4,"desc":"armário","preco":750.0,"tipo":"sala"} {"id":5,"desc":"notebook","preco":4500.0,"tipo":"eletronico"} {"id":6,"desc":"mesa","preco":2500.0,"tipo":"sala"} {"id":7,"desc":"cadeira","preco":110.0,"tipo":"sala"} {"id":8,"desc":"TV","preco":1500.0,"tipo":"eletronico"} {"id":9,"desc":"fogão","preco":900.0,"tipo":"cozinha"} Crie os parquets para Produto Dataset dfP = sparkSession.read().json("data/produto.json"); dfProd.write().parquet("data/parquet/produto/"); Crie o arquivo JSON data/item.json {"id":1,"id_produto":2,"qtde":1} {"id":2,"id_produto":1,"qtde":2} {"id":3,"id_produto":3,"qtde":3} {"id":4,"id_produto":4,"qtde":2} {"id":5,"id_produto":5,"qtde":5} Crie os parquets para Item Dataset dfItem = sparkSession.read().json("data/item.json"); dfItem.write().parquet("data/parquet/item/"); Com base nos parquets criado, agora vamos criar a tabelas Produto e Item sparkSession.sql("USE hive_tutorial"); sparkSession.sql("CREATE TABLE IF NOT EXISTS produto " + "(id BIGINT, desc STRING, " + "preco BIGINT, " + "tipo STRING) " + "STORED AS PARQUET " + "LOCATION 'data/parquet/produto'"); sparkSession.sql("CREATE TABLE IF NOT EXISTS item " + "(id BIGINT, " + "id_produto BIGINT, " + "qtde BIGINT) " + "STORED AS PARQUET " + "LOCATION 'data/parquet/item/'"); Tabelas criadas sparkSession.sql("SHOW TABLES").show(); Consultas utilizando JOIN Exemplo 1 sparkSession.sql("SELECT prod.id, " + "prod.desc, " + "prod.preco, " + "prod.tipo, " + "item.qtde " + "FROM produto prod inner join item item " + "on (prod.id = item.id_produto) " + "order by prod.id ").show(); Resultado Exemplo 2 sparkSession.sql(" SELECT " + "prod.tipo, " + "sum(item.qtde) " + "FROM produto prod inner join item item " + "on (prod.id = item.id_produto) " + "group by prod.tipo").show(); Resultado Exemplo 3 sparkSession.sql(" SELECT " + "prod.tipo, " + "sum(item.qtde), " + "sum(item.qtde * prod.preco) " + "FROM produto prod inner join item item " + "on (prod.id = item.id_produto) " + "group by prod.tipo").show(); Resultado Pra finalizar, dê uma olhada na documentação oficial para mais detalhes: https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html https://hive.apache.org/ É isso, espero ter ajudado!

  • Primeiros passos com Delta Lake

    O que é o Delta Lake? O Delta Lake é um projeto open-source que visa gerenciar a camada de armazenamento (Storage Layer) que segundo o seu conceito: "traz confiabilidade para Datalakes". Na prática é uma abstração do Apache Spark reutilizando os mesmos mecanismos mas oferencendo recursos extras interessantes. Dentre eles, suporte para transações ACID. Todos sabem que manter a integridade dos dados em uma pipeline de dados é uma tarefa crítica diante do alta concorrência de leitura e escrita de dados. Neste caso, o ACID possibilita gerenciar ambientes como estes. Dentre outras vantagens, o Delta Lake provê histórico de auditoria, versionamento de dados, suporta operações DML do tipo delete e update e vários outras vantagens. Para este tutorial, vamos simular uma pipeline de dados simples de forma local, com o foco no que o Delta Lake pode nos oferecer de vantagens. Na etapa de ingestão, vamos carregar um Spark DataFrame a partir de um JSON, criaremos uma view temporária para nos auxiliar, criaremos uma tabela Delta via SQL e por fim, utilizaremos o Delta Lake para executar algumas operações de DML utilizando a view que será criada. Vamos utilizar Java e Maven para gerenciar as dependências, além de Spark e Hive no qual este, vai nos auxiliar em manter a tabelas em um catálogo de dados. Maven org.apache.spark spark-core_2.12 3.0.1 org.apache.spark spark-sql_2.12 3.0.1 org.apache.spark spark-hive_2.12 3.0.1 io.delta delta-core_2.12 0.8.0 O código será desenvolvido em pequenos trechos para um melhor entendimento. Configurando Spark com Delta e Hive String val_ext="io.delta.sql.DeltaSparkSessionExtension"; String val_ctl="org.apache.spark.sql.delta.catalog.DeltaCatalog"; SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("app"); sparkConf.setMaster("local[1]"); sparkConf.set("spark.sql.extensions",var_ext); sparkConf.set("spark.sql.catalog.spark_catalog",val_ctl); SparkSession sparkSession = SparkSession.builder() .config(sparkConf) .enableHiveSupport() .getOrCreate(); Entendendo o trecho acima Definimos duas variáveis val_ext e val_ctl atribuindo os valores para as chaves (spark.sql.extensions e spark.sql.catalog.spark_catalog) . Estas, necessárias para a configuração do Delta junto com o Spark Demos o nome do contexto do Spark de app Como não estamos rodando o Spark em um cluster, o master está configurado para rodar local local[1] O Spark tem suporte para o Hive, nesse caso o habilitamos no trecho enableHiveSupport( ) Ingestão dos Dados Vamos trabalhar com Spark Dataframe como fonte dos dados. Carregaremos um Dataframe a partir de um JSON. Arquivo order.json {"id":1, "date_order": "2021-01-23", "customer": "Jerry", "product": "BigMac", "unit": 1, "price": 8.00} {"id":2, "date_order": "2021-01-22", "customer": "Olivia", "product": "Cheese Burguer", "unit": 3, "price": 21.60} {"id":3, "date_order": "2021-01-21", "customer": "Monica", "product": "Quarter", "unit": 2, "price": 12.40} {"id":4, "date_order": "2021-01-23", "customer": "Monica", "product": "McDouble", "unit": 2, "price": 13.00} {"id":5, "date_order": "2021-01-23", "customer": "Suzie", "product": "Double Cheese", "unit": 2, "price": 12.00} {"id":6, "date_order": "2021-01-25", "customer": "Liv", "product": "Hamburger", "unit": 1, "price": 2.00} {"id":7, "date_order": "2021-01-25", "customer": "Paul", "product": "McChicken", "unit": 1, "price": 2.40} Criando Dataframe Dataset df = sparkSession.read().json("datasource/"); df.createOrReplaceGlobalTempView("order_view"); Entendendo o trecho acima No trecho anterior, estamos criando um Dataframe a partir do arquivo JSON que está dentro do diretório datasource/ , crie este diretório para que a estrutura do seu código fique mais compreensiva e em seguida, crie o arquivo order.json com base no conteúdo mostrado anteriormente. Por último, criamos um view temporária que vai nos auxiliar mais a frente nos próximos passos. Criando a Tabela Delta (Delta Table) Vamos criar a Delta Table a partir de um script SQL. A princípio a criação é simples, mas perceba que usamos tipagens diferentes de uma tabela utilizada em um banco relacional. Como por exemplo, utilizamos STRING ao invés de VARCHAR e assim por diante. Estamos particionando a tabela pelo campo date_order. Este campo foi escolhido como partição pois acreditamos que haverá diferentes datas. Dessa forma, as consultas podem utilizar este campo como filtro, visando um melhor desempenho. E por fim, definimos a tabela como Delta Table a partir do trecho USING DELTA. String statement = "CREATE OR REPLACE TABLE orders (" + "id STRING, " + "date_order STRING," + "customer STRING," + "product STRING," + "unit INTEGER," + "price DOUBLE) " + "USING DELTA " + "PARTITIONED BY (date_order) "; sparkSession.sql(statement); Entendendo o trecho acima No trecho anterior estamos criando uma tabela Delta chamada orders e em seguida executamos a criação. Operações DML Delta suporta operações como Delete, Update e Upsert utilizando Merge Utilizando Merge junto com Insert e Update Neste passo, vamos executar um Merge que possibilita controlar o fluxo de inserção e atualização de dados através de uma tabela, Dataframe ou view. O Merge trabalha a partir de row matches que ficará mais compreensível no trecho seguinte. String mergeStatement = "Merge into orders " + "using global_temp.order_view as orders_view " + "ON orders.id = orders_view.id " + "WHEN MATCHED THEN " + "UPDATE SET orders.product = orders_view.product," + "orders.price = orders_view.price " + "WHEN NOT MATCHED THEN INSERT * "; sparkSession.sql(mergeStatement); Entendendo o trecho acima No trecho acima estamos executando o Merge a partir da view order_view criada nos passos anteriores. No mesmo trecho temos uma condição orders.id = orders_view.id que vai auxiliar nos matches seguintes. Caso a condição anterior seja verdadeira, ou seja o MATCHED seja verdadeiro. Os dados serão atualizados. Caso a condição não seja verdadeira, NOT MATCHED. Os dados serão inseridos. No caso acima, os dados serão inseridos, pois até então não existia dados na tabela orders. Execute o comando abaixo para visualizar os dados inseridos. sparkSession.sql("select * from orders").show(); Atualize o arquivo datasource/order.json alterando o campo product, price e execute todos os trechos novamente. Você verá que todos os registros serão atualizados. Comando Update É possível executar Update sem a necessidade de usar o Merge, basta executar o comando abaixo: String updateStatement = "update orders " + "set product = 'Milk-Shake' " + "where id = 2"; sparkSession.sql(updateStatement); Comando Delete String deleteStatement = "delete from pedidos where id = 2"; sparkSession.sql(deleteStatement); Além de poder executar o comando Delete, é possível utilizar este comando junto ao Merge. Entendendo o Delta Lake Transaction Log (DeltaLog) Além de dar suporte a transações ACID, o delta gera alguns arquivos JSON que servem como uma forma de auditar e manter o histórico de cada transação, desde comandos DDL e DML Com este mecanismo é possível até voltar em um estado específico da tabela caso necessário. Para cada transação executada um arquivo JSON é criado dentro da pasta _delta_log. O arquivo inicial sempre será 000000000.json, contendo os commits da transação. Neste nosso cenário, este primeiro arquivo contém os commits da criação da tabela orders. Para visualizar melhor, acesse a pasta local que provavelmente foi criada na raiz do seu projeto chamada spark-warehouse. Esta pasta foi criada pelo Hive para alocar os recursos criados desde os arquivos JSON e os parquets. Dentro dela terá uma estrutura de pastas conforme imagem abaixo: Perceba que os arquivos são criados em ordem crescente a partir de cada transação executada. Acesse cada arquivo JSON e verá cada transação que foi executado através do campo operation, além de outras informações. 00000000000000000000.json "operation":"CREATE OR REPLACE TABLE" 00000000000000000001.json "operation":"MERGE" 00000000000000000002.json "operation":"UPDATE" 00000000000000000003.json "operation":"DELETE" Perceba também que os arquivos parquets foram gerados particionados em pastas pelo campo date_order. Utilizar partição visa consultas com melhores desempenho, mas não entraremos em detalhes neste post, futuramente falaremos mais sobre isso. Espero que neste post foi possível esclarecer algumas dúvidas sobre o que é o Delta Lake e seu funcionamento. Até a próxima!

  • Usando Comparator.comparing para ordenar Java Stream

    Introdução A ordenação de dados é uma tarefa comum em muitos projetos de desenvolvimento de software. Quando trabalhamos com coleções de objetos em Java, uma abordagem poderosa e flexível para realizar a ordenação é o uso da interface Comparator.comparing em conjunto com Streams. Neste post, vamos mostrar que usando Comparator.comparing para ordenar Java Stream pode tornar a ordenação elegante e eficiente. O que é a interface Comparator.comparing? A interface Comparator.comparing é uma funcionalidade introduzida no Java 8 como parte do pacote java.util.Comparator. Ela fornece um método estático chamado comparing, que permite especificar uma função de chave (chave de ordenação) para comparar objetos. Essa função é usada para extrair um valor de um objeto e compará-lo com base nesse valor durante a ordenação. Flexibilidade na ordenação com Comparator.comparing Uma das principais vantagens da interface Comparator.comparing é sua flexibilidade. Com ela, podemos realizar ordenações em diferentes campos de um objeto, permitindo a criação de lógicas de ordenação complexas de forma simples e concisa. Perceba no código abaixo que simplemente no método sorted(), passamos como argumento a interface Comparator.comparing que por sua vez, passou o campo city como argumento utilizando method reference (People::getCity) executando a ordenação por este campo. Saída da execução Monica John Mary Anthony Seth Ordenação de múltiplos critérios Muitas vezes, é necessário realizar ordenações com base em múltiplos critérios. Isso é facilmente alcançado com a interface Comparator.comparing. Basta encadear vários métodos comparing, cada um especificando um critério diferente. O Java se encarregará de realizar a ordenação de acordo com a sequência especificada. Por exemplo, podemos ordenar a mesma lista por city e, em seguida, por name: Comparator.comparing(People::getCity).thenComparing(People::getName). Ordenação ascendente e descendente Outra vantagem importante da interface Comparator.comparing é a capacidade de realizar ordenações tanto em ordem ascendente quanto descendente. Para isso, basta encadear o método reversed() como no código abaixo: Saída da execução Seth Mary John Anthony Monica Eficiência e simplicidade Ao usar a interface Comparator.comparing em conjunto com Streams, a ordenação se torna mais eficiente e elegante. A combinação desses recursos permite escrever código limpo, de fácil leitura e manutenção. Além disso, o Java otimiza internamente a ordenação usando algoritmos eficientes, resultando em um desempenho satisfatório mesmo para grandes conjuntos de dados. Conclusão final A interface Comparator.comparing é uma ferramenta poderosa para realizar a ordenação de Streams em Java. Sua flexibilidade, capacidade de ordenação ascendente e descendente, suporte para múltiplos critérios e eficiência na execução tornam-na uma opção valiosa para qualquer desenvolvedor Java. Ao aproveitar essa interface, podemos obter um código mais conciso, menos verboso e eficiente, facilitando a manipulação de objetos em uma Stream. Espero que tenha curtido!

  • Aplicando Change Data Feed para auditoria em tabelas Delta

    O que é o Change Data Feed? Change Data Feed é uma feature do Delta Lake a partir da versão 2.0.0 que permite rastrear a níveis de linhas em tabelas Delta, mudanças como operações de DML (Merge, Delete ou Update), versões do dados e o timestamp de quando aconteceu a mudança. O processo mapeia operações de Merge, Delete e Update mantendo o histórico de alterações a nível de linha, ou seja, cada evento efetuado em um registro, o Delta através do Change Data Feed consegue registrar como uma espécie de auditoria. É claro que é possível utilizar para diferentes casos de uso, é extenso as possibilidades. Como funciona na prática Iremos simular uma pipe aplicando Change Data Feed para tabelas Delta, pois não é um recurso default. Após a criação da tabela Delta, iremos executar algumas operações visando explorar mais sobre o poder do Change Data Feed. Iremos trabalhar com seguinte Dataset: Criando a Sessão Spark e configurando alguns parâmetros do Delta A partir de agora, criaremos o código em partes para um fácil entendimento. O código abaixo estamos criando o método responsável por manter a sessão do Spark e configurando alguns parâmetros para o funcionamento do Delta. Carregando o Dataset Vamos carregar o Dataset e criar uma view temporário para ser usada na nossa pipeline mais adiante. Criando a tabela Delta Agora faremos a criação da tabela Delta já configurando Change Data Feed nas propriedades da tabela e toda a parte de metadata será baseada no Dataset anteriormente apresentado. Perceba que estamos usando na propriedade o seguinte parâmetro delta.enableChangeDataFeed = true para a ativação do Change Data Feed. Executando Merge dos dados Agora executaremos uma simples operação Merge para que o Change Data Feed possa registrar como mudança na nossa tabela. Veja que o Merge utiliza na nossa view global_temp.raw_product anteriormente criada para o upsert dos dados. Auditando a tabela Agora que o Merge foi executado, vamos executar uma leitura na nossa tabela para entender o que aconteceu e como o Change Data Feed atuou. Perceba que estamos passando os seguinte parâmetros: 1. readChangeFeed em que é necessário para o uso do Change Data Feed. 2. startingVersion é o parâmetro responsável por restringir a partir de qual versão inicial queremos que seja apresentada. Resultado após execução: Veja que além da colunas definidas na criação da tabela, temos 3 novas colunas gerenciadas pelo Change Data Feed. 1. _change_type: Coluna contendo valores de acordo com cada operação efetuada como insert, update_preimage , update_postimage, delete 2. _commit_version: Versão da mudança 3. _commit_timestamp: Timestamp representando a data da mudança No resultado acima, o resultado do upsert foi um simples insert, por não conter todas as condições possíveis de um update. Deletando um registro Nessa etapa faremos um simples delete em um registro da tabela, apenas para validarmos como o Change Data Feed irá se comportar. Auditando a tabela (novamente) Perceba abaixo que após a deleção do registro 6, agora temos um novo registro criado como delete na tabela e seu incremento de versão para 2. Outro ponto é que o registro original foi mantido, porém com a versão antiga. Atualizando um registro Agora como último teste, iremos atualizar um registro para entender novamente o comportamento do Change Data Feed. Auditando a tabela (última vez) Agora como último teste, executamos um simples update em um registro para entender como será o comportamento. Perceba que 2 novos valores foram adicionados/atualizados na coluna _change_type. O valor update_postimage é o valor após o update executado e dessa vez, para o registro antigo, foi mantido a mesma versão do novo na coluna _commit_version, pois este mesmo registro foi atualizado de acordo com coluna _change_type para update_preimage, ou seja, valor antes da mudança. Conclusão O Change Data Feed é um ótimo recurso para entender o comportamento da sua pipeline de dados e também uma forma de auditar registros visando um melhor entendimento das operações ali executadas. Segundo o próprio time do Delta, é um recurso que mantido não gera nenhum overhead significativo, ou seja, talvez seja uma feature que pode ser adotada de forma integral em sua estratégia de dados pois possui vários benefícios como foi mostrado neste post. Repositório GitHub Espero que tenha curtido!

  • Entendendo Record Class do Java em 2 minutos

    Introdução Liberado no Java 14 como preview, mais especificamente na JEP 395, Record Class é uma alternativa para se trabalhar com Classes em Java. Record Class foi uma sacada bem interessante desenhado para eliminar a verbosidade de quando se precisa criar uma classe e seus componentes, como por exemplo: Construtores canônicos Métodos de acesso público Implementar os métodos equals e hashCode Implementar o método toString Usando Record Classes não é mais necessário declarar os itens acima, ajudando o desenvolvedor a ser mais focado em outras frentes. Vamos entender melhor na prática. Vamos criar uma classe Java chamada User e adicionar alguns campos e métodos. Perceba que para uma simples classe com 4 campos, criamos um construtor, métodos de acesso público, implementamos os métodos equals e hashCode e por fim, o método toString. Funciona bem, porém poderíamos evitar a complexidade e criar um código menos verboso. Nesse caso, podemos utilizar Record Classes no lugar da classe User acima. User Record Class É notável a diferença entre Record e uma Classe Java tradicional. Perceba que não foi necessário declarar os campos, criar os métodos de acessos ou implementar nenhum outro método. Em uma Record Class quando criada, implicitamente os métodos de acesso público são criados, as implementações dos métodos equals, hashCode e toString também são criados de forma automática sendo não necessário implementa-los de forma explícita. E por fim, os campos de referências ou componentes, são criados como private final com o mesmos nomes. Saída Desvantagens Record Class se comporta como uma classe Java comum, mas a diferença é que não se pode trabalhar com herança, ou seja, não se pode estender (extends) outra classe, somente implementar uma ou mais interfaces. Outro ponto é que não é possível criar variáveis de instâncias não estáticas (static); Conclusão final Record Classes é uma ótima abordagem para quem procura um código menos verboso ou que precisa de agilidade em implementar modelos. Apesar da limitação de não poder estender outras Record Classes, é um limitador que não prejudica o seu uso em geral. Espero que tenha curtido!

  • Entendendo Java Reflection em 2 minutos

    Introdução A Java Reflection é uma poderosa API que permite a um programa Java examinar e manipular informações sobre suas próprias classes em tempo de execução. Com a Reflection, é possível obter informações sobre campos, métodos e construtores de uma classe, além de acessar e modificar esses elementos mesmo que eles sejam privados. Neste post vamos escrever alguns códigos Java explorando algumas das facilidades de se usar Reflection e quando aplicar em seus projetos. Classe Bank Criaremos uma simples classe chamada Bank, onde iremos criar alguns campos, métodos e construtores para que sejam explorados pelo uso do Reflection. Acessando os campos da classe Bank Com a classe Bank criada, vamos explorar via Reflection a listagem de todos campos da classe através do método getDeclaredFields da classe Class. Perceba que através do método estático Class.forName, passamos como parâmetro uma string com o nome da classe em que desejamos explorar via Reflection. Saída Field name: code Field type: class java.lang.Integer ************ Field name: nameOfBank Field type: class java.lang.String ************ Field name: amountOfDepositedMoney Field type: class java.lang.Double ************ Field name: totalOfCustomers Field type: class java.lang.Integer ************ Acessando os métodos da classe Bank Através do método getDeclaredMethods, podemos recuperar todos os métodos da classe Bank. Saída Method name: doDeposit Method type: class java.lang.String ************ Method name: doWithDraw Method type: class java.lang.String ************ Method name: getReceipt Method type: class java.lang.String ************ Criando objetos Com o uso do Reflection para criar objetos, é necessário cria-los através de um construtor. Neste caso, devemos primeiro invocar um construtor para que seja criado o objeto. O detalhe é que para recuperar este construtor, devemos nos atentar aos tipos dos parâmetros que compõem o construtor e a ordem que eles são declarados. Isso torna flexível recuperar diferentes construtores com quantidade de parâmetros e tipos diferentes. Perceba abaixo que foi necessário criar um array do tipo Class atribuindo diferentes tipos de acordo com a composição do construtor que iremos usar para criar o nosso objeto. Neste cenário, será necessário invocar o método class.getConstructor(argType) passando o array criado anteriormente como argumento. Dessa forma, teremos um objeto construtor que será usado na criação do nosso objeto. Por fim, criamos um novo array do tipo Object atribuindo os valores que irão compor o nosso objeto obedecendo a ordem definida no construtor e em seguida, basta invocar o método constructor.newInstance(argumentsValue) passando o array como parâmetro retornando o objeto em que desejamos criar. Saída Bank{code=1, nameOfBank='Bank of America', amountOfDepositedMoney=1.5, totalOfCustomers=2500} Invocando métodos Para invocar um método através de Reflection é bem simples como é mostrado no código abaixo. Perceba que é necessário passar como parâmetro no método cls.getMethod("doDeposit", argumentsType) o nome explícito do método, no caso o "doDeposit" e no segundo parâmetro, um array representando o tipo do dado usado no parâmetro do método doDeposit(double amount), no caso um parâmetro do tipo double. Por fim, invocar o método method.invoke passando como primeiro parâmetro o objeto referenciando a classe, no caso um objeto do tipo Bank. E como segundo parâmetro, o próprio valor do parâmetro que será executado no método. Saída 145.85 of money has been deposited Conclusão O uso de Reflection é uma boa estratégia quando necessita-se de flexibilidade em explorar diferentes classes e seus métodos sem a necessidade de instanciar objetos. Normalmente usa-se Reflection em componentes específicos de uma arquitetura mas não impede de ser usado em cenários variados. Pelo exemplos mostrados acima, percebe-se cenários infinitos de sua aplicação e as vantagens do seu uso. Espero que tenha curtido!

  • Airflow para Iniciantes: Entenda Apache Airflow da maneira mais simples

    Introdução Airflow tem sido uma das principais ferramentas de orquestração do mercado e muito falada no mundo Modern Data Stack, por ser uma ferramenta capaz de orquestrar workloads de dados através de ETLs ou ELTs. Mas na verdade, o Airflow não se resume somente a isso, ele pode ser aplicado em diversos casos de usos do dia a dia de um Engenheiro de Dados ou Software. Neste Tutorial sobre Airflow para iniciantes, iremos apresentar o Airflow da maneira mais simples, sem a necessidade de saber ou criar ETLs. Mas o que é o Airflow de fato? O Apache Airflow é uma plataforma de orquestração de fluxo de trabalho amplamente utilizada para agendar, monitorar e gerenciar pipelines de dados. Ele possui vários componentes que trabalham juntos para fornecer suas funcionalidades. Componentes do Airflow DAG O DAG (Directed Acyclic Graph) é o principal componente e a representação de fluxo de trabalho no Airflow. É composto por tarefas (tasks) e dependências entre elas. As tarefas são definidas como operadores (operators), como PythonOperator, BashOperator, SQLOperator e entre outros. O DAG define a ordem de execução das tarefas e as relações de dependências. Webserver O componente do Webserver fornece uma interface da web para interagir com o Airflow. Ele permite que você visualize, gerencie e monitore seus fluxos de trabalho, tarefas, DAGs e logs. O Webserver também permite a autenticação de usuários e o controle de acesso com base em funções. Scheduler O Scheduler é responsável por agendar a execução das tarefas de acordo com a definição do DAG. Ele verifica periodicamente se há tarefas pendentes para execução e atribui recursos disponíveis para a execução das tarefas no momento apropriado. O Scheduler também lida com a recuperação de falhas e o agendamento de retries de tarefas. Executor O Executor é responsável por executar as tarefas definidas nos DAGs. Existem diferentes tipos de executores disponíveis no Airflow, como LocalExecutor, CeleryExecutor, KubernetesExecutor e etc. Cada executor tem suas próprias configurações e comportamentos de execução. Metadatabase O Metadatabase é um banco de dados onde o Airflow armazena metadados sobre as tarefas, DAGs, execuções, agendamentos, entre outros. Ele é usado para rastrear o status das tarefas, registrar o histórico de execução e fornecer informações para o monitoramento e a visualização do fluxo de trabalho. É possível utilizar diversos outros bancos de dados para registrar o histórico como MySQL, Postgres e dentre outros. Workers Os Workers são os nós de execução em um ambiente distribuído. Eles recebem tarefas atribuídas pelo Scheduler e as executam. Os Workers podem ser dimensionados horizontalmente para lidar com pipelines de dados maiores ou para distribuir a carga de trabalho para vários recursos. Plugins Os Plugins são extensões do Airflow que permitem adicionar novos recursos e funcionalidades ao sistema. Eles podem incluir novos operadores, hooks, sensores, conexões com sistemas externos e entre outros. Os Plugins fornecem uma maneira de personalizar e estender as capacidades do Airflow para atender às necessidades específicas de um fluxo de trabalho. Operadores Operadores são basicamente a composição de um DAG. Entenda um operador como um bloco de código com responsabilidade própria. Pelo Airflow ser um orquestrador e executar um fluxo de trabalho, podemos ter diferentes tarefas a serem executadas como por exemplo, acessar uma API, enviar um e-mail, acessar uma tabela em um banco de dados e efetuar uma operação, executar um código Python ou até um comando Bash. Para cada uma das tarefas acima, devemos usar um operador. A seguir, iremos abordar alguns dos principais operadores: BashOperator O BashOperator permite executar comandos Bash ou scripts diretamente no sistema operacional onde o Airflow está sendo executado. É útil para tarefas que envolvem a execução de scripts de shell, utilitários ou qualquer ação que possa ser realizada no terminal. Resumindo, quando precisamos abrir o terminal do nosso sistema e executar algum comando para manipular arquivos ou algo relacionado ao próprio sistema, porém dentro de um DAG, é este o operador a ser usado. PythonOperator O PythonOperator permite que você execute funções Python como tarefas no Airflow. Você pode escrever suas próprias funções Python personalizadas e usar o PythonOperator para chamar essas funções como parte do fluxo de trabalho. DummyOperator O DummyOperator é uma tarefa "falsa" que não realiza nenhuma ação. É útil para criar dependências e fluxos de trabalho complexos sem a necessidade de executar nenhuma ação real. Sensor Os Sensores são usados para aguardar a ocorrência de algum evento externo antes de continuar o fluxo de trabalho, pode funcionar como um listener (ouvinte). Por exemplo, o HttpSensor que é um tipo de Sensor, pode validar se uma API externa esteja ativa, caso esteja, o fluxo continua sendo executado. Não necessariamente é um operador HTTP que deveria retornar algo, mas somente um tipo de ouvinte. HttpOperator Diferente de um Sensor, o HttpOperator é usado para realizar solicitações HTTP, como GET, POST, PUT, DELETE e etc. Neste caso, permite interagir de forma mais completa com APIs internas ou externas. SqlOperator SqlOperator é o operador responsável por executar operações de DML e DDL em um banco de dados, ou seja, desde manipulações de dados como SELECTS, INSERTS, UPDATES até criação de tabelas, alteração e etc. Executores Os executores são responsáveis por executar as tarefas definidas em um fluxo de trabalho (DAG). Eles gerenciam a alocação e a execução das tarefas em tempo de execução, garantindo que cada tarefa seja executada de forma eficiente e confiável. O Airflow oferece diferentes tipos de executores, cada um com características e funcionalidades distintas, permitindo que você escolha o mais adequado para suas necessidades específicas. A seguir, abordaremos alguns dos principais executores: LocalExecutor O LocalExecutor é o executor padrão no Apache Airflow. Ele é projetado para ser usado em ambientes de desenvolvimento e teste onde a escalabilidade não é uma preocupação. O LocalExecutor executa as tarefas em threads separados no mesmo processo do Airflow. Essa abordagem é simples e eficiente para pipelines menores ou para execuções em um único nó. CeleryExecutor Se você precisa de um executor para ambientes distribuídos e com alta escala, o CeleryExecutor é uma excelente opção. Ele utiliza o Celery, uma biblioteca de tarefas em fila, para distribuir as tarefas em nós de execução separados. Essa abordagem torna o Airflow adequado para executar pipelines em clusters de servidores, permitindo dimensionar horizontalmente conforme a demanda. KubernetesExecutor Para ambientes que utilizam Kubernetes como plataforma de orquestração de containers, o KubernetesExecutor é uma escolha natural. Ele aproveita a capacidade de orquestração do Kubernetes para executar tarefas em pods separados, o que pode resultar em melhor isolamento de recursos e mais facilidade na execução de tarefas em containers. DaskExecutor Se o seu fluxo de trabalho requer processamento paralelo e distribuído, o DaskExecutor pode ser a opção certa. Ele utiliza a biblioteca Dask para realizar a computação paralela em um cluster de recursos. Essa abordagem é ideal para tarefas que podem ser divididas em sub-tasks independentes, permitindo maior aproveitamento dos recursos disponíveis. Linguagem de programação Airflow suporta unicamente como linguem de programação, o Python. Para ser sincero, não é um limitador para quem não conhece bem a linguagem. Na prática, o processo de criar DAGs é padrão, o que pode mudar de acordo com as suas necessidades, será lidar com diferentes tipos de operadores, podendo ou não usar Python. Mãos na massa Montando o ambiente Para este tutorial iremos usar o Docker que nos ajudará a provisionar o nosso ambiente sem a necessidade de instalar o Airflow. Caso não tenha o Docker instalado, recomendo seguir as recomendações deste link e após instalado, volte para seguirmos o tutorial. Baixando o projeto Para facilitar, clone o projeto do seguinte repositório e siga os passos para fazer o deploy do Airflow. Passos para o deploy Com o docker instalado e após ter baixado o projeto conforme o item anterior, acesse o diretório onde se encontra o projeto e abra o terminal, execute o seguinte comando docker: docker-compose up O comando acima irá iniciar os containers do docker onde estão presentes os serviços do próprio Airflow, postgres e entre outras que iremos utilizar. Se tiver curiosidade de como estes serviços estão mapeados, abra o arquivo docker-compose.yaml do projeto e lá você terá mais detalhes. Enfim, após a execução do comando acima e os containers já iniciados, acesse via browser o seguinte endereço http://localhost:8080/ Uma tela como abaixo será aberta, basta digitar airflow para o usuário e senha e acesse a UI do Airflow. Criando a DAG Criando um simples Hello World Para este exemplo, iremos criar uma simples DAG onde irá printar o clássico "Hello World". No projeto que você baixou, acesse a pasta /dags e crie o seguinte arquivo python chamado hello_world.py . O Código acima é um simples exemplo de uma DAG escrita em Python. Percebe-se que começamos importando algumas funções, incluindo a própria DAG, função relacionada a data e o operador Python. Em seguida, criamos uma função Python que irá printar no console "Hello World" chamada print_hello. Esta função será chamada pela DAG mais a frente. A declaração de uma DAG inicia-se com with DAG(..) passando alguns argumentos como: dag_id: Identificador da DAG no contexto do Airflow start_date: A data definida é apenas um ponto de referência e não necessariamente a data do início de execução e nem da criação da DAG. Normalmente as execuções são feitas a uma data posterior a definida neste parâmetro, e tem uma importância quando precisamos calcular execuções entre o início e o que foi definido no parâmetro schedule_interval. schedule_interval: Neste parâmetro definimos a periodicidade em que a DAG será executada. É possível definir diferentes formas de execuções através de expressões CRON ou através de Strings já definidas como @daily, @hourly, @once, @weekly e etc. No caso do exemplo, o fluxo será executado apenas uma vez. catchup: Este parâmetro controla execuções retroativas, ou seja, caso esteja definido como True, o Airflow irá executar o período retroativo a partir da data definida no start_date até a data atual. No exemplo anterior definimos como False por não ter a necessidade da execução retroativa. Após o preenchimento dos argumentos, criamos a task hello_task dentro da própria DAG através do operador PythonOperator no qual provê formas de executar funções python dentro de uma DAG. Perceba que declaramos um identificador através do task_id e no argumento python_callable no qual é nativa do operador PythonOperator, passamos a função python print_hello criada anteriormente. Por último, invoque a task hello_task. Dessa forma, a DAG entenderá que esta será a task a ser executada. Caso você já tenha feito o deploy, a DAG aparecerá no Airflow em pouco tempo para ser executada conforme na imagem abaixo: Após a DAG criada, ative-a e a execute clicando em Trigger DAG conforme a imagem acima. Clique na task hello_operator (centro) e em seguida uma janela será aberta conforme imagem abaixo: Clique no botão Log para ver mais detalhes da execução: Perceba como é simples a criação de uma DAG, basta pensar nas diferentes possibilidades e cenários de aplicabilidade. A seguir, faremos mais exemplos um pouco mais complexos explorando várias outros cenários. Conclusão Com base no simples exemplo mostrado, Airflow apresentou uma abordagem flexível e simples para o controle de fluxos automatizados, desde a criação de DAGs até a navegação do seu componente web. Como citei no início, a sua utilização não se limita somente na orquestração de ETLs como é feito no mercado em geral, mas também na possibilidade do seu uso em tarefas que requer qualquer necessidade em controlar fluxos que possuem dependências entre seus componentes dentro de um contexto escalável ou não. Repositório GitHub Espero que tenha curtido!

  • Criando códigos Java assíncronos com Future

    Introdução Java Future é uma das várias formas de se trabalhar com a linguagem de forma assíncrona provendo um contexto multi-Thread em que é possível executar tarefas em paralelo sem gerar um bloqueio no processo. No exemplo abaixo faremos uma simulação de envio de e-mail fictício em que mesmo durante o envio, o processo não será bloqueado, ou seja, não será necessário esperar o termino do envio para que as demais funcionalidades ou mecanismos voltem a operar. Classe EmailService Entendendo a classe EmailService A classe acima representa de forma fictícia o envio de e-mails em massa, a ideia de se usar o loop simulando o envio é justamente para atrasar o processo em si. Por fim, ao final do envio, o método sendEmailBatch(int numberOfEmailsToBeSent) retorna uma String contendo uma mensagem referente ao fim do processo. Classe EmailServiceAsync Entendendo a classe EmailServiceAsync A classe EmailServiceAsync representa o mecanismo assíncrono em si, nela temos o método sendEmailBatchAsync(int numberOfEmailsToBeSent) no qual será o responsável por tornar o processo de envio de e-mail fictício assíncrono. O processo assíncrono é gerenciado pelo uso do ExecutorService no qual facilita o gerenciamento de tarefas de forma assíncrona nas quais são atribuídas a um pool de threads. No caso, a chamada ao método sendEmailBatch(int numberOfEmailsToBeSent) se resume a uma tarefa (task) no qual será atribuída a uma Thread definida em Executors.newFixedThreadPool(1). Por fim, o método retorna uma Future que é literalmente uma promessa de que aquela tarefa em alguma hora será finalizada, representando um processo assíncrono. Classe EmailServiceAsyncRun Entendendo a classe EmailServiceAsyncRun É nesta classe onde iremos testar o processo assíncrono usando Future. Vamos recapitular, na classe EmailService, criamos um método chamado sendEmailBatch(int numberOfEmailsToBeSent) no qual estamos simulando através do for o envio de e-mail fictício e printando uma mensagem de envio que usaremos para testar a concorrência. Na classe EmailServiceAsync, o método sendEmailBatchAsync(int numberOfEmailsToBeSent) cria um ExecutorService que fará o gerenciamento das tasks juntamente com o pool de threads, que neste caso, estamos criando só uma Thread definido em Executors.newFixedThreadPool(1) e retornará uma Future. Agora na classe EmailServiceAsyncRun, é onde de fato testamos o processo, vamos entender por partes: Instanciamos um objeto do tipo EmailServiceAsync Criamos um objeto do tipo Future e atribuímos ao retorno do método emailAsync.sendEmailBatchAsync(500) . A ideia do argumento 500 é apenas para controlar a iteração do For, atrasando o processo para ser finalizado. Até poderíamos usar Thread.sleep() como alternativa e definir um tempo de delay que também funcionaria bem. Perceba que estamos utilizando para controlar o controle de iteração while, o método futureReturn.isDone(), ou seja, este método permite que o processo não seja bloqueado enquanto o fluxo de e-mail seja executado. Neste caso, qualquer processo em que deseja implementar para concorrer enquanto o envio é feito, pode ser criado dentro do while, como por exemplo, um fluxo de atualização de tabelas de clientes ou qualquer outro processo. Na linha 20, através do método futureReturn.get(), estamos imprimindo o resultado do envio dos e-mails. E por fim, finalizamos o executorService e suas tasks através do método executorService.shutdown(). Executando o processo Perceba claramente que existem dois processos distintos sendo executados, o processo de envio de email "Sending email Nº 498.." e o processo de atualização de uma tabela de cliente. Trabalhando com processos blocantes O uso do Future é bastante utilizado para casos de uso onde precisamos bloquear um processo, ou seja, a Thread atual será bloqueada até que o processo sendo executado por Future termine. Para isso, basta invocar diretamente o método futureReturn.get() sem usar qualquer controle de iteração como foi usado no exemplo anterior. Um ponto importante é que este tipo de abordagem pode fazer com que recursos sejam desperdiçados devido ao bloqueio da Thread atual. Conclusão O uso de Future é bem promissor quando precisamos adicionar processos assíncronos em nosso código da maneira mais simples ou até mesmo utilizar para bloqueio de processos. É uma API enxuta com uma certa limitação de recursos mas que funciona bem para alguns cenários. Espero que tenha curtido!

  • Acessando APIs e extraindo dados com Airflow

    Introdução Airflow provê diferentes formas de se trabalhar com fluxos automatizados e uma das formas é a possibilidade de acesso a APIs externas utilizando operadores HTTP e extraindo os dados necessários. Mãos na massa Neste tutorial iremos criar uma DAG na qual acessará uma API externa e fará a extração dos dados diretamente para um arquivo local. Se for sua primeira vez utilizando Airflow, recomendo acessar este link para entender mais sobre o Airflow e como montar um ambiente. Criando a DAG Para este tutorial, criaremos uma DAG que será trigada a cada 1 hora (schedule_interval="0 * * * *") e acessará uma API externa extraindo alguns dados diretamente para um arquivo JSON local. Neste cenário usaremos o operador SimpleHttpOperator onde provê uma API com capacidade de executar requisições para APIs externas. Perceba que utilizamos dois operadores dentro de uma mesma DAG. O operador SimpleHttpOperator que provê formas de acesso a APIs externas que através do campo method definimos métodos HTTPs (GET, POST, PUT, DELETE). O campo endpoint permite especificar o endpoint da API, que no caso é products e por fim, o parâmetro http_conn_id, onde é necessário passar o identificador da conexão que será definida a seguir através da UI do Airflow. Conforme imagem abaixo, acesse o menu Admin > Connections Preencha os dados conforme imagem abaixo e salve em seguida. Sobre o operador PythonOperator, estamos utilizando apenas para executar uma função Python chamada _write_response utilizando XComs onde através da task_id da task write_response, é possível recuperar o resultado do response e utilizar em qualquer parte do código. Neste cenário estamos usando o resultado recuperado da API para escrever no arquivo. XCom é um mecanismo de comunicação entre diferente tasks que torna o Airflow bastante flexível. Muitas das vezes as tasks podem ser executadas em diferentes máquinas e com o uso de XComs, possibilita a comunicação e a troca de informações entre Tasks. Por fim, definimos a execução das tasks e suas dependências, veja que usamos o operador >>, que é basicamente definir a ordem da execução entre as tasks. No nosso caso, o acesso a API e extração deve ser executado antes da escrita no arquivo extract_data >> write_response. Após a execução da DAG, é possível acessar o arquivo que foi gerado com o resultado da extração, basta acessar via terminal um dos workers que no caso vamos ter somente um. Execute o seguinte comando abaixo para listar os containers: docker ps Uma listagem similar a esta abaixo será mostrada. Perceba que uma das linhas na coluna NAMES refere-se ao worker, no caso coffee_and_tips_airflow-worker_1. Continuando no terminal, digite o seguinte comando para ter acesso ao diretório do Airflow onde arquivo extract_data.json se encontra. docker exec -it coffee_and_tips_airflow-worker_1 /bin/bash Pronto, agora é só abrir o arquivo e conferir o conteúdo. Conclusão Mais uma vez vimos o poder do Airflow para processos automatizados que requer acessos e integrações de APIs externas de forma fácil e com poucas linhas de código. Neste exemplo exploramos o uso de XComs que visa flexibilizar a troca de mensagens entre tasks que podem ser executadas em diferentes máquinas em um ambiente distribuído. Espero que tenha curtido!

  • Guia rápido sobre Apache Kafka: O poder da arquitetura Event-Driven

    Introdução No mundo orientado por dados de hoje, a capacidade de processar e analisar eficientemente grandes volumes de dados em tempo real tornou-se um fator determinante para empresas e organizações de todos os tamanhos. Desde plataformas de comércio eletrônico e redes sociais até instituições financeiras e dispositivos IoT, a demanda por lidar com fluxos de dados em escala está em constante crescimento. É aí que o Apache Kafka entra como uma ferramenta fundamental no mundo da arquitetura orientada por eventos. Imagine uma tecnologia que pode conectar, processar e entregar dados de forma contínua entre inúmeros sistemas e aplicativos em tempo real. O Apache Kafka, frequentemente chamado de plataforma de streaming distribuído, é exatamente essa tecnologia. É o herói desconhecido nos bastidores, permitindo o fluxo de dados em tempo real e fornecendo a base para uma infinidade de aplicativos modernos orientados por dados. Neste guia rápido sobre Apache Kafka, vamos mostrar os principais componentes da arquitetura que compõem o Apache Kafka, desvendando seus conceitos essenciais, arquitetura, casos de uso e por último, um tutorial prático onde você será capaz de entender a dinâmica e funcionamento de como um produtor e consumidor de eventos se comunicam através de um tópico. Portanto, embarquemos nessa aventura e descubramos como o Apache Kafka está revolucionando a forma como lidamos com dados no século 21. Conceitos chave do Kafka 1. Tópicos O que são Tópicos no Kafka? No Kafka, um tópico é um canal ou categoria lógica para dados. Ele atua como um canal onde são mantidos os registros enviados, permitindo que os produtores escrevam dados em tópicos específicos e que os consumidores leiam deles. Pense nos tópicos como uma maneira de categorizar e separar os fluxos de dados. Por exemplo, em uma plataforma de comércio eletrônico, você pode ter tópicos como "Atualizações de Pedidos", "Alterações de Estoque" e "Feedback do Cliente", cada um dedicado a um tipo específico de dados. Particionamento dos Tópicos Uma das características poderosas dos tópicos do Kafka é o particionamento. Quando um tópico é dividido em partições, isso aprimora a capacidade do Kafka de lidar com grandes volumes de dados e distribuir a carga entre vários corretores. As partições são as unidades de paralelismo no Kafka e fornecem tolerância a falhas, escalabilidade e capacidades de processamento paralelo. Cada partição é ordenada e imutável, e os registros dentro de uma partição são atribuídos a um deslocamento único, que é um identificador numérico que representa a posição de um registro dentro da partição. Esse deslocamento é usado pelos consumidores para controlar os dados consumidos, permitindo que eles retomem de onde pararam em caso de falha ou ao processar dados em tempo real. Organização de Dados Tópicos possibilitam uma organização dos dados. Conforme mostrado nas imagens acima, os tópicos funcionam como uma camada de armazenamento dentro do contexto do Kafka onde todos os dados que são enviados pelo produtores são organizados em tópicos e partições. Modelo de Publicação e Assinatura O modelo Publicação e Assinatura ou pub/sub é um modelo no qual os produtores publicam dados em um tópico e os consumidores se inscrevem nos tópicos de interesse para receber os dados. Uma analogia do mundo real é quando nos cadastramos para receber newsletter de um determinado site, ou seja, o editor daquele site publica notícias e nós como consumidores, recebemos tudo que for publicado. Escalabilidade Os tópicos podem ser divididos em partições, permitindo ao Kafka distribuir dados em vários brokers possibilitando escalabilidade e processamento paralelo. Retenção dos dados Cada tópico pode ter sua própria política de retenção de dados, definindo por quanto tempo os dados permanecem no tópico. Regras como estas possibilitam administrar melhor o volume de dados armazenados nos tópicos podendo ou não liberar espaço em disco. 2. Produtores No Kafka, um produtor é um componente crucial responsável por enviar dados aos tópicos do Kafka. Pense nos produtores como os originadores de informações - aplicativos ou sistemas que geram e publicam registros em tópicos específicos dentro do cluster do Kafka. Esses registros podem representar desde eventos de usuário em um site até logs do sistema ou transações financeiras. Os produtores são a fonte da verdade para dados no Kafka. Eles geram registros e os enviam para tópicos designados para processamento posterior. Eles também decidem para qual tópico um registro deve ser enviado, com base na natureza dos dados. Isso garante que os dados sejam categorizados adequadamente dentro do ecossistema do Kafka. Tipo do dado Geralmente utiliza-se JSON como formato de dado para o envio, dessa forma facilita a transferência eficiente e o armazenamento de dados. Confirmação de entrega Os produtores podem lidar com confirmações do broker do Kafka, garantindo que os dados sejam recebidos e persistidos com sucesso. Esse mecanismo de confirmação contribui para a confiabilidade dos dados e que estão sendo entregues ao tópico sem perda dos dados. Envio para partições específicas Conforme falado mais cedo, produtores enviam dados para tópicos, porém é possível configurar o envio para uma partição específica dentro de um tópico. 3. Consumidores Consumidores são componentes importantes dentro do contexto Kafka, são responsáveis por consumir e fornecer os dados de origem. Tecnicamente, consumidores se inscrevem em tópicos do Kafka e qualquer informação ali produzida são recebidas pelos consumidores, representando a abordagem pub/sub. Inscrição em Tópicos Como citado anteriormente, consumidores são um dos responsáveis por fornecer dados direto da origem. Consumidores possuem a flexibilidade de se inscrever em qualquer tópico de acordo com seus interesses possibilitando receber dados relevantes para o negócio. Processamento dos dados Consumidores sempre vão receber novos dados vindo dos tópicos, ou seja, cada consumidor é responsável processar estes dados de acordo suas necessidades. Pense em um microserviço que funciona como um consumidor, este pode consumir dados de um tópico responsável por armazenar logs de aplicações e executar qualquer tratamento antes de entrega-lo ao usuário ou para outras aplicações terceiras. Integração entre aplicações Como citado no item anterior, Kafka possibilita aplicações de integrarem seus serviços facilmente através de tópicos e consumidores variados. Um dos casos de usos mais comuns é a integração entre aplicações. Antigamente, aplicações precisavam conectar em diferentes bancos de dados para acessar dados de outras aplicações, isso criava vulnerabilidades e feria princípios de responsabilidades entre as aplicações. Soluções como Kafka possibilita integrar diferentes serviços usando o padrão pub/sub onde diferentes consumidores representados por aplicações podem acessar os mesmos tópicos e processar estes dados em tempo real sem a necessidade de acessar bancos de dados de terceiros ou qualquer outra fonte de dado evitando qualquer risco de segurança e adicionado agilidade ao processo de entrega dos dados. 4. Brokers Brokers são peças fundamentais na arquitetura do Kafka, são eles os responsáveis por mediar e gerenciar a troca de mensagens entre produtores e consumidores, ou seja, Brokers gerenciam o armazenamento dos dados produzidos pelos produtores e garantem uma transmissão confiável dos dados dentro de um cluster Kafka. Na prática, Brokers possuem um papel transparente dentro de um cluster Kafka, mas a seguir irei pontuar algumas de suas responsabilidades que fazem toda a diferença para o funcionamento do Kafka. Recepção dos dados Brokers são responsáveis pelo recebimento dos dados, eles funcionam como um entry-point ou porta de entrada pelo dados produzidos e em seguida gerenciam todo o armazenamento para que sejam consumidos por qualquer consumidor. Tolerância a falhas Como toda arquitetura de dados, precisamos pensar na tolerância a falhas. No contexto do Kafka são os Brokers responsáveis por garantir que mesmo em falhas os dados sejam duráveis e que mantenham uma alta disponibilidade. Brokers são responsáveis por gerenciar as partições dentro dos tópicos capazes de replicar os dados prevendo qualquer falha e diminuindo a possibilidade da perda de dados. Replicação de dados Como citado no item anterior, a replicação de dados é uma forma de diminuir a perda de dados em casos de falha. A replicação de dados é feita a partir de múltiplas replicas de partições armazenados em diferentes Brokers, isso permite que mesmo se um Broker falhe, haverá dados replicados em vários outros. Responsável por gerenciar partições Citamos recente sobre partições dentro de tópicos mas não citamos quem as gerenciam. Partições são gerenciadas por um Broker que trabalha coordenando a leitura e escrita naquela partição e também distribuindo o carregamento dos dados pelo cluster. Resumindo, os Brokers desempenham um trabalho de orquestração dentro um cluster Kafka, gerenciando a leitura e escrita feita pelos produtores e consumidores, garantindo que a troca de mensagens sejam realizadas e que não haverá perda dos dados em caso de falhas em alguns dos seus componentes através da replicação dos dados também gerenciada por eles. Conclusão Apache Kafka se destaca como uma solução robusta e poderosa, atendendo às demandas complexas de ambientes modernos baseados em dados. Seus recursos escaláveis, tolerantes a falhas e processamento em tempo real o tornam parte integrante das arquiteturas que lidam com fluxos de dados dinâmicos e em grande escala. Como mostrado neste post, Kafka atende diversos cenários, desde processamento de dados em tempo real em quer requer uma versão fresca do dado como em qualquer arquitetura de dados atualmente, e até como uma solução para integrar aplicações oferecendo o uso do padrão publish/subscribe. Kafka tem sido adotado por grandes empresas como: Linkedin, onde originalmente foi criado. Netflix, Uber, Airbnb, Wallmart , Goldman Sachs, Twitter e várias outras.

  • Guia Completo: Desvendando os Salários na Área de Tecnologia no Brasil em 2023

    Na era digital em que vivemos, a área de tecnologia continua a ser uma das mais dinâmicas e promissoras em termos de oportunidades de carreira. No entanto, um dos aspectos mais discutidos e importantes ao considerar uma carreira na tecnologia é o aspecto financeiro: os salários. Com o cenário econômico e tecnológico em constante evolução, compreender os salários na área de tecnologia se torna essencial para profissionais que buscam ingressar ou progredir nesse campo. Neste guia abrangente, exploraremos os salários na área de TI no Brasil durante o ano de 2023, fornecendo insights valiosos sobre as faixas salariais em diferentes níveis de experiência e especialidades. Analisaremos dados das principais capitais do país, como São Paulo, Rio de Janeiro, Belo Horizonte, Brasília e Porto Alegre, abrangendo áreas-chave como Engenharia de Software, Engenharia de Dados, Ciência de Dados, Analista de Dados e Analista de Segurança da Informação. Prepare-se para descobrir informações essenciais que o ajudarão a tomar decisões informadas sobre sua carreira na área de tecnologia, seja você um profissional em busca de novas oportunidades, um estudante considerando seu futuro ou simplesmente interessado em entender melhor o panorama salarial neste setor vibrante e em constante evolução. Esses números são apenas uma representação geral dos salários na área de tecnologia e podem variar dependendo da empresa, da experiência do profissional e de outros fatores. É importante também considerar que os salários podem ter variado ao longo do ano e podem continuar a mudar nos anos subsequentes. Como ingressar na área da Tecnologia em 2024? Com o avanço acelerado da tecnologia moldando nosso mundo de maneiras cada vez mais profundas, o interesse e a demanda por profissionais qualificados na área continuam a crescer exponencialmente. Entrar na indústria da tecnologia em 2024 oferece uma vasta gama de oportunidades emocionantes e desafiadoras para aqueles que estão prontos para abraçar a mudança e impulsionar a inovação. Nesta era de rápida evolução digital, as portas de entrada para a tecnologia são mais diversas do que nunca. Seja você um recém-formado procurando dar os primeiros passos em sua carreira ou um profissional experiente em busca de uma mudança de direção, há uma variedade de caminhos acessíveis e promissores para explorar, algumas delas citadas logo a seguir: Educação Formal e Cursos Técnicos: Optar por uma graduação em Ciência da Computação, Engenharia de Computação, Sistemas de Informação ou áreas relacionadas é uma maneira tradicional de entrar na área de tecnologia. Esses cursos oferecem uma base sólida de conhecimento técnico e teórico. Além disso, há uma abundância de cursos técnicos e de especialização em plataformas online, como Coursera, Udemy, e edX, que oferecem a oportunidade de adquirir habilidades específicas, como programação em Python, desenvolvimento web, entre outros. Bootcamps de Programação: Bootcamps de programação são programas intensivos e práticos que ensinam habilidades de desenvolvimento de software em um curto período de tempo, geralmente de alguns meses a um ano. Eles são uma ótima opção para quem busca uma transição rápida e eficiente para a área de tecnologia. Muitos bootcamps oferecem programas especializados em áreas específicas, como desenvolvimento web, ciência de dados, UX/UI design, entre outros. Estágios e Programas de Trainee: Estágios em empresas de tecnologia proporcionam uma valiosa experiência prática e permitem que os estudantes apliquem o conhecimento adquirido na universidade em situações do mundo real. Programas de trainee oferecidos por grandes empresas de tecnologia são uma excelente oportunidade para recém-formados desenvolverem habilidades técnicas e de liderança enquanto são mentorados por profissionais experientes. Networking e Projetos Pessoais: Participar de eventos de networking, como meetups de tecnologia, conferências e hackathons, pode ajudar a estabelecer conexões com profissionais da área e potenciais empregadores. Desenvolver projetos pessoais, como aplicativos, websites ou contribuições para projetos de código aberto, não apenas demonstra suas habilidades técnicas, mas também mostra seu comprometimento e paixão pela área. Certificações Profissionais: Obter certificações reconhecidas na indústria, como AWS Certified Solutions Architect, Google Certified Professional Cloud Architect, ou Microsoft Certified: Azure Developer Associate, pode aumentar suas chances de conseguir emprego na área de tecnologia e demonstrar seu conhecimento especializado em uma determinada plataforma ou tecnologia. Aprendizado Contínuo: A área de tecnologia está em constante evolução, portanto, é crucial estar sempre atualizado com as últimas tendências e tecnologias. Participar de cursos, workshops e conferências regulares e estar aberto ao aprendizado contínuo é essencial para o sucesso a longo prazo na área de tecnologia. Para aqueles que já estão no mercado de Tecnologia, como alcançar melhores salários em 2024? Na competitiva e em constante evolução indústria de Tecnologia da Informação (TI), profissionais talentosos e experientes buscam constantemente maneiras de se destacarem e avançarem em suas carreiras. Para muitos, essa busca inclui não apenas o desejo de garantir salários mais altos, mas também o aprimoramento contínuo de suas habilidades técnicas e interpessoais. Nesta era digital, onde a demanda por especialistas em TI nunca foi tão alta, é crucial que os profissionais estejam bem preparados para enfrentar os desafios do mercado e aproveitar as oportunidades disponíveis. Desde a pesquisa cuidadosa sobre os salários do setor até o desenvolvimento proativo de habilidades técnicas e soft skills, existem várias estratégias que os profissionais de TI podem adotar para alcançar seus objetivos de remuneração e crescimento profissional, a seguir algumas estratégias que podem te ajudar a angariar novos desafios e melhores salários: Dicas para Buscar Melhores Salários: Pesquise o Mercado: Faça uma pesquisa abrangente sobre as faixas salariais na sua área de atuação e nível de experiência. Sites de empregos, relatórios de salários da indústria e conversas com colegas de profissão podem fornecer insights valiosos. Considere a Certificação: Certificações relevantes podem aumentar seu valor no mercado e justificar um salário mais alto. Certificações como AWS Certified Solutions Architect, Google Cloud Platform, Cisco Certified Network Professional (CCNP) ou Certified Information Systems Security Professional (CISSP) são altamente valorizadas em suas respectivas áreas. Destaque suas Conquistas: Atualize seu currículo e perfil online para destacar suas conquistas e projetos relevantes. Mostre como suas habilidades contribuíram para o sucesso das empresas em que trabalhou. Negocie com Confiança: Ao receber uma oferta de emprego ou durante uma revisão salarial, esteja preparado para negociar. Demonstre seu valor, baseando-se em sua experiência, habilidades e contribuições para a empresa. Considere a Mudança: Em alguns casos, mudar de emprego pode ser a melhor maneira de garantir um salário mais alto. Explore oportunidades em outras empresas e compare pacotes de remuneração e benefícios. Aprimoramento de Hard Skills: Mantenha-se Atualizado: A tecnologia está em constante evolução. Esteja sempre atualizado com as últimas tendências, linguagens de programação, frameworks e ferramentas relevantes para sua área de atuação. Participe de Cursos e Treinamentos: Faça cursos online, participe de workshops e treinamentos presenciais para aprimorar suas habilidades técnicas. Plataformas como Udemy, Coursera e LinkedIn Learning oferecem uma variedade de opções de aprendizado. Contribua para Projetos de Código Aberto: Participar de projetos de código aberto não apenas demonstra suas habilidades técnicas, mas também amplia seu network e visibilidade na comunidade de desenvolvimento de software. Desenvolvimento de Soft Skills: Aprimore suas Habilidades de Comunicação: Habilidades de comunicação eficazes são essenciais para colaboração e liderança. Pratique a articulação de suas ideias de forma clara e concisa, tanto verbalmente quanto por escrito. Desenvolva sua Inteligência Emocional: A capacidade de gerenciar emoções, resolver conflitos e manter a calma sob pressão é inestimável no ambiente de trabalho. Invista em desenvolver sua inteligência emocional. Trabalhe em Equipe de Forma Eficiente: Colabore de forma eficaz com colegas de equipe, aprenda a ouvir ativamente, dar e receber feedback construtivo e liderar através da influência positiva. Aprimore suas Habilidades de Gerenciamento de Tempo e Projeto: Seja organizado, defina prioridades e cumpra prazos. Demonstrar habilidades de gerenciamento de tempo e projeto pode aumentar sua eficiência e valor como profissional de TI. Conclusão Ao longo deste post, exploramos diversas estratégias e dicas para profissionais de TI que buscam melhores salários e o aprimoramento de suas habilidades. Desde a pesquisa cuidadosa sobre os salários do setor até o desenvolvimento proativo de habilidades técnicas e soft skills, fica claro que há várias oportunidades disponíveis para aqueles que estão dispostos a investir em seu crescimento profissional. É fundamental lembrar que o sucesso na área de tecnologia não se resume apenas a alcançar um salário mais alto, mas também a manter-se atualizado com as últimas tendências, tecnologias e práticas da indústria. O aprendizado contínuo, a adaptabilidade e a capacidade de se comunicar efetivamente são habilidades que podem levar os profissionais de TI ainda mais longe em suas carreiras. Além disso, é importante destacar que o caminho para o sucesso na tecnologia é único para cada indivíduo. Não há uma abordagem única ou uma fórmula garantida para o sucesso. Cada profissional deve avaliar suas próprias metas, habilidades e circunstâncias pessoais e profissionais para determinar quais estratégias são mais adequadas para eles. Portanto, seja você um profissional experiente em busca de um salário mais alto ou um iniciante que está dando os primeiros passos na indústria de TI, lembre-se de que o crescimento profissional é uma jornada contínua e multifacetada. Mantenha-se comprometido com seu desenvolvimento pessoal e profissional, aproveite as oportunidades que surgirem e esteja sempre aberto a novos desafios e aprendizados. Com determinação, perseverança e um foco claro em seus objetivos, você estará bem posicionado para elevar sua carreira na tecnologia para novos patamares e alcançar o sucesso que almeja. O futuro está nas suas mãos - aproveite ao máximo as oportunidades que a indústria de tecnologia tem a oferecer.

  • Diferenças entre Future e CompletableFuture

    Introdução No âmbito da programação assíncrona e concorrente em Java, Future e CompletableFuture servem como ferramentas essenciais para gerenciar e executar tarefas assíncronas. Ambas as estruturas oferecem maneiras de representar o resultado de uma computação assíncrona, mas diferem significativamente em termos de funcionalidade, flexibilidade e facilidade de uso. Compreender as distinções entre Future e CompletableFuture é crucial para os desenvolvedores Java que desejam projetar sistemas assíncronos robustos e eficientes. No seu cerne, um Future representa o resultado de uma computação assíncrona que pode ou não estar completa. Ele permite que os desenvolvedores submetam tarefas para execução assíncrona e obtenham um identificador para recuperar o resultado em um momento posterior. Embora o Future forneça um mecanismo básico para programação assíncrona, suas capacidades são um tanto limitadas em termos de composição, tratamento de exceções e gerenciamento de fluxo assíncrono. Por outro lado, CompletableFuture introduz uma abordagem mais avançada e versátil para programação assíncrona em Java. Ele estende as capacidades do Future oferecendo uma API fluente para compor, combinar e lidar com tarefas assíncronas com maior flexibilidade e controle. CompletableFuture capacita os desenvolvedores a construir fluxos de trabalho assíncronos complexos, lidar com exceções de forma elegante e coordenar a execução de várias tarefas de maneira transparente. Neste post, vamos aprofundar as diferenças entre Future e CompletableFuture, explorando suas respectivas características, casos de uso e melhores práticas. Ao compreender as vantagens e compromissos distintos de cada estrutura, os desenvolvedores podem tomar decisões informadas ao projetar sistemas assíncronos e aproveitar a concorrência em aplicativos Java. Vamos embarcar em uma jornada para explorar as nuances de Future e CompletableFuture no ecossistema Java. Casos de Uso usando Future Processamento Paralelo: Utilize Future para paralelizar tarefas independentes através de múltiplas threads e reunir resultados de forma assíncrona. Por exemplo, processar múltiplos arquivos simultaneamente. I/O Assíncrona: Ao realizar operações de leitura e escrita que são blocantes, como ler de um arquivo ou fazer requisições de rede, você pode usar Future para executar essas operações em threads separadas e continuar com outras tarefas enquanto aguarda a conclusão da I/O. Execução e Coordenação de Tarefas: Utilize Future para executar tarefas de forma assíncrona e coordenar a conclusão delas. Por exemplo, em um servidor web, lidar com múltiplas requisições simultaneamente usando Future para cada processamento de requisição. Tratamento de Timeout: Você pode definir Timeouts para tarefas Future para evitar esperar indefinidamente pela conclusão. Isso é útil ao lidar com recursos com tempos de resposta imprevisíveis. Casos de Uso para CompletableFuture Padrão Async/Await: CompletableFuture suporta uma API fluente para encadear operações assíncronas, permitindo que você expresse fluxos de trabalho assíncronos complexos de forma clara e concisa, semelhante ao padrão async/await em outras linguagens de programação. Combinação de Resultados: Utilize CompletableFuture para combinar os resultados de múltiplas tarefas assíncronas, seja esperando que todas as tarefas sejam concluídas (allOf) ou combinando os resultados de duas tarefas (thenCombine, thenCompose). Tratamento de Exceções: CompletableFuture oferece mecanismos robustos de tratamento de exceções, permitindo lidar com exceções lançadas durante computações assíncronas de forma elegante usando métodos como exceptionally ou handle. Gráficos de Dependência: Você pode construir gráficos de dependência complexos de tarefas assíncronas usando CompletableFuture, onde a conclusão de uma tarefa desencadeia a execução de outra, permitindo um controle refinado sobre o fluxo de execução. Callbacks Não-Blocantes: CompletableFuture permite que você anexe callbacks que são executados após a conclusão do futuro, permitindo o tratamento não blocante de resultados ou erros. Completar uma Future Manualmente: Ao contrário de Future, você pode completar um CompletableFuture manualmente usando métodos como complete, completeExceptionally ou cancel. Essa funcionalidade pode ser útil em cenários em que você deseja fornecer um resultado ou lidar explicitamente com casos excepcionais. Exemplos Criação e finalização Exemplo de código usando Future na criação e finalização de um fluxo. ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(() -> { Thread.sleep(2000); return 10; }); Exemplo de código usando CompletableFuture na criação e finalização de um fluxo. CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 10; }); Em CompletableFuture, métodos como supplyAsync permitem execuções assíncronas sem que seja necessário usar um ExecutorService como é mostrado no primeiro exemplo. Encadeando Ações Exemplo abaixo usando Future em um encadeamento de ações. Future future = executor.submit(() -> 10); Future result = future.thenApply(i -> "Result: " + i); Agora, um exemplo usando CompletableFuture para encadear ações. CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 10); CompletableFuture result = completableFuture.thenApply(i -> "Result: " + i); CompletableFuture oferece uma API fluente (thenApply, thenCompose, etc.) para encadear ações, facilitando o uso de expressões de fluxos assíncronos. Lidando com Exceções Lidando com exceção usando Future. Future future = executor.submit(() -> { throw new RuntimeException("Exception occurred"); }); Lidando com exceção usando CompletableFuture. CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Exception occurred"); }); Aguardando uma conclusão de uma tarefa // Future Integer result = future.get(); // CompletableFuture Integer result = completableFuture.get(); Ambos Future e CompletableFuture fornecem o método get() responsável por aguardar a conclusão de um processamento e trazer os resultados esperados. Combinado Múltiplos CompletableFutures CompletableFuture future1 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> 20); CompletableFuture combinedFuture = future1.thenCombine(future2, (x, y) -> x + y); CompletableFuture fornecem métodos como thenCombine, thenCompose, e allOf que desempenham combinações ou compõem múltiplas tarefas (tasks) assíncronas. Conclusão No dinâmico cenário da programação assíncrona e concorrente em Java, tanto Future quanto CompletableFuture se destacam como ferramentas indispensáveis, oferecendo vantagens e casos de uso distintos. Podemos ver as diferenças entre Future e CompletableFuture, enquanto o Future fornece um mecanismo básico para representar o resultado de computações assíncronas, suas capacidades são um tanto limitadas quando se trata de composição, tratamento de exceções e gerenciamento de fluxo assíncrono. Por outro lado, o CompletableFuture surge como uma alternativa poderosa e flexível, estendendo as funcionalidades do Future com uma API fluente para composição, combinação e manipulação de tarefas assíncronas com maior controle e elegância. A escolha entre Future e CompletableFuture depende dos requisitos específicos e complexidades da tarefa em questão. Para operações assíncronas simples ou ao trabalhar dentro dos limites de bases de código existentes, o Future pode ser suficiente. No entanto, em cenários que exigem fluxos de trabalho assíncronos mais sofisticados, tratamento de exceções ou coordenação de tarefas, o CompletableFuture oferece uma solução convincente com seu conjunto de recursos rico e API intuitiva.

bottom of page