Schema Merging com Spark e Java
Schema Merging é uma forma de evoluir schemas adicionando novas colunas através da junção de DataFrames. Imagine que você possui diferentes arquivos parquets com diferentes schemas, e que exista a necessidade em criar um novo schema a partir de todas as colunas destes variados parquets. Podemos resolver este problema em uma simples linha de código, conforme será mostrado a seguir. A seguir vamos criar arquivos parquets com diferentes schemas através de arquivos JSON e em seguida, faremos o merge destes arquivos transformando em um único schema consolidado. Arquivos JSON que usaremos como DataSource: Arquivo user.json { "id" :1, "login" : "Jonh" , "age" : 24}
{ "id" :2, "login" : "Lucas" , "age" : 24}
{ "id" :3, "login" : "Miesha" , "age" : 25}
{ "id" :4, "login" : "Suzie" , "age" : 22} Arquivo address.json { "id" :1, "city" : "Los Angeles" , "country" : "USA" }
{ "id" :2, "city" : "New York" , "country" : "USA" }
{ "id" :3, "city" : "San Louis Obispo" , "country" : "USA" } Criando o SparkSession SparkConf sparkConf = new SparkConf();
sparkConf.setMaster( "local[1]" );
sparkConf.setAppName( "app" );
SparkSession sparkSession =
new SparkSession.Builder()
.config(sparkConf)
.getOrCreate(); Criando os DataFrames Dataset<Row> dfUser =
sparkSession.read().json( "user.json" ); Dataset<Row> dfAddress =
sparkSession.read().json( "address.json" ); Gerando os Parquets para cada DataFrame No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 1 dfUser.write().parquet( "data/table/partition=1" ); No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 2 dfAddress.write().parquet( "data/table/partition=2" ); Agora, criaremos um novo DataFrame com base nos parquets criados anteriormente executando o Schema Merging Dataset<Row> dfMerge = sparkSession
.read().option( "mergeSchema" , true )
.parquet( "data/table" ); O Schema Merging é executado no seguinte trecho option( "mergeSchema" , true ) Por fim, podemos ver o novo schema mergeado através do mergeSchema dfMerge.printSchema(); Resultado root |-- age: long (nullable = true) |-- id: long (nullable = true) |-- login: string (nullable = true) |-- city: string (nullable = true) |-- country: string (nullable = true) |-- partition: integer (nullable = true) Veja que a partir dos DataFrames dfUser e dfAddress foi criado um novo schema consolidando as colunas entre eles. Código completo SparkConf sparkConf = new SparkConf();
sparkConf.setMaster( "local[1]" );
sparkConf.setAppName( "app" );
SparkSession sparkSession =
new SparkSession.Builder()
.config(sparkConf)
.getOrCreate();
Dataset<Row> dfUser =
sparkSession.read().json( "user.json" );
dfUser.write().parquet( "data/table/partition=1" );
Dataset<Row> dfAddress =
sparkSession.read().json( "address.json" );
dfAddress.write().parquet( "data/table/partition=2" );
Dataset<Row> dfMerge = sparkSession
.read().option( "mergeSchema" , true )
.parquet( "data/table" );
dfMerge.printSchema(); Curtiu? Espero que sim, até mais!