• JP

Schema Merging com Spark e Java


Schema Merging é uma forma de evoluir schemas adicionando novas colunas através da junção de DataFrames. Imagine que você possui diferentes arquivos parquets com diferentes schemas, e que exista a necessidade em criar um novo schema a partir de todas as colunas destes variados parquets. Podemos resolver este problema em uma simples linha de código, conforme será mostrado a seguir.


A seguir vamos criar arquivos parquets com diferentes schemas através de arquivos JSON e em seguida, faremos o merge destes arquivos transformando em um único schema consolidado.


Arquivos JSON que usaremos como DataSource:


Arquivo user.json

{"id":1, "login": "Jonh", "age": 24}
{"id":2, "login": "Lucas", "age": 24}
{"id":3, "login": "Miesha", "age": 25}
{"id":4, "login": "Suzie", "age": 22}

Arquivo address.json

{"id":1, "city": "Los Angeles", "country": "USA"}
{"id":2, "city": "New York", "country": "USA"}
{"id":3, "city": "San Louis Obispo", "country": "USA"}

Criando o SparkSession

SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");

SparkSession sparkSession =
        new SparkSession.Builder()
                .config(sparkConf)
                .getOrCreate();

Criando os DataFrames

Dataset<Row> dfUser =
        sparkSession.read().json("user.json");
Dataset<Row> dfAddress =
        sparkSession.read().json("address.json");

Gerando os Parquets para cada DataFrame


No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 1

dfUser.write().parquet("data/table/partition=1");

No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 2

dfAddress.write().parquet("data/table/partition=2");

Agora, criaremos um novo DataFrame com base nos parquets criados anteriormente executando o Schema Merging

Dataset<Row> dfMerge = sparkSession
        .read().option("mergeSchema", true)
        .parquet("data/table");

O Schema Merging é executado no seguinte trecho

option("mergeSchema", true)

Por fim, podemos ver o novo schema mergeado através do mergeSchema

dfMerge.printSchema();

Resultado


root

|-- age: long (nullable = true)

|-- id: long (nullable = true)

|-- login: string (nullable = true)

|-- city: string (nullable = true)

|-- country: string (nullable = true)

|-- partition: integer (nullable = true)



Veja que a partir dos DataFrames dfUser e dfAddress foi criado um novo schema consolidando as colunas entre eles.



Código completo

SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");

SparkSession sparkSession =
        new SparkSession.Builder()
                .config(sparkConf)
                .getOrCreate();

Dataset<Row> dfUser =
        sparkSession.read().json("user.json");

dfUser.write().parquet("data/table/partition=1");

Dataset<Row> dfAddress =
        sparkSession.read().json("address.json");

dfAddress.write().parquet("data/table/partition=2");

Dataset<Row> dfMerge = sparkSession
        .read().option("mergeSchema", true)
        .parquet("data/table");

dfMerge.printSchema();

Curtiu? Espero que sim, até mais!

Posts recentes

Ver tudo