top of page

Primeiros passos com Delta Lake

O que é o Delta Lake? O Delta Lake é um projeto open-source que visa gerenciar a camada de armazenamento (Storage Layer) que segundo o seu conceito: "traz confiabilidade para Datalakes". Na prática é uma abstração do Apache Spark reutilizando os mesmos mecanismos mas oferencendo recursos extras interessantes. Dentre eles, suporte para transações ACID. Todos sabem que manter a integridade dos dados em uma pipeline de dados é uma tarefa crítica diante do alta concorrência de leitura e escrita de dados. Neste caso, o ACID possibilita gerenciar ambientes como estes. Dentre outras vantagens, o Delta Lake provê histórico de auditoria, versionamento de dados, suporta operações DML do tipo delete e update e vários outras vantagens. Para este tutorial, vamos simular uma pipeline de dados simples de forma local, com o foco no que o Delta Lake pode nos oferecer de vantagens. Na etapa de ingestão, vamos carregar um Spark DataFrame a partir de um JSON, criaremos uma view temporária para nos auxiliar, criaremos uma tabela Delta via SQL e por fim, utilizaremos o Delta Lake para executar algumas operações de DML utilizando a view que será criada. Vamos utilizar Java e Maven para gerenciar as dependências, além de Spark e Hive no qual este, vai nos auxiliar em manter a tabelas em um catálogo de dados. Maven < dependencies >
< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-core_2.12</ artifactId >
< version >3.0.1</ version >
</ dependency >

< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-sql_2.12</ artifactId >
< version >3.0.1</ version >
</ dependency >

< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-hive_2.12</ artifactId >
< version >3.0.1</ version >
</ dependency >

< dependency >
< groupId >io.delta</ groupId >
< artifactId >delta-core_2.12</ artifactId >
< version >0.8.0</ version >
</ dependency >
</ dependencies > O código será desenvolvido em pequenos trechos para um melhor entendimento. Configurando Spark com Delta e Hive String val_ext=" io.delta.sql.DeltaSparkSessionExtension" ;
String val_ctl= "org.apache.spark.sql.delta.catalog.DeltaCatalog" ;

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName( "app" );
sparkConf.setMaster( "local[1]" );
sparkConf.set( "spark.sql.extensions" ,var_ext);
sparkConf.set( "spark.sql.catalog.spark_catalog" ,val_ctl);

SparkSession sparkSession = SparkSession. builder ()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate(); Entendendo o trecho acima Definimos duas variáveis val_ext e val_ctl atribuindo os valores para as chaves ( spark.sql.extensions e spark.sql.catalog.spark_catalog ) . Estas, necessárias para a configuração do Delta junto com o Spark Demos o nome do contexto do Spark de app Como não estamos rodando o Spark em um cluster, o master está configurado para rodar local local[1] O Spark tem suporte para o Hive, nesse caso o habilitamos no trecho enableHiveSupport( ) Ingestão dos Dados Vamos trabalhar com Spark Dataframe como fonte dos dados. Carregaremos um Dataframe a partir de um JSON. Arquivo order.json { "id" :1, "date_order" : "2021-01-23" , "customer" : "Jerry" , "product" : "BigMac" , "unit" : 1, "price" : 8.00}
{ "id" :2, "date_order" : "2021-01-22" , "customer" : "Olivia" , "product" : "Cheese Burguer" , "unit" : 3, "price" : 21.60}
{ "id" :3, "date_order" : "2021-01-21" , "customer" : "Monica" , "product" : "Quarter" , "unit" : 2, "price" : 12.40}
{ "id" :4, "date_order" : "2021-01-23" , "customer" : "Monica" , "product" : "McDouble" , "unit" : 2, "price" : 13.00}
{ "id" :5, "date_order" : "2021-01-23" , "customer" : "Suzie" , "product" : "Double Cheese" , "unit" : 2, "price" : 12.00}
{ "id" :6, "date_order" : "2021-01-25" , "customer" : "Liv" , "product" : "Hamburger" , "unit" : 1, "price" : 2.00}
{ "id" :7, "date_order" : "2021-01-25" , "customer" : "Paul" , "product" : "McChicken" , "unit" : 1, "price" : 2.40}
Criando Dataframe Dataset<Row> df = sparkSession.read().json( "datasource/" );
df.createOrReplaceGlobalTempView( "order_view" ); Entendendo o trecho acima No trecho anterior, estamos criando um Dataframe a partir do arquivo JSON que está dentro do diretório datasource/ , crie este diretório para que a estrutura do seu código fique mais compreensiva e em seguida, crie o arquivo order.json com base no conteúdo mostrado anteriormente. Por último, criamos um view temporária que vai nos auxiliar mais a frente nos próximos passos. Criando a Tabela Delta (Delta Table) Vamos criar a Delta Table a partir de um script SQL. A princípio a criação é simples, mas perceba que usamos tipagens diferentes de uma tabela utilizada em um banco relacional. Como por exemplo, utilizamos STRING ao invés de VARCHAR e assim por diante. Estamos particionando a tabela pelo campo date_order . Este campo foi escolhido como partição pois acreditamos que haverá diferentes datas. Dessa forma, as consultas podem utilizar este campo como filtro, visando um melhor desempenho. E por fim, definimos a tabela como Delta Table a partir do trecho USING DELTA. String statement =
"CREATE OR REPLACE TABLE orders (" +
"id STRING, " +
"date_order STRING," +
"customer STRING," +
"product STRING," +
"unit INTEGER," +
"price DOUBLE) " +
"USING DELTA " +
"PARTITIONED BY (date_order) " ;

sparkSession.sql(statement); Entendendo o trecho acima No trecho anterior estamos criando uma tabela Delta chamada orders e em seguida executamos a criação. Operações DML Delta suporta operações como Delete, Update e Upsert utilizando Merge Utilizando Merge junto com Insert e Update Neste passo, vamos executar um Merge que possibilita controlar o fluxo de inserção e atualização de dados através de uma tabela, Dataframe ou view. O Merge trabalha a partir de r ow matches que ficará mais compreensível no trecho seguinte. String mergeStatement = "Merge into orders " +
"using global_temp.order_view as orders_view " +
"ON orders.id = orders_view.id " +
"WHEN MATCHED THEN " +
"UPDATE SET orders.product = orders_view.product," +
"orders.price = orders_view.price " +
"WHEN NOT MATCHED THEN INSERT * " ;

sparkSession.sql(mergeStatement); Entendendo o trecho acima No trecho acima estamos executando o Merge a partir da view order_view criada nos passos anteriores. No mesmo trecho temos uma condição orders.id = orders_view.id que vai auxiliar nos matches seguintes. Caso a condição anterior seja verdadeira, ou seja o MATCHED seja verdadeiro. Os dados serão atualizados. Caso a condição não seja verdadeira, NOT MATCHED. Os dados serão inseridos. No caso acima, os dados serão inseridos, pois até então não existia dados na tabela orders . Execute o comando abaixo para visualizar os dados inseridos. sparkSession.sql( "select * from orders" ).show(); Atualize o arquivo datasource/order.json alterando o campo product, price e execute todos os trechos novamente. Você verá que todos os registros serão atualizados. Comando Update É possível executar Update sem a necessidade de usar o Merge, basta executar o comando abaixo: String updateStatement = "update orders " +
"set product = 'Milk-Shake' " +
"where id = 2" ;

sparkSession.sql(updateStatement); Comando Delete String deleteStatement = "delete from pedidos where id = 2" ;
sparkSession.sql(deleteStatement); Além de poder executar o comando Delete, é possível utilizar este comando junto ao Merge. Entendendo o Delta Lake Transaction Log (DeltaLog) Além de dar suporte a transações ACID, o delta gera alguns arquivos JSON que servem como uma forma de auditar e manter o histórico de cada transação, desde comandos DDL e DML Com este mecanismo é possível até voltar em um estado específico da tabela caso necessário. Para cada transação executada um arquivo JSON é criado dentro da pasta _delta_log . O arquivo inicial sempre será 000000000.json , contendo os commits da transação. Neste nosso cenário, este primeiro arquivo contém os commits da criação da tabela orders . Para visualizar melhor, acesse a pasta local que provavelmente foi criada na raiz do seu projeto chamada spark-warehouse. Esta pasta foi criada pelo Hive para alocar os recursos criados desde os arquivos JSON e os parquets. Dentro dela terá uma estrutura de pastas conforme imagem abaixo: Perceba que os arquivos são criados em ordem crescente a partir de cada transação executada. Acesse cada arquivo JSON e verá cada transação que foi executado através do campo operation , além de outras informações. 00000000000000000000.json "operation": "CREATE OR REPLACE TABLE" 00000000000000000001.json "operation": "MERGE" 00000000000000000002.json "operation": "UPDATE" 00000000000000000003.json "operation": "DELETE" Perceba também que os arquivos parquets foram gerados particionados em pastas pelo campo date_order. Utilizar partição visa consultas com melhores desempenho, mas não entraremos em detalhes neste post, futuramente falaremos mais sobre isso. Espero que neste post foi possível esclarecer algumas dúvidas sobre o que é o Delta Lake e seu funcionamento. Até a próxima!

Primeiros passos com Delta Lake
bottom of page