O Hive é um software de Data Warehouse que possibilita a leitura, escrita e o gerenciamento de dados distribuídos e permite a utilização de SQL em consultas estruturadas.
Vamos utilizar o contexto do Spark para a configuração inicial, mas é possível fazer de outras formas sem a utilização do Spark.
Maven
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.5</version>
</dependency>
O primeiro passo é criar a configuração do contexto inicial:
String dwDir = new File("warehouse-dir").getAbsolutePath();
SparkConf sparkConf = new SparkConf()
.set("spark.sql.warehouse.dir", wareHouseDir);
SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.master("local[1]")
.getOrCreate();
Entendendo as configurações acima:
1. Neste trecho a variável dwDir recebe o caminho da pasta warehouse-dir que será utilizada nas configurações do Spark. Até então essa pasta não foi criada.
String dwDir = new File("warehouse-dir").getAbsolutePath();
2. No próximo trecho é setado a parâmetro spark.sql.warehouse.dir no contexto do Spark com o caminho do diretório referenciado no primeiro trecho. Dessa forma o Spark usará este diretório como o repositório dos bancos que serão criados.
SparkConf sparkConf = new SparkConf()
.set("spark.sql.warehouse.dir", wareHouseDir);
3. E por fim, a criação do SparkSession onde estão as configurações criadas anteriormente, o local onde o master será executado (localmente - local[1]) e a ativação do Hive.
SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.master("local[1]")
.getOrCreate();
4. Agora podemos executar algumas operações de DML e DDL
4.1. Criando um banco de dados
sparkSession.sql("CREATE DATABASE IF NOT EXISTS hive_tutorial");
4.2. Criando os Parquets
Neste passo iremos criar uma tabela já apontando para o DataSource. Neste caso o DataSource será um Parquet. O Parquet é um arquivo de formato colunar que provê um melhor desempenho nas consultas.
Para isso vamos criar um Parquet a partir de um JSON com seguinte o conteúdo:
Arquivo data/pessoa.json
{"id":1,"nome":"Joao","idade":12}
{"id":2,"nome":"Kelly","idade":21}
{"id":3,"nome":"Monica","idade":29}
{"id":4,"nome":"Laura","idade":32}
{"id":5,"nome":"Kiko","idade":23}
{"id":6,"nome":"Miguel","idade":55}
{"id":7,"nome":"Junior","idade":25}
{"id":8,"nome":"Luis","idade":36}
Executando a leitura do arquivo para um DataFrame.
Dataset<Row> df = sparkSession.read().json("data/pessoa.json");
Criando os arquivos Parquet com base no DataFrame dentro do diretório data/parquet do seu projeto
df.write().parquet("data/parquet/")
Veja que os arquivos foram criados
Pronto, agora temos um Data Source criado.
4.3. Criando a Tabela
Segue os passos:
sparkSession.sql("USE hive_tutorial");
Após selecionar o banco HIVE_TUTORIAL. O comando CREATE TABLE possui alguns argumentos extras, segue:
STORED AS PARQUET : É um argumento que o Hive utilizará para saber que tipo de arquivo será usado na conversão, neste caso o Parquet.
LOCATION: Diretório do Data Source criado anteriormente.
sparkSession.sql("CREATE TABLE IF NOT EXISTS pessoa " +
"(id BIGINT, nome STRING, idade BIGINT) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/'");
É possível verificar a tabela criada executando o trecho abaixo:
sparkSession.sql("SHOW TABLES").show();
Independente do fim da execução do programa, a tabela será mantida. Diferente de uma view criada com SparkSQL que é somente mantida em memória.
5. Exemplos de consultas
Selecione o banco de dados
sparkSession.sql("USE hive_tutorial");
Exemplo 1
sparkSession.sql("SELECT id, nome, idade " +
"FROM hive_tutorial.pessoa " +
"WHERE idade between 10 and 30 " +
"ORDER BY nome desc ").show();
Resultado
Exemplo 2
sparkSession.sql("SELECT count(nome) " +
"FROM hive_tutorial.pessoa " +
"WHERE idade > 45 ").show();
Resultado
6. Exemplos de consultas mais complexas
Agora vamos criar duas novas tabelas para explorar melhor os recursos do Hive.
Crie o arquivo JSON data/produto.json
{"id":1,"desc":"video game","preco":1800.0,"tipo":"eletronico"}
{"id":2,"desc":"geladeira","preco":1600.0,"tipo":"eletronico"}
{"id":3,"desc":"cama","preco":2000.0,"tipo":"quarto"}
{"id":4,"desc":"armário","preco":750.0,"tipo":"sala"}
{"id":5,"desc":"notebook","preco":4500.0,"tipo":"eletronico"}
{"id":6,"desc":"mesa","preco":2500.0,"tipo":"sala"}
{"id":7,"desc":"cadeira","preco":110.0,"tipo":"sala"}
{"id":8,"desc":"TV","preco":1500.0,"tipo":"eletronico"}
{"id":9,"desc":"fogão","preco":900.0,"tipo":"cozinha"}
Crie os parquets para Produto
Dataset<Row> dfP = sparkSession.read().json("data/produto.json");
dfProd.write().parquet("data/parquet/produto/");
Crie o arquivo JSON data/item.json
{"id":1,"id_produto":2,"qtde":1}
{"id":2,"id_produto":1,"qtde":2}
{"id":3,"id_produto":3,"qtde":3}
{"id":4,"id_produto":4,"qtde":2}
{"id":5,"id_produto":5,"qtde":5}
Crie os parquets para Item
Dataset<Row> dfItem = sparkSession.read().json("data/item.json");
dfItem.write().parquet("data/parquet/item/");
Com base nos parquets criado, agora vamos criar a tabelas Produto e Item
sparkSession.sql("USE hive_tutorial");
sparkSession.sql("CREATE TABLE IF NOT EXISTS produto " +
"(id BIGINT, desc STRING, " +
"preco BIGINT, " +
"tipo STRING) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/produto'");
sparkSession.sql("CREATE TABLE IF NOT EXISTS item " +
"(id BIGINT, " +
"id_produto BIGINT, " +
"qtde BIGINT) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/item/'");
Tabelas criadas
sparkSession.sql("SHOW TABLES").show();
Consultas utilizando JOIN
Exemplo 1
sparkSession.sql("SELECT prod.id, " +
"prod.desc, " +
"prod.preco, " +
"prod.tipo, " +
"item.qtde " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"order by prod.id ").show();
Resultado
Exemplo 2
sparkSession.sql(" SELECT " +
"prod.tipo, " +
"sum(item.qtde) " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"group by prod.tipo").show();
Resultado
Exemplo 3
sparkSession.sql(" SELECT " +
"prod.tipo, " +
"sum(item.qtde), " +
"sum(item.qtde * prod.preco) " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"group by prod.tipo").show();
Resultado
Pra finalizar, dê uma olhada na documentação oficial para mais detalhes:
É isso, espero ter ajudado!
Comentários