• JP

Consultas com Spark SQL

Spark SQL faz parte do core do Apache Spark e permite consultas estruturadas dentro do contexto Spark utilizando SQL. Com Spark SQL é possível conectar em diversos datasources como Avro, ORC, JSON, Parquet e dentre outros.



Neste tutorial vamos utilizar como datasource um arquivo JSON para mostrar os poderosos recursos do Spark SQL.



Maven

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.0</version>
</dependency>

Configurar o contexto Spark


No passo seguinte, criaremos o SparkSession. Pense que neste ponto você pode reutiliza-lo em sua aplicação, então pense em um classe Singleton para alocar este objeto na inicialização. No exemplo a seguir não farei isso para que fique mais prático e simples.


public class SparkSql {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("spark-sql-app");
        sparkConf.setMaster("local[1]");

        SparkSession session = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();
    }
}

O objeto SparkConf é responsável pela configuração da Session, perceba que é um objeto simples com os atributos appName e master. O atributo master é uma configuração especifica caso a aplicação execute em um cluster, como nesse exemplo é local, então o valor [1] é o suficiente para a execução e por fim o appName é o nome da aplicação.



Listagem 1 - Select simples


Conteúdo do arquivo produto.json

{"id":1, "nome":"arroz", "preco":12.0, "qtde": 50}
{"id":2, "nome":"feijao", "preco":7.50, "qtde": 30}
{"id":3, "nome":"coca-cola", "preco":5.50, "qtde": 150}
{"id":4, "nome":"suco", "preco":3.80, "qtde": 250}
{"id":5, "nome":"milho", "preco":1.50, "qtde": 33}
{"id":6, "nome":"yogurte", "preco":6.0, "qtde": 15}
{"id":7, "nome":"leite", "preco":3.70, "qtde": 250}
{"id":8, "nome":"oleo", "preco":5.60, "qtde": 100}
public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("spark-sql-app");
    sparkConf.setMaster("local[1]");

    SparkSession session = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();

    Dataset<Row> dataFrame = session.read().json("produto.json");
    dataFrame.createOrReplaceTempView("produto");
    
    Dataset<Row> sqlFrame = session.sql("select * from produto");
    sqlFrame.show();
}

Neste exemplo acima é uma forma simples de listar todo o conteúdo do arquivo através de um DataFrame.


Um DataFrame é basicamente uma coleção de dados distribuídos que se assemelha bastante com uma tabela relacional.


Neste trecho é criado uma view temporária com base no Dataframe que foi carregado pela sessão.

dataFrame.createOrReplaceTempView("produto");

No próximo trecho é executado uma consulta SQL simples com base na view criada anteriormente.

Dataset<Row> sqlFrame = session.sql("select * from produto");

Por fim é executado o método .show() que é uma ação do Spark. Este método lista todos os registros da coleção. É possível passar como argumento neste método a quantidade de registros para listagem, o valor padrão é de 20 registros.

sqlFrame.show();

Resultado da execução









Listagem 2 - Cláusula Where


public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("spark-sql-app");
    sparkConf.setMaster("local[1]");

    SparkSession session = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();

    Dataset<Row> dataFrame = session.read().json("produto.json");
    dataFrame.createOrReplaceTempView("produto");
    
    Dataset<Row> sqlFrame = session.sql("select nome, preco " +
            "from produto " +
            "where preco >= 5.0");
            
    sqlFrame.show();
}

Resultado da execução











Listagem 3 - Between


public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("spark-sql-app");
    sparkConf.setMaster("local[1]");

    SparkSession session = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();

    Dataset<Row> dataFrame = session.read().json("produto.json");
    dataFrame.createOrReplaceTempView("produto");
    Dataset<Row> sqlFrame = session.sql("select " +
            "nome, preco, qtde " +
            "from produto " +
            "where qtde between 10 and 50 ");

    sqlFrame.show();
}

Resultado da execução










Listagem 4 - Sum


public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("spark-sql-app");
    sparkConf.setMaster("local[1]");

    SparkSession session = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();

    Dataset<Row> dataFrame = session.read().json("produto.json");
    dataFrame.createOrReplaceTempView("produto");
    
    Dataset<Row> sqlFrame = session.sql("select " +
            "sum(preco * qtde) as total " +
            "from produto " +
            "where qtde > 100 ");

    sqlFrame.show();
}

Resultado da execução








Conteúdo do arquivo produto.json alterado

{"id":1,"nome":"arroz","preco":12.0,"qtde":50,"tipo":"sólido"}
{"id":2,"nome":"feijao","preco":7.50,"qtde":30,"tipo":"sólido"}
{"id":3,"nome":"coca","preco":5.50,"qtde":150,"tipo":"líquido"}
{"id":4,"nome":"suco","preco":3.80,"qtde":250,"tipo":"líquido"}
{"id":5,"nome":"milho","preco":1.50,"qtde":33,"tipo":"sólido"}
{"id":6,"nome":"yogurte","preco":6.0,"qtde":15,"tipo":"líquido"}
{"id":7,"nome":"leite","preco":3.70,"qtde":250,"tipo":"líquido"}
{"id":8,"nome":"oleo","preco":5.60,"qtde":100,"tipo":"líquido"}


Listagem 5 - Count + group by + having

public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("spark-sql-app");
    sparkConf.setMaster("local[1]");

    SparkSession session = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();

    Dataset<Row> dataFrame = session.read().json("produto.json");
    dataFrame.createOrReplaceTempView("produto");

    Dataset<Row> sqlFrame = session.sql("select " +
            "tipo, count(tipo) as qtde" +
            "from produto " +
            "group by tipo " +
            "having (tipo = 'sólido') ");

    sqlFrame.show();
}

Resultado da execução







Tentei mostrar alguns exemplos simples de como utilizar o Spark SQL e o que ele é capaz, espero ter ajudado no entendimento e até mais.


Documentação: https://spark.apache.org/sql/

Github: https://github.com/apache/spark


Posts recentes

Ver tudo