• JP

Introdução ao Apache Hive com Spark e Java

O Hive é um software de Data Warehouse que possibilita a leitura, escrita e o gerenciamento de dados distribuídos e permite a utilização de SQL em consultas estruturadas.


Vamos utilizar o contexto do Spark para a configuração inicial, mas é possível fazer de outras formas sem a utilização do Spark.


Maven

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.5</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>2.4.5</version>
</dependency>

O primeiro passo é criar a configuração do contexto inicial:

String dwDir = new File("warehouse-dir").getAbsolutePath();

SparkConf sparkConf = new SparkConf()
        .set("spark.sql.warehouse.dir", wareHouseDir);

SparkSession sparkSession = SparkSession.builder()
        .config(sparkConf)
        .enableHiveSupport()
        .master("local[1]")
        .getOrCreate();

Entendendo as configurações acima:


1. Neste trecho a variável dwDir recebe o caminho da pasta warehouse-dir que será utilizada nas configurações do Spark. Até então essa pasta não foi criada.

String dwDir = new File("warehouse-dir").getAbsolutePath();

2. No próximo trecho é setado a parâmetro spark.sql.warehouse.dir no contexto do Spark com o caminho do diretório referenciado no primeiro trecho. Dessa forma o Spark usará este diretório como o repositório dos bancos que serão criados.

SparkConf sparkConf = new SparkConf()
        .set("spark.sql.warehouse.dir", wareHouseDir);

3. E por fim, a criação do SparkSession onde estão as configurações criadas anteriormente, o local onde o master será executado (localmente - local[1]) e a ativação do Hive.

SparkSession sparkSession = SparkSession.builder()
        .config(sparkConf)
        .enableHiveSupport()
        .master("local[1]")
        .getOrCreate();

4. Agora podemos executar algumas operações de DML e DDL


4.1. Criando um banco de dados

sparkSession.sql("CREATE DATABASE IF NOT EXISTS hive_tutorial");

4.2. Criando os Parquets


Neste passo iremos criar uma tabela já apontando para o DataSource. Neste caso o DataSource será um Parquet. O Parquet é um arquivo de formato colunar que provê um melhor desempenho nas consultas.


Para isso vamos criar um Parquet a partir de um JSON com seguinte o conteúdo:


Arquivo data/pessoa.json

{"id":1,"nome":"Joao","idade":12}
{"id":2,"nome":"Kelly","idade":21}
{"id":3,"nome":"Monica","idade":29}
{"id":4,"nome":"Laura","idade":32}
{"id":5,"nome":"Kiko","idade":23}
{"id":6,"nome":"Miguel","idade":55}
{"id":7,"nome":"Junior","idade":25}
{"id":8,"nome":"Luis","idade":36}

Executando a leitura do arquivo para um DataFrame.

Dataset<Row> df = sparkSession.read().json("data/pessoa.json");

Criando os arquivos Parquet com base no DataFrame dentro do diretório data/parquet do seu projeto

df.write().parquet("data/parquet/")

Veja que os arquivos foram criados





Pronto, agora temos um Data Source criado.


4.3. Criando a Tabela


Segue os passos:

sparkSession.sql("USE hive_tutorial");

Após selecionar o banco HIVE_TUTORIAL. O comando CREATE TABLE possui alguns argumentos extras, segue:

  1. STORED AS PARQUET : É um argumento que o Hive utilizará para saber que tipo de arquivo será usado na conversão, neste caso o Parquet.

  2. LOCATION: Diretório do Data Source criado anteriormente.

sparkSession.sql("CREATE TABLE IF NOT EXISTS pessoa " +
        "(id BIGINT, nome STRING, idade BIGINT) " +
        "STORED AS PARQUET " +
        "LOCATION 'data/parquet/'");

É possível verificar a tabela criada executando o trecho abaixo:

sparkSession.sql("SHOW TABLES").show();






Independente do fim da execução do programa, a tabela será mantida. Diferente de uma view criada com SparkSQL que é somente mantida em memória.


5. Exemplos de consultas


Selecione o banco de dados

sparkSession.sql("USE hive_tutorial");

Exemplo 1

sparkSession.sql("SELECT id, nome, idade " +
        "FROM hive_tutorial.pessoa " +
        "WHERE idade between 10 and 30 " +
        "ORDER BY nome desc ").show();

Resultado











Exemplo 2

sparkSession.sql("SELECT count(nome) " +
        "FROM hive_tutorial.pessoa " +
        "WHERE idade > 45 ").show();

Resultado







6. Exemplos de consultas mais complexas


Agora vamos criar duas novas tabelas para explorar melhor os recursos do Hive.


Crie o arquivo JSON data/produto.json

{"id":1,"desc":"video game","preco":1800.0,"tipo":"eletronico"}
{"id":2,"desc":"geladeira","preco":1600.0,"tipo":"eletronico"}
{"id":3,"desc":"cama","preco":2000.0,"tipo":"quarto"}
{"id":4,"desc":"armário","preco":750.0,"tipo":"sala"}
{"id":5,"desc":"notebook","preco":4500.0,"tipo":"eletronico"}
{"id":6,"desc":"mesa","preco":2500.0,"tipo":"sala"}
{"id":7,"desc":"cadeira","preco":110.0,"tipo":"sala"}
{"id":8,"desc":"TV","preco":1500.0,"tipo":"eletronico"}
{"id":9,"desc":"fogão","preco":900.0,"tipo":"cozinha"}

Crie os parquets para Produto

Dataset<Row> dfP = sparkSession.read().json("data/produto.json");
dfProd.write().parquet("data/parquet/produto/");

Crie o arquivo JSON data/item.json

{"id":1,"id_produto":2,"qtde":1}
{"id":2,"id_produto":1,"qtde":2}
{"id":3,"id_produto":3,"qtde":3}
{"id":4,"id_produto":4,"qtde":2}
{"id":5,"id_produto":5,"qtde":5}

Crie os parquets para Item

Dataset<Row> dfItem = sparkSession.read().json("data/item.json");
dfItem.write().parquet("data/parquet/item/");

Com base nos parquets criado, agora vamos criar a tabelas Produto e Item

sparkSession.sql("USE hive_tutorial");

sparkSession.sql("CREATE TABLE IF NOT EXISTS produto " +
        "(id BIGINT, desc STRING, " +
        "preco BIGINT, " +
        "tipo STRING) " +
        "STORED AS PARQUET " +
        "LOCATION 'data/parquet/produto'");
        
sparkSession.sql("CREATE TABLE IF NOT EXISTS item " +
        "(id BIGINT, " +
        "id_produto BIGINT, " +
        "qtde BIGINT) " +
        "STORED AS PARQUET " +
        "LOCATION 'data/parquet/item/'");        

Tabelas criadas

sparkSession.sql("SHOW TABLES").show();








Consultas utilizando JOIN


Exemplo 1

sparkSession.sql("SELECT prod.id, " +
        "prod.desc, " +
        "prod.preco, " +
        "prod.tipo, " +
        "item.qtde " +
        "FROM produto prod inner join item item " +
        "on (prod.id = item.id_produto) " +
        "order by prod.id ").show();

Resultado











Exemplo 2

sparkSession.sql(" SELECT " +
        "prod.tipo, " +
        "sum(item.qtde) " +
        "FROM produto prod inner join item item " +
        "on (prod.id = item.id_produto) " +
        "group by prod.tipo").show();

Resultado









Exemplo 3

sparkSession.sql(" SELECT " +
        "prod.tipo, " +
        "sum(item.qtde), " +
        "sum(item.qtde * prod.preco) " +
        "FROM produto prod inner join item item " +
        "on (prod.id = item.id_produto) " +
        "group by prod.tipo").show();

Resultado









Pra finalizar, dê uma olhada na documentação oficial para mais detalhes:

https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

https://hive.apache.org/


É isso, espero ter ajudado!

Posts recentes

Ver tudo