• JP

Primeiros passos com Delta Lake


O que é o Delta Lake?


O Delta Lake é um projeto open-source que visa gerenciar a camada de armazenamento (Storage Layer) que segundo o seu conceito: "traz confiabilidade para Datalakes". Na prática é uma abstração do Apache Spark reutilizando os mesmos mecanismos mas oferencendo recursos extras interessantes. Dentre eles, suporte para transações ACID. Todos sabem que manter a integridade dos dados em uma pipeline de dados é uma tarefa crítica diante do alta concorrência de leitura e escrita de dados. Neste caso, o ACID possibilita gerenciar ambientes como estes.


Dentre outras vantagens, o Delta Lake provê histórico de auditoria, versionamento de dados, suporta operações DML do tipo delete e update e vários outras vantagens.


Para este tutorial, vamos simular uma pipeline de dados simples de forma local, com o foco no que o Delta Lake pode nos oferecer de vantagens. Na etapa de ingestão, vamos carregar um Spark DataFrame a partir de um JSON, criaremos uma view temporária para nos auxiliar, criaremos uma tabela Delta via SQL e por fim, utilizaremos o Delta Lake para executar algumas operações de DML utilizando a view que será criada.


Vamos utilizar Java e Maven para gerenciar as dependências, além de Spark e Hive no qual este, vai nos auxiliar em manter a tabelas em um catálogo de dados.


Maven

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.0.1</version>
    </dependency>

    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-core_2.12</artifactId>
        <version>0.8.0</version>
    </dependency>
</dependencies>

O código será desenvolvido em pequenos trechos para um melhor entendimento.


Configurando Spark com Delta e Hive

String val_ext="io.delta.sql.DeltaSparkSessionExtension";
String val_ctl="org.apache.spark.sql.delta.catalog.DeltaCatalog";

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("app");
sparkConf.setMaster("local[1]");
sparkConf.set("spark.sql.extensions",var_ext);
sparkConf.set("spark.sql.catalog.spark_catalog",val_ctl);

SparkSession sparkSession = SparkSession.builder()
        .config(sparkConf)
        .enableHiveSupport()
        .getOrCreate();

Entendendo o trecho acima


  • Definimos duas variáveis val_ext e val_ctl atribuindo os valores para as chaves (spark.sql.extensions e spark.sql.catalog.spark_catalog) . Estas, necessárias para a configuração do Delta junto com o Spark

  • Demos o nome do contexto do Spark de app

  • Como não estamos rodando o Spark em um cluster, o master está configurado para rodar local local[1]

  • O Spark tem suporte para o Hive, nesse caso o habilitamos no trecho enableHiveSupport( )


Ingestão dos Dados


Vamos trabalhar com Spark Dataframe como fonte dos dados. Carregaremos um Dataframe a partir de um JSON.


Arquivo order.json

{"id":1, "date_order": "2021-01-23", "customer": "Jerry", "product": "BigMac", "unit": 1, "price": 8.00}
{"id":2, "date_order": "2021-01-22", "customer": "Olivia", "product": "Cheese Burguer", "unit": 3, "price": 21.60}
{"id":3, "date_order": "2021-01-21", "customer": "Monica", "product": "Quarter", "unit": 2, "price": 12.40}
{"id":4, "date_order": "2021-01-23", "customer": "Monica", "product": "McDouble", "unit": 2, "price": 13.00}
{"id":5, "date_order": "2021-01-23", "customer": "Suzie", "product": "Double Cheese", "unit": 2, "price": 12.00}
{"id":6, "date_order": "2021-01-25", "customer": "Liv", "product": "Hamburger", "unit": 1, "price": 2.00}
{"id":7, "date_order": "2021-01-25", "customer": "Paul", "product": "McChicken", "unit": 1, "price": 2.40}


Criando Dataframe

Dataset<Row> df = sparkSession.read().json("datasource/");
df.createOrReplaceGlobalTempView("order_view");

Entendendo o trecho acima


No trecho anterior, estamos criando um Dataframe a partir do arquivo JSON que está dentro do diretório datasource/ , crie este diretório para que a estrutura do seu código fique mais compreensiva e em seguida, crie o arquivo order.json com base no conteúdo mostrado anteriormente.


Por último, criamos um view temporária que vai nos auxiliar mais a frente nos próximos passos.


Criando a Tabela Delta (Delta Table)


Vamos criar a Delta Table a partir de um script SQL. A princípio a criação é simples, mas perceba que usamos tipagens diferentes de uma tabela utilizada em um banco relacional. Como por exemplo, utilizamos STRING ao invés de VARCHAR e assim por diante.


Estamos particionando a tabela pelo campo date_order. Este campo foi escolhido como partição pois acreditamos que haverá diferentes datas. Dessa forma, as consultas podem utilizar este campo como filtro, visando um melhor desempenho.


E por fim, definimos a tabela como Delta Table a partir do trecho USING DELTA.

String statement =
        "CREATE OR REPLACE TABLE orders (" +
                "id STRING, " +
                "date_order STRING," +
                "customer STRING," +
                "product STRING," +
                "unit INTEGER," +
                "price DOUBLE) " +
                "USING DELTA " +
                "PARTITIONED BY (date_order) ";

sparkSession.sql(statement); 

Entendendo o trecho acima


No trecho anterior estamos criando uma tabela Delta chamada orders e em seguida executamos a criação.


Operações DML


Delta suporta operações como Delete, Update e Upsert utilizando Merge


Utilizando Merge junto com Insert e Update


Neste passo, vamos executar um Merge que possibilita controlar o fluxo de inserção e atualização de dados através de uma tabela, Dataframe ou view. O Merge trabalha a partir de row matches que ficará mais compreensível no trecho seguinte.

String mergeStatement = "Merge into orders " +
        "using global_temp.order_view as orders_view " +
        "ON orders.id = orders_view.id " +
        "WHEN MATCHED THEN " +
        "UPDATE SET orders.product = orders_view.product," +
        "orders.price = orders_view.price " +
        "WHEN NOT MATCHED THEN INSERT * ";

sparkSession.sql(mergeStatement);

Entendendo o trecho acima

  • No trecho acima estamos executando o Merge a partir da view order_view criada nos passos anteriores.

  • No mesmo trecho temos uma condição orders.id = orders_view.id que vai auxiliar nos matches seguintes.

  • Caso a condição anterior seja verdadeira, ou seja o MATCHED seja verdadeiro. Os dados serão atualizados.

  • Caso a condição não seja verdadeira, NOT MATCHED. Os dados serão inseridos.


No caso acima, os dados serão inseridos, pois até então não existia dados na tabela orders. Execute o comando abaixo para visualizar os dados inseridos.

sparkSession.sql("select * from orders").show();







Atualize o arquivo datasource/order.json alterando o campo product, price e execute todos os trechos novamente. Você verá que todos os registros serão atualizados.


Comando Update


É possível executar Update sem a necessidade de usar o Merge, basta executar o comando abaixo:

String updateStatement = "update orders " +
        "set product = 'Milk-Shake' " +
        "where id = 2";
        
sparkSession.sql(updateStatement);        

Comando Delete

String deleteStatement = "delete from pedidos where id = 2";
sparkSession.sql(deleteStatement);

Além de poder executar o comando Delete, é possível utilizar este comando junto ao Merge.


Entendendo o Delta Lake Transaction Log (DeltaLog)


Além de dar suporte a transações ACID, o delta gera alguns arquivos JSON que servem como uma forma de auditar e manter o histórico de cada transação, desde comandos DDL e DML


Com este mecanismo é possível até voltar em um estado específico da tabela caso necessário.


Para cada transação executada um arquivo JSON é criado dentro da pasta _delta_log. O arquivo inicial sempre será 000000000.json, contendo os commits da transação. Neste nosso cenário, este primeiro arquivo contém os commits da criação da tabela orders.


Para visualizar melhor, acesse a pasta local que provavelmente foi criada na raiz do seu projeto chamada spark-warehouse. Esta pasta foi criada pelo Hive para alocar os recursos criados desde os arquivos JSON e os parquets. Dentro dela terá uma estrutura de pastas conforme imagem abaixo:











Perceba que os arquivos são criados em ordem crescente a partir de cada transação executada.

Acesse cada arquivo JSON e verá cada transação que foi executado através do campo operation, além de outras informações.


00000000000000000000.json

"operation":"CREATE OR REPLACE TABLE"

00000000000000000001.json

"operation":"MERGE"

00000000000000000002.json

"operation":"UPDATE"

00000000000000000003.json

"operation":"DELETE"

Perceba também que os arquivos parquets foram gerados particionados em pastas pelo campo date_order. Utilizar partição visa consultas com melhores desempenho, mas não entraremos em detalhes neste post, futuramente falaremos mais sobre isso.


Espero que neste post foi possível esclarecer algumas dúvidas sobre o que é o Delta Lake e seu funcionamento. Até a próxima!

Posts recentes

Ver tudo