Consultas com Spark SQL
Spark SQL faz parte do core do Apache Spark e permite consultas estruturadas dentro do contexto Spark utilizando SQL. Com Spark SQL é possível conectar em diversos datasources como Avro, ORC, JSON, Parquet e dentre outros. Neste tutorial vamos utilizar como datasource um arquivo JSON para mostrar os poderosos recursos do Spark SQL. Maven < dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-core_2.12</ artifactId >
< version >3.1.0</ version >
</ dependency >
< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-sql_2.12</ artifactId >
< version >3.1.0</ version >
</ dependency >
Configurar o contexto Spark No passo seguinte, criaremos o SparkSession. Pense que neste ponto você pode reutiliza-lo em sua aplicação, então pense em um classe Singleton para alocar este objeto na inicialização. No exemplo a seguir não farei isso para que fique mais prático e simples. public class SparkSql {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName( "spark-sql-app" );
sparkConf.setMaster( "local[1]" );
SparkSession session = SparkSession
. builder ()
.config(sparkConf)
.getOrCreate();
}
} O objeto SparkConf é responsável pela configuração da Session, perceba que é um objeto simples com os atributos appName e master . O atributo master é uma configuração especifica caso a aplicação execute em um cluster, como nesse exemplo é local, então o valor [1] é o suficiente para a execução e por fim o appName é o nome da aplicação. Listagem 1 - Select simples Conteúdo do arquivo produto.json {"id":1, "nome":"arroz", "preco":12.0, "qtde": 50}
{"id":2, "nome":"feijao", "preco":7.50, "qtde": 30}
{"id":3, "nome":"coca-cola", "preco":5.50, "qtde": 150}
{"id":4, "nome":"suco", "preco":3.80, "qtde": 250}
{"id":5, "nome":"milho", "preco":1.50, "qtde": 33}
{"id":6, "nome":"yogurte", "preco":6.0, "qtde": 15}
{"id":7, "nome":"leite", "preco":3.70, "qtde": 250}
{"id":8, "nome":"oleo", "preco":5.60, "qtde": 100} public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName( "spark-sql-app" );
sparkConf.setMaster( "local[1]" );
SparkSession session = SparkSession
. builder ()
.config(sparkConf)
.getOrCreate();
Dataset<Row> dataFrame = session.read().json( "produto.json" );
dataFrame.createOrReplaceTempView( "produto" );
Dataset<Row> sqlFrame = session.sql( "select * from produto" );
sqlFrame.show();
} Neste exemplo acima é uma forma simples de listar todo o conteúdo do arquivo através de um DataFrame. Um DataFrame é basicamente uma coleção de dados distribuídos que se assemelha bastante com uma tabela relacional. Neste trecho é criado uma view temporária com base no Dataframe que foi carregado pela sessão. dataFrame.createOrReplaceTempView( "produto" ); No próximo trecho é executado uma consulta SQL simples com base na view criada anteriormente. Dataset<Row> sqlFrame = session.sql( "select * from produto" ); Por fim é executado o método .show() que é uma ação do Spark. Este método lista todos os registros da coleção. É possível passar como argumento neste método a quantidade de registros para listagem, o valor padrão é de 20 registros. sqlFrame.show(); Resultado da execução
Listagem 2 - Cláusula Where public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName( "spark-sql-app" );
sparkConf.setMaster( "local[1]" );
SparkSession session = SparkSession
. builder ()
.config(sparkConf)
.getOrCreate();
Dataset<Row> dataFrame = session.read().json( "produto.json" );
dataFrame.createOrReplaceTempView( "produto" );
Dataset<Row> sqlFrame = session.sql( "select nome, preco " +
"from produto " +
"where preco >= 5.0" );
sqlFrame.show();
} Resultado da execução Listagem 3 - Between public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName( "spark-sql-app" );
sparkConf.setMaster( "local[1]" );
SparkSession session = SparkSession
. builder ()
.config(sparkConf)
.getOrCreate();
Dataset<Row> dataFrame = session.read().json( "produto.json" );
dataFrame.createOrReplaceTempView( "produto" );
Dataset<Row> sqlFrame = session.sql( "select " +
"nome, preco, qtde " +
"from produto " +
"where qtde between 10 and 50 " );
sqlFrame.show();
} Resultado da execução Listagem 4 - Sum public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName( "spark-sql-app" );
sparkConf.setMaster( "local[1]" );
SparkSession session = SparkSession
. builder ()
.config(sparkConf)
.getOrCreate();
Dataset<Row> dataFrame = session.read().json( "produto.json" );
dataFrame.createOrReplaceTempView( "produto" );
Dataset<Row> sqlFrame = session.sql( "select " +
"sum(preco * qtde) as total " +
"from produto " +
"where qtde > 100 " );
sqlFrame.show();
} Resultado da execução Conteúdo do arquivo produto.json alterado {"id":1,"nome":"arroz","preco":12.0,"qtde":50,"tipo":"sólido"}
{"id":2,"nome":"feijao","preco":7.50,"qtde":30,"tipo":"sólido"}
{"id":3,"nome":"coca","preco":5.50,"qtde":150,"tipo":"líquido"}
{"id":4,"nome":"suco","preco":3.80,"qtde":250,"tipo":"líquido"}
{"id":5,"nome":"milho","preco":1.50,"qtde":33,"tipo":"sólido"}
{"id":6,"nome":"yogurte","preco":6.0,"qtde":15,"tipo":"líquido"}
{"id":7,"nome":"leite","preco":3.70,"qtde":250,"tipo":"líquido"}
{"id":8,"nome":"oleo","preco":5.60,"qtde":100,"tipo":"líquido"}
Listagem 5 - Count + group by + having public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName( "spark-sql-app" );
sparkConf.setMaster( "local[1]" );
SparkSession session = SparkSession
. builder ()
.config(sparkConf)
.getOrCreate();
Dataset<Row> dataFrame = session.read().json( "produto.json" );
dataFrame.createOrReplaceTempView( "produto" );
Dataset<Row> sqlFrame = session.sql( "select " +
"tipo, count(tipo) as qtde" +
"from produto " +
"group by tipo " +
"having (tipo = 'sólido') " );
sqlFrame.show();
} Resultado da execução Tentei mostrar alguns exemplos simples de como utilizar o Spark SQL e o que ele é capaz, espero ter ajudado no entendimento e até mais. Documentação: https://spark.apache.org/sql/ Github: https://github.com/apache/spark