O que é o Delta Lake?
O Delta Lake é um projeto open-source que visa gerenciar a camada de armazenamento (Storage Layer) que segundo o seu conceito: "traz confiabilidade para Datalakes". Na prática é uma abstração do Apache Spark reutilizando os mesmos mecanismos mas oferencendo recursos extras interessantes. Dentre eles, suporte para transações ACID. Todos sabem que manter a integridade dos dados em uma pipeline de dados é uma tarefa crítica diante do alta concorrência de leitura e escrita de dados. Neste caso, o ACID possibilita gerenciar ambientes como estes.
Dentre outras vantagens, o Delta Lake provê histórico de auditoria, versionamento de dados, suporta operações DML do tipo delete e update e vários outras vantagens.
Para este tutorial, vamos simular uma pipeline de dados simples de forma local, com o foco no que o Delta Lake pode nos oferecer de vantagens. Na etapa de ingestão, vamos carregar um Spark DataFrame a partir de um JSON, criaremos uma view temporária para nos auxiliar, criaremos uma tabela Delta via SQL e por fim, utilizaremos o Delta Lake para executar algumas operações de DML utilizando a view que será criada.
Vamos utilizar Java e Maven para gerenciar as dependências, além de Spark e Hive no qual este, vai nos auxiliar em manter a tabelas em um catálogo de dados.
Maven
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>0.8.0</version>
</dependency>
</dependencies>
O código será desenvolvido em pequenos trechos para um melhor entendimento.
Configurando Spark com Delta e Hive
String val_ext="io.delta.sql.DeltaSparkSessionExtension";
String val_ctl="org.apache.spark.sql.delta.catalog.DeltaCatalog";
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("app");
sparkConf.setMaster("local[1]");
sparkConf.set("spark.sql.extensions",var_ext);
sparkConf.set("spark.sql.catalog.spark_catalog",val_ctl);
SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();
Entendendo o trecho acima
Definimos duas variáveis val_ext e val_ctl atribuindo os valores para as chaves (spark.sql.extensions e spark.sql.catalog.spark_catalog) . Estas, necessárias para a configuração do Delta junto com o Spark
Demos o nome do contexto do Spark de app
Como não estamos rodando o Spark em um cluster, o master está configurado para rodar local local[1]
O Spark tem suporte para o Hive, nesse caso o habilitamos no trecho enableHiveSupport( )
Ingestão dos Dados
Vamos trabalhar com Spark Dataframe como fonte dos dados. Carregaremos um Dataframe a partir de um JSON.
Arquivo order.json
{"id":1, "date_order": "2021-01-23", "customer": "Jerry", "product": "BigMac", "unit": 1, "price": 8.00}
{"id":2, "date_order": "2021-01-22", "customer": "Olivia", "product": "Cheese Burguer", "unit": 3, "price": 21.60}
{"id":3, "date_order": "2021-01-21", "customer": "Monica", "product": "Quarter", "unit": 2, "price": 12.40}
{"id":4, "date_order": "2021-01-23", "customer": "Monica", "product": "McDouble", "unit": 2, "price": 13.00}
{"id":5, "date_order": "2021-01-23", "customer": "Suzie", "product": "Double Cheese", "unit": 2, "price": 12.00}
{"id":6, "date_order": "2021-01-25", "customer": "Liv", "product": "Hamburger", "unit": 1, "price": 2.00}
{"id":7, "date_order": "2021-01-25", "customer": "Paul", "product": "McChicken", "unit": 1, "price": 2.40}
Criando Dataframe
Dataset<Row> df = sparkSession.read().json("datasource/");
df.createOrReplaceGlobalTempView("order_view");
Entendendo o trecho acima
No trecho anterior, estamos criando um Dataframe a partir do arquivo JSON que está dentro do diretório datasource/ , crie este diretório para que a estrutura do seu código fique mais compreensiva e em seguida, crie o arquivo order.json com base no conteúdo mostrado anteriormente.
Por último, criamos um view temporária que vai nos auxiliar mais a frente nos próximos passos.
Criando a Tabela Delta (Delta Table)
Vamos criar a Delta Table a partir de um script SQL. A princípio a criação é simples, mas perceba que usamos tipagens diferentes de uma tabela utilizada em um banco relacional. Como por exemplo, utilizamos STRING ao invés de VARCHAR e assim por diante.
Estamos particionando a tabela pelo campo date_order. Este campo foi escolhido como partição pois acreditamos que haverá diferentes datas. Dessa forma, as consultas podem utilizar este campo como filtro, visando um melhor desempenho.
E por fim, definimos a tabela como Delta Table a partir do trecho USING DELTA.
String statement =
"CREATE OR REPLACE TABLE orders (" +
"id STRING, " +
"date_order STRING," +
"customer STRING," +
"product STRING," +
"unit INTEGER," +
"price DOUBLE) " +
"USING DELTA " +
"PARTITIONED BY (date_order) ";
sparkSession.sql(statement);
Entendendo o trecho acima
No trecho anterior estamos criando uma tabela Delta chamada orders e em seguida executamos a criação.
Operações DML
Delta suporta operações como Delete, Update e Upsert utilizando Merge
Utilizando Merge junto com Insert e Update
Neste passo, vamos executar um Merge que possibilita controlar o fluxo de inserção e atualização de dados através de uma tabela, Dataframe ou view. O Merge trabalha a partir de row matches que ficará mais compreensível no trecho seguinte.
String mergeStatement = "Merge into orders " +
"using global_temp.order_view as orders_view " +
"ON orders.id = orders_view.id " +
"WHEN MATCHED THEN " +
"UPDATE SET orders.product = orders_view.product," +
"orders.price = orders_view.price " +
"WHEN NOT MATCHED THEN INSERT * ";
sparkSession.sql(mergeStatement);
Entendendo o trecho acima
No trecho acima estamos executando o Merge a partir da view order_view criada nos passos anteriores.
No mesmo trecho temos uma condição orders.id = orders_view.id que vai auxiliar nos matches seguintes.
Caso a condição anterior seja verdadeira, ou seja o MATCHED seja verdadeiro. Os dados serão atualizados.
Caso a condição não seja verdadeira, NOT MATCHED. Os dados serão inseridos.
No caso acima, os dados serão inseridos, pois até então não existia dados na tabela orders. Execute o comando abaixo para visualizar os dados inseridos.
sparkSession.sql("select * from orders").show();
Atualize o arquivo datasource/order.json alterando o campo product, price e execute todos os trechos novamente. Você verá que todos os registros serão atualizados.
Comando Update
É possível executar Update sem a necessidade de usar o Merge, basta executar o comando abaixo:
String updateStatement = "update orders " +
"set product = 'Milk-Shake' " +
"where id = 2";
sparkSession.sql(updateStatement);
Comando Delete
String deleteStatement = "delete from pedidos where id = 2";
sparkSession.sql(deleteStatement);
Além de poder executar o comando Delete, é possível utilizar este comando junto ao Merge.
Entendendo o Delta Lake Transaction Log (DeltaLog)
Além de dar suporte a transações ACID, o delta gera alguns arquivos JSON que servem como uma forma de auditar e manter o histórico de cada transação, desde comandos DDL e DML
Com este mecanismo é possível até voltar em um estado específico da tabela caso necessário.
Para cada transação executada um arquivo JSON é criado dentro da pasta _delta_log. O arquivo inicial sempre será 000000000.json, contendo os commits da transação. Neste nosso cenário, este primeiro arquivo contém os commits da criação da tabela orders.
Para visualizar melhor, acesse a pasta local que provavelmente foi criada na raiz do seu projeto chamada spark-warehouse. Esta pasta foi criada pelo Hive para alocar os recursos criados desde os arquivos JSON e os parquets. Dentro dela terá uma estrutura de pastas conforme imagem abaixo:
Perceba que os arquivos são criados em ordem crescente a partir de cada transação executada.
Acesse cada arquivo JSON e verá cada transação que foi executado através do campo operation, além de outras informações.
00000000000000000000.json
"operation":"CREATE OR REPLACE TABLE"
00000000000000000001.json
"operation":"MERGE"
00000000000000000002.json
"operation":"UPDATE"
00000000000000000003.json
"operation":"DELETE"
Perceba também que os arquivos parquets foram gerados particionados em pastas pelo campo date_order. Utilizar partição visa consultas com melhores desempenho, mas não entraremos em detalhes neste post, futuramente falaremos mais sobre isso.
Espero que neste post foi possível esclarecer algumas dúvidas sobre o que é o Delta Lake e seu funcionamento. Até a próxima!
Comentarios