Schema Merging com Spark e Java
- JP

- 1 de fev. de 2021
- 2 min de leitura
Schema Merging é uma forma de evoluir schemas adicionando novas colunas através da junção de DataFrames. Imagine que você possui diferentes arquivos parquets com diferentes schemas, e que exista a necessidade em criar um novo schema a partir de todas as colunas destes variados parquets. Podemos resolver este problema em uma simples linha de código, conforme será mostrado a seguir.
A seguir vamos criar arquivos parquets com diferentes schemas através de arquivos JSON e em seguida, faremos o merge destes arquivos transformando em um único schema consolidado.
Arquivos JSON que usaremos como DataSource:
Arquivo user.json
{"id":1, "login": "Jonh", "age": 24}
{"id":2, "login": "Lucas", "age": 24}
{"id":3, "login": "Miesha", "age": 25}
{"id":4, "login": "Suzie", "age": 22}Arquivo address.json
{"id":1, "city": "Los Angeles", "country": "USA"}
{"id":2, "city": "New York", "country": "USA"}
{"id":3, "city": "San Louis Obispo", "country": "USA"}Criando o SparkSession
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");
SparkSession sparkSession =
new SparkSession.Builder()
.config(sparkConf)
.getOrCreate();Criando os DataFrames
Dataset<Row> dfUser =
sparkSession.read().json("user.json");Dataset<Row> dfAddress =
sparkSession.read().json("address.json");Gerando os Parquets para cada DataFrame
No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 1
dfUser.write().parquet("data/table/partition=1");No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 2
dfAddress.write().parquet("data/table/partition=2");Agora, criaremos um novo DataFrame com base nos parquets criados anteriormente executando o Schema Merging
Dataset<Row> dfMerge = sparkSession
.read().option("mergeSchema", true)
.parquet("data/table");O Schema Merging é executado no seguinte trecho
option("mergeSchema", true)Por fim, podemos ver o novo schema mergeado através do mergeSchema
dfMerge.printSchema();Resultado
root
|-- age: long (nullable = true)
|-- id: long (nullable = true)
|-- login: string (nullable = true)
|-- city: string (nullable = true)
|-- country: string (nullable = true)
|-- partition: integer (nullable = true)
Veja que a partir dos DataFrames dfUser e dfAddress foi criado um novo schema consolidando as colunas entre eles.
Código completo
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");
SparkSession sparkSession =
new SparkSession.Builder()
.config(sparkConf)
.getOrCreate();
Dataset<Row> dfUser =
sparkSession.read().json("user.json");
dfUser.write().parquet("data/table/partition=1");
Dataset<Row> dfAddress =
sparkSession.read().json("address.json");
dfAddress.write().parquet("data/table/partition=2");
Dataset<Row> dfMerge = sparkSession
.read().option("mergeSchema", true)
.parquet("data/table");
dfMerge.printSchema();Curtiu? Espero que sim, até mais!



Comentários