Consultas com Spark SQL

Spark SQL faz parte do core do Apache Spark e permite consultas estruturadas dentro do contexto Spark utilizando SQL. Com Spark SQL é possível conectar em diversos datasources como Avro, ORC, JSON, Parquet e dentre outros. Neste tutorial vamos utilizar como datasource um arquivo JSON para mostrar os poderosos recursos do Spark SQL. Maven <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.0</version>
</dependency> Configurar o contexto Spark No passo seguinte, criaremos o SparkSession. Pense que neste ponto você pode reutiliza-lo em sua aplicação, então pense em um classe Singleton para alocar este objeto na inicialização. No exemplo a seguir não farei isso para que fique mais prático e simples. public class SparkSql {

public static void main(String[] args) {

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("spark-sql-app");
sparkConf.setMaster("local[1]");

SparkSession session = SparkSession
.builder()
.config(sparkConf)
.getOrCreate();
}
} O objeto SparkConf é responsável pela configuração da Session, perceba que é um objeto simples com os atributos appName e master. O atributo master é uma configuração especifica caso a aplicação execute em um cluster, como nesse exemplo é local, então o valor [1] é o suficiente para a execução e por fim o appName é o nome da aplicação. Listagem 1 - Select simples Conteúdo do arquivo produto.json {"id":1, "nome":"arroz", "preco":12.0, "qtde": 50}
{"id":2, "nome":"feijao", "preco":7.50, "qtde": 30}
{"id":3, "nome":"coca-cola", "preco":5.50, "qtde": 150}
{"id":4, "nome":"suco", "preco":3.80, "qtde": 250}
{"id":5, "nome":"milho", "preco":1.50, "qtde": 33}
{"id":6, "nome":"yogurte", "preco":6.0, "qtde": 15}
{"id":7, "nome":"leite", "preco":3.70, "qtde": 250}
{"id":8, "nome":"oleo", "preco":5.60, "qtde": 100} public static void main(String[] args) {

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("spark-sql-app");
sparkConf.setMaster("local[1]");

SparkSession session = SparkSession
.builder()
.config(sparkConf)
.getOrCreate();

Dataset<Row> dataFrame = session.read().json("produto.json");
dataFrame.createOrReplaceTempView("produto");

Dataset<Row> sqlFrame = session.sql("select * from produto");
sqlFrame.show();
} Neste exemplo acima é uma forma simples de listar todo o conteúdo do arquivo através de um DataFrame. Um DataFrame é basicamente uma coleção de dados distribuídos que se assemelha bastante com uma tabela relacional. Neste trecho é criado uma view temporária com base no Dataframe que foi carregado pela sessão. dataFrame.createOrReplaceTempView("produto"); No próximo trecho é executado uma consulta SQL simples com base na view criada anteriormente. Dataset<Row> sqlFrame = session.sql("select * from produto"); Por fim é executado o método .show() que é uma ação do Spark. Este método lista todos os registros da coleção. É possível passar como argumento neste método a quantidade de registros para listagem, o valor padrão é de 20 registros. sqlFrame.show(); Resultado da execução Listagem 2 - Cláusula Where public static void main(String[] args) {

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("spark-sql-app");
sparkConf.setMaster("local[1]");

SparkSession session = SparkSession
.builder()
.config(sparkConf)
.getOrCreate();

Dataset<Row> dataFrame = session.read().json("produto.json");
dataFrame.createOrReplaceTempView("produto");

Dataset<Row> sqlFrame = session.sql("select nome, preco " +
"from produto " +
"where preco >= 5.0");

sqlFrame.show();
} Resultado da execução Listagem 3 - Between public static void main(String[] args) {

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("spark-sql-app");
sparkConf.setMaster("local[1]");

SparkSession session = SparkSession
.builder()
.config(sparkConf)
.getOrCreate();

Dataset<Row> dataFrame = session.read().json("produto.json");
dataFrame.createOrReplaceTempView("produto");
Dataset<Row> sqlFrame = session.sql("select " +
"nome, preco, qtde " +
"from produto " +
"where qtde between 10 and 50 ");

sqlFrame.show();
} Resultado da execução Listagem 4 - Sum public static void main(String[] args) {

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("spark-sql-app");
sparkConf.setMaster("local[1]");

SparkSession session = SparkSession
.builder()
.config(sparkConf)
.getOrCreate();

Dataset<Row> dataFrame = session.read().json("produto.json");
dataFrame.createOrReplaceTempView("produto");

Dataset<Row> sqlFrame = session.sql("select " +
"sum(preco * qtde) as total " +
"from produto " +
"where qtde > 100 ");

sqlFrame.show();
} Resultado da execução Conteúdo do arquivo produto.json alterado {"id":1,"nome":"arroz","preco":12.0,"qtde":50,"tipo":"sólido"}
{"id":2,"nome":"feijao","preco":7.50,"qtde":30,"tipo":"sólido"}
{"id":3,"nome":"coca","preco":5.50,"qtde":150,"tipo":"líquido"}
{"id":4,"nome":"suco","preco":3.80,"qtde":250,"tipo":"líquido"}
{"id":5,"nome":"milho","preco":1.50,"qtde":33,"tipo":"sólido"}
{"id":6,"nome":"yogurte","preco":6.0,"qtde":15,"tipo":"líquido"}
{"id":7,"nome":"leite","preco":3.70,"qtde":250,"tipo":"líquido"}
{"id":8,"nome":"oleo","preco":5.60,"qtde":100,"tipo":"líquido"} Listagem 5 - Count + group by + having public static void main(String[] args) {

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("spark-sql-app");
sparkConf.setMaster("local[1]");

SparkSession session = SparkSession
.builder()
.config(sparkConf)
.getOrCreate();

Dataset<Row> dataFrame = session.read().json("produto.json");
dataFrame.createOrReplaceTempView("produto");

Dataset<Row> sqlFrame = session.sql("select " +
"tipo, count(tipo) as qtde" +
"from produto " +
"group by tipo " +
"having (tipo = 'sólido') ");

sqlFrame.show();
} Resultado da execução Tentei mostrar alguns exemplos simples de como utilizar o Spark SQL e o que ele é capaz, espero ter ajudado no entendimento e até mais. Documentação: https://spark.apache.org/sql/ Github: https://github.com/apache/spark

Consultas com Spark SQL