Introdução ao Apache Hive com Spark e Java

O Hive é um software de Data Warehouse que possibilita a leitura, escrita e o gerenciamento de dados distribuídos e permite a utilização de SQL em consultas estruturadas. Vamos utilizar o contexto do Spark para a configuração inicial, mas é possível fazer de outras formas sem a utilização do Spark. Maven <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.5</version>
</dependency> O primeiro passo é criar a configuração do contexto inicial: String dwDir = new File("warehouse-dir").getAbsolutePath();

SparkConf sparkConf = new SparkConf()
.set("spark.sql.warehouse.dir", wareHouseDir);

SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.master("local[1]")
.getOrCreate(); Entendendo as configurações acima: 1. Neste trecho a variável dwDir recebe o caminho da pasta warehouse-dir que será utilizada nas configurações do Spark. Até então essa pasta não foi criada. String dwDir = new File("warehouse-dir").getAbsolutePath(); 2. No próximo trecho é setado a parâmetro spark.sql.warehouse.dir no contexto do Spark com o caminho do diretório referenciado no primeiro trecho. Dessa forma o Spark usará este diretório como o repositório dos bancos que serão criados. SparkConf sparkConf = new SparkConf()
.set("spark.sql.warehouse.dir", wareHouseDir); 3. E por fim, a criação do SparkSession onde estão as configurações criadas anteriormente, o local onde o master será executado (localmente - local[1]) e a ativação do Hive. SparkSession sparkSession = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.master("local[1]")
.getOrCreate(); 4. Agora podemos executar algumas operações de DML e DDL 4.1. Criando um banco de dados sparkSession.sql("CREATE DATABASE IF NOT EXISTS hive_tutorial"); 4.2. Criando os Parquets Neste passo iremos criar uma tabela já apontando para o DataSource. Neste caso o DataSource será um Parquet. O Parquet é um arquivo de formato colunar que provê um melhor desempenho nas consultas. Para isso vamos criar um Parquet a partir de um JSON com seguinte o conteúdo: Arquivo data/pessoa.json {"id":1,"nome":"Joao","idade":12}
{"id":2,"nome":"Kelly","idade":21}
{"id":3,"nome":"Monica","idade":29}
{"id":4,"nome":"Laura","idade":32}
{"id":5,"nome":"Kiko","idade":23}
{"id":6,"nome":"Miguel","idade":55}
{"id":7,"nome":"Junior","idade":25}
{"id":8,"nome":"Luis","idade":36} Executando a leitura do arquivo para um DataFrame. Dataset<Row> df = sparkSession.read().json("data/pessoa.json"); Criando os arquivos Parquet com base no DataFrame dentro do diretório data/parquet do seu projeto df.write().parquet("data/parquet/") Veja que os arquivos foram criados Pronto, agora temos um Data Source criado. 4.3. Criando a Tabela Segue os passos: sparkSession.sql("USE hive_tutorial"); Após selecionar o banco HIVE_TUTORIAL. O comando CREATE TABLE possui alguns argumentos extras, segue: STORED AS PARQUET : É um argumento que o Hive utilizará para saber que tipo de arquivo será usado na conversão, neste caso o Parquet. LOCATION: Diretório do Data Source criado anteriormente. sparkSession.sql("CREATE TABLE IF NOT EXISTS pessoa " +
"(id BIGINT, nome STRING, idade BIGINT) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/'"); É possível verificar a tabela criada executando o trecho abaixo: sparkSession.sql("SHOW TABLES").show(); Independente do fim da execução do programa, a tabela será mantida. Diferente de uma view criada com SparkSQL que é somente mantida em memória. 5. Exemplos de consultas Selecione o banco de dados sparkSession.sql("USE hive_tutorial"); Exemplo 1 sparkSession.sql("SELECT id, nome, idade " +
"FROM hive_tutorial.pessoa " +
"WHERE idade between 10 and 30 " +
"ORDER BY nome desc ").show(); Resultado Exemplo 2 sparkSession.sql("SELECT count(nome) " +
"FROM hive_tutorial.pessoa " +
"WHERE idade > 45 ").show(); Resultado 6. Exemplos de consultas mais complexas Agora vamos criar duas novas tabelas para explorar melhor os recursos do Hive. Crie o arquivo JSON data/produto.json {"id":1,"desc":"video game","preco":1800.0,"tipo":"eletronico"}
{"id":2,"desc":"geladeira","preco":1600.0,"tipo":"eletronico"}
{"id":3,"desc":"cama","preco":2000.0,"tipo":"quarto"}
{"id":4,"desc":"armário","preco":750.0,"tipo":"sala"}
{"id":5,"desc":"notebook","preco":4500.0,"tipo":"eletronico"}
{"id":6,"desc":"mesa","preco":2500.0,"tipo":"sala"}
{"id":7,"desc":"cadeira","preco":110.0,"tipo":"sala"}
{"id":8,"desc":"TV","preco":1500.0,"tipo":"eletronico"}
{"id":9,"desc":"fogão","preco":900.0,"tipo":"cozinha"} Crie os parquets para Produto Dataset<Row> dfP = sparkSession.read().json("data/produto.json");
dfProd.write().parquet("data/parquet/produto/"); Crie o arquivo JSON data/item.json {"id":1,"id_produto":2,"qtde":1}
{"id":2,"id_produto":1,"qtde":2}
{"id":3,"id_produto":3,"qtde":3}
{"id":4,"id_produto":4,"qtde":2}
{"id":5,"id_produto":5,"qtde":5} Crie os parquets para Item Dataset<Row> dfItem = sparkSession.read().json("data/item.json");
dfItem.write().parquet("data/parquet/item/"); Com base nos parquets criado, agora vamos criar a tabelas Produto e Item sparkSession.sql("USE hive_tutorial");

sparkSession.sql("CREATE TABLE IF NOT EXISTS produto " +
"(id BIGINT, desc STRING, " +
"preco BIGINT, " +
"tipo STRING) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/produto'");

sparkSession.sql("CREATE TABLE IF NOT EXISTS item " +
"(id BIGINT, " +
"id_produto BIGINT, " +
"qtde BIGINT) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/item/'"); Tabelas criadas sparkSession.sql("SHOW TABLES").show(); Consultas utilizando JOIN Exemplo 1 sparkSession.sql("SELECT prod.id, " +
"prod.desc, " +
"prod.preco, " +
"prod.tipo, " +
"item.qtde " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"order by prod.id ").show(); Resultado Exemplo 2 sparkSession.sql(" SELECT " +
"prod.tipo, " +
"sum(item.qtde) " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"group by prod.tipo").show(); Resultado Exemplo 3 sparkSession.sql(" SELECT " +
"prod.tipo, " +
"sum(item.qtde), " +
"sum(item.qtde * prod.preco) " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"group by prod.tipo").show(); Resultado Pra finalizar, dê uma olhada na documentação oficial para mais detalhes: https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html https://hive.apache.org/ É isso, espero ter ajudado!

Introdução ao Apache Hive com Spark e Java