Schema Merging com Spark e Java

Schema Merging é uma forma de evoluir schemas adicionando novas colunas através da junção de DataFrames. Imagine que você possui diferentes arquivos parquets com diferentes schemas, e que exista a necessidade em criar um novo schema a partir de todas as colunas destes variados parquets. Podemos resolver este problema em uma simples linha de código, conforme será mostrado a seguir. A seguir vamos criar arquivos parquets com diferentes schemas através de arquivos JSON e em seguida, faremos o merge destes arquivos transformando em um único schema consolidado. Arquivos JSON que usaremos como DataSource: Arquivo user.json {"id":1, "login": "Jonh", "age": 24}
{"id":2, "login": "Lucas", "age": 24}
{"id":3, "login": "Miesha", "age": 25}
{"id":4, "login": "Suzie", "age": 22} Arquivo address.json {"id":1, "city": "Los Angeles", "country": "USA"}
{"id":2, "city": "New York", "country": "USA"}
{"id":3, "city": "San Louis Obispo", "country": "USA"} Criando o SparkSession SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");

SparkSession sparkSession =
new SparkSession.Builder()
.config(sparkConf)
.getOrCreate(); Criando os DataFrames Dataset<Row> dfUser =
sparkSession.read().json("user.json"); Dataset<Row> dfAddress =
sparkSession.read().json("address.json"); Gerando os Parquets para cada DataFrame No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 1 dfUser.write().parquet("data/table/partition=1"); No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 2 dfAddress.write().parquet("data/table/partition=2"); Agora, criaremos um novo DataFrame com base nos parquets criados anteriormente executando o Schema Merging Dataset<Row> dfMerge = sparkSession
.read().option("mergeSchema", true)
.parquet("data/table"); O Schema Merging é executado no seguinte trecho option("mergeSchema", true) Por fim, podemos ver o novo schema mergeado através do mergeSchema dfMerge.printSchema(); Resultado root |-- age: long (nullable = true) |-- id: long (nullable = true) |-- login: string (nullable = true) |-- city: string (nullable = true) |-- country: string (nullable = true) |-- partition: integer (nullable = true) Veja que a partir dos DataFrames dfUser e dfAddress foi criado um novo schema consolidando as colunas entre eles. Código completo SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");

SparkSession sparkSession =
new SparkSession.Builder()
.config(sparkConf)
.getOrCreate();

Dataset<Row> dfUser =
sparkSession.read().json("user.json");

dfUser.write().parquet("data/table/partition=1");

Dataset<Row> dfAddress =
sparkSession.read().json("address.json");

dfAddress.write().parquet("data/table/partition=2");

Dataset<Row> dfMerge = sparkSession
.read().option("mergeSchema", true)
.parquet("data/table");

dfMerge.printSchema(); Curtiu? Espero que sim, até mais!

Schema Merging com Spark e Java