JP

2 de mai de 20215 min

Primeiros passos com Delta Lake

Atualizado: fev 8

O que é o Delta Lake?

O Delta Lake é um projeto open-source que visa gerenciar a camada de armazenamento (Storage Layer) que segundo o seu conceito: "traz confiabilidade para Datalakes". Na prática é uma abstração do Apache Spark reutilizando os mesmos mecanismos mas oferencendo recursos extras interessantes. Dentre eles, suporte para transações ACID. Todos sabem que manter a integridade dos dados em uma pipeline de dados é uma tarefa crítica diante do alta concorrência de leitura e escrita de dados. Neste caso, o ACID possibilita gerenciar ambientes como estes.

Dentre outras vantagens, o Delta Lake provê histórico de auditoria, versionamento de dados, suporta operações DML do tipo delete e update e vários outras vantagens.

Para este tutorial, vamos simular uma pipeline de dados simples de forma local, com o foco no que o Delta Lake pode nos oferecer de vantagens. Na etapa de ingestão, vamos carregar um Spark DataFrame a partir de um JSON, criaremos uma view temporária para nos auxiliar, criaremos uma tabela Delta via SQL e por fim, utilizaremos o Delta Lake para executar algumas operações de DML utilizando a view que será criada.

Vamos utilizar Java e Maven para gerenciar as dependências, além de Spark e Hive no qual este, vai nos auxiliar em manter a tabelas em um catálogo de dados.

Maven

<dependencies>
 
<dependency>
 
<groupId>org.apache.spark</groupId>
 
<artifactId>spark-core_2.12</artifactId>
 
<version>3.0.1</version>
 
</dependency>
 

 
<dependency>
 
<groupId>org.apache.spark</groupId>
 
<artifactId>spark-sql_2.12</artifactId>
 
<version>3.0.1</version>
 
</dependency>
 

 
<dependency>
 
<groupId>org.apache.spark</groupId>
 
<artifactId>spark-hive_2.12</artifactId>
 
<version>3.0.1</version>
 
</dependency>
 

 
<dependency>
 
<groupId>io.delta</groupId>
 
<artifactId>delta-core_2.12</artifactId>
 
<version>0.8.0</version>
 
</dependency>
 
</dependencies>

O código será desenvolvido em pequenos trechos para um melhor entendimento.

Configurando Spark com Delta e Hive

String val_ext="io.delta.sql.DeltaSparkSessionExtension";
 
String val_ctl="org.apache.spark.sql.delta.catalog.DeltaCatalog";
 

 
SparkConf sparkConf = new SparkConf();
 
sparkConf.setAppName("app");
 
sparkConf.setMaster("local[1]");
 
sparkConf.set("spark.sql.extensions",var_ext);
 
sparkConf.set("spark.sql.catalog.spark_catalog",val_ctl);
 

 
SparkSession sparkSession = SparkSession.builder()
 
.config(sparkConf)
 
.enableHiveSupport()
 
.getOrCreate();

Entendendo o trecho acima

  • Definimos duas variáveis val_ext e val_ctl atribuindo os valores para as chaves (spark.sql.extensions e spark.sql.catalog.spark_catalog) . Estas, necessárias para a configuração do Delta junto com o Spark

  • Demos o nome do contexto do Spark de app

  • Como não estamos rodando o Spark em um cluster, o master está configurado para rodar local local[1]

  • O Spark tem suporte para o Hive, nesse caso o habilitamos no trecho enableHiveSupport( )

Ingestão dos Dados

Vamos trabalhar com Spark Dataframe como fonte dos dados. Carregaremos um Dataframe a partir de um JSON.

Arquivo order.json

{"id":1, "date_order": "2021-01-23", "customer": "Jerry", "product": "BigMac", "unit": 1, "price": 8.00}
 
{"id":2, "date_order": "2021-01-22", "customer": "Olivia", "product": "Cheese Burguer", "unit": 3, "price": 21.60}
 
{"id":3, "date_order": "2021-01-21", "customer": "Monica", "product": "Quarter", "unit": 2, "price": 12.40}
 
{"id":4, "date_order": "2021-01-23", "customer": "Monica", "product": "McDouble", "unit": 2, "price": 13.00}
 
{"id":5, "date_order": "2021-01-23", "customer": "Suzie", "product": "Double Cheese", "unit": 2, "price": 12.00}
 
{"id":6, "date_order": "2021-01-25", "customer": "Liv", "product": "Hamburger", "unit": 1, "price": 2.00}
 
{"id":7, "date_order": "2021-01-25", "customer": "Paul", "product": "McChicken", "unit": 1, "price": 2.40}
 

Criando Dataframe

Dataset<Row> df = sparkSession.read().json("datasource/");
 
df.createOrReplaceGlobalTempView("order_view");

Entendendo o trecho acima

No trecho anterior, estamos criando um Dataframe a partir do arquivo JSON que está dentro do diretório datasource/ , crie este diretório para que a estrutura do seu código fique mais compreensiva e em seguida, crie o arquivo order.json com base no conteúdo mostrado anteriormente.

Por último, criamos um view temporária que vai nos auxiliar mais a frente nos próximos passos.

Criando a Tabela Delta (Delta Table)

Vamos criar a Delta Table a partir de um script SQL. A princípio a criação é simples, mas perceba que usamos tipagens diferentes de uma tabela utilizada em um banco relacional. Como por exemplo, utilizamos STRING ao invés de VARCHAR e assim por diante.

Estamos particionando a tabela pelo campo date_order. Este campo foi escolhido como partição pois acreditamos que haverá diferentes datas. Dessa forma, as consultas podem utilizar este campo como filtro, visando um melhor desempenho.

E por fim, definimos a tabela como Delta Table a partir do trecho USING DELTA.

String statement =
 
"CREATE OR REPLACE TABLE orders (" +
 
"id STRING, " +
 
"date_order STRING," +
 
"customer STRING," +
 
"product STRING," +
 
"unit INTEGER," +
 
"price DOUBLE) " +
 
"USING DELTA " +
 
"PARTITIONED BY (date_order) ";
 

 
sparkSession.sql(statement);

Entendendo o trecho acima

No trecho anterior estamos criando uma tabela Delta chamada orders e em seguida executamos a criação.

Operações DML

Delta suporta operações como Delete, Update e Upsert utilizando Merge

Utilizando Merge junto com Insert e Update

Neste passo, vamos executar um Merge que possibilita controlar o fluxo de inserção e atualização de dados através de uma tabela, Dataframe ou view. O Merge trabalha a partir de row matches que ficará mais compreensível no trecho seguinte.

String mergeStatement = "Merge into orders " +
 
"using global_temp.order_view as orders_view " +
 
"ON orders.id = orders_view.id " +
 
"WHEN MATCHED THEN " +
 
"UPDATE SET orders.product = orders_view.product," +
 
"orders.price = orders_view.price " +
 
"WHEN NOT MATCHED THEN INSERT * ";
 

 
sparkSession.sql(mergeStatement);

Entendendo o trecho acima

  • No trecho acima estamos executando o Merge a partir da view order_view criada nos passos anteriores.

  • No mesmo trecho temos uma condição orders.id = orders_view.id que vai auxiliar nos matches seguintes.

  • Caso a condição anterior seja verdadeira, ou seja o MATCHED seja verdadeiro. Os dados serão atualizados.

  • Caso a condição não seja verdadeira, NOT MATCHED. Os dados serão inseridos.

No caso acima, os dados serão inseridos, pois até então não existia dados na tabela orders. Execute o comando abaixo para visualizar os dados inseridos.

sparkSession.sql("select * from orders").show();

Atualize o arquivo datasource/order.json alterando o campo product, price e execute todos os trechos novamente. Você verá que todos os registros serão atualizados.

Comando Update

É possível executar Update sem a necessidade de usar o Merge, basta executar o comando abaixo:

String updateStatement = "update orders " +
 
"set product = 'Milk-Shake' " +
 
"where id = 2";
 

 
sparkSession.sql(updateStatement);

Comando Delete

String deleteStatement = "delete from pedidos where id = 2";
 
sparkSession.sql(deleteStatement);

Além de poder executar o comando Delete, é possível utilizar este comando junto ao Merge.

Entendendo o Delta Lake Transaction Log (DeltaLog)

Além de dar suporte a transações ACID, o delta gera alguns arquivos JSON que servem como uma forma de auditar e manter o histórico de cada transação, desde comandos DDL e DML

Com este mecanismo é possível até voltar em um estado específico da tabela caso necessário.

Para cada transação executada um arquivo JSON é criado dentro da pasta _delta_log. O arquivo inicial sempre será 000000000.json, contendo os commits da transação. Neste nosso cenário, este primeiro arquivo contém os commits da criação da tabela orders.

Para visualizar melhor, acesse a pasta local que provavelmente foi criada na raiz do seu projeto chamada spark-warehouse. Esta pasta foi criada pelo Hive para alocar os recursos criados desde os arquivos JSON e os parquets. Dentro dela terá uma estrutura de pastas conforme imagem abaixo:

Perceba que os arquivos são criados em ordem crescente a partir de cada transação executada.

Acesse cada arquivo JSON e verá cada transação que foi executado através do campo operation, além de outras informações.

00000000000000000000.json

"operation":"CREATE OR REPLACE TABLE"

00000000000000000001.json

"operation":"MERGE"

00000000000000000002.json

"operation":"UPDATE"

00000000000000000003.json

"operation":"DELETE"

Perceba também que os arquivos parquets foram gerados particionados em pastas pelo campo date_order. Utilizar partição visa consultas com melhores desempenho, mas não entraremos em detalhes neste post, futuramente falaremos mais sobre isso.

Espero que neste post foi possível esclarecer algumas dúvidas sobre o que é o Delta Lake e seu funcionamento. Até a próxima!

    1