Introdução ao Apache Hive com Spark e Java
O Hive é um software de Data Warehouse que possibilita a leitura, escrita e o gerenciamento de dados distribuídos e permite a utilização de SQL em consultas estruturadas. Vamos utilizar o contexto do Spark para a configuração inicial, mas é possível fazer de outras formas sem a utilização do Spark. Maven < dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-core_2.12</ artifactId >
< version >2.4.5</ version >
</ dependency >
< dependency >
< groupId >org.apache.spark</ groupId >
< artifactId >spark-hive_2.12</ artifactId >
< version >2.4.5</ version >
</ dependency > O primeiro passo é criar a configuração do contexto inicial: String dwDir = new File( "warehouse-dir" ).getAbsolutePath();
SparkConf sparkConf = new SparkConf()
.set( "spark.sql.warehouse.dir" , wareHouseDir);
SparkSession sparkSession = SparkSession. builder ()
.config(sparkConf)
.enableHiveSupport()
.master( "local[1]" )
.getOrCreate(); Entendendo as configurações acima: 1 . Neste trecho a variável dwDir recebe o caminho da pasta warehouse-dir que será utilizada nas configurações do Spark. Até então essa pasta não foi criada. String dwDir = new File( "warehouse-dir" ).getAbsolutePath(); 2 . No próximo trecho é setado a parâmetro spark.sql.warehouse.dir no contexto do Spark com o caminho do diretório referenciado no primeiro trecho. Dessa forma o Spark usará este diretório como o repositório dos bancos que serão criados. SparkConf sparkConf = new SparkConf()
.set( "spark.sql.warehouse.dir" , wareHouseDir); 3 . E por fim, a criação do SparkSession onde estão as configurações criadas anteriormente, o local onde o master será executado (localmente - local[1] ) e a ativação do Hive. SparkSession sparkSession = SparkSession. builder ()
.config(sparkConf)
.enableHiveSupport()
.master( "local[1]" )
.getOrCreate(); 4 . Agora podemos executar algumas operações de DML e DDL 4.1. Criando um banco de dados sparkSession.sql( "CREATE DATABASE IF NOT EXISTS hive_tutorial" ); 4.2. Criando os Parquets Neste passo iremos criar uma tabela já apontando para o DataSource. Neste caso o DataSource será um Parquet. O Parquet é um arquivo de formato colunar que provê um melhor desempenho nas consultas. Para isso vamos criar um Parquet a partir de um JSON com seguinte o conteúdo: Arquivo data/ pessoa.json { "id" :1, "nome": "Joao" ,"idade" :12}
{ "id" :2, "nome": "Kelly" ,"idade" :21}
{ "id" :3, "nome": "Monica" ,"idade" :29}
{ "id" :4, "nome": "Laura" ,"idade" :32}
{ "id" :5, "nome": "Kiko" ,"idade" :23}
{ "id" :6, "nome": "Miguel" ,"idade" :55}
{ "id" :7, "nome": "Junior" ,"idade" :25}
{ "id" :8, "nome": "Luis" ,"idade" :36} Executando a leitura do arquivo para um DataFrame. Dataset<Row> df = sparkSession.read().json( "data/pessoa.json" ); Criando os arquivos Parquet com base no DataFrame dentro do diretório data/parquet do seu projeto df.write().parquet( "data/parquet/" ) Veja que os arquivos foram criados Pronto, agora temos um Data Source criado. 4.3. Criando a Tabela Segue os passos: sparkSession.sql( "USE hive_tutorial" ); Após selecionar o banco HIVE_TUTORIAL. O comando CREATE TABLE possui alguns argumentos extras, segue: STORED AS PARQUET : É um argumento que o Hive utilizará para saber que tipo de arquivo será usado na conversão, neste caso o Parquet. LOCATION: Diretório do Data Source criado anteriormente. sparkSession.sql( "CREATE TABLE IF NOT EXISTS pessoa " +
"(id BIGINT, nome STRING, idade BIGINT) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/'" ); É possível verificar a tabela criada executando o trecho abaixo: sparkSession.sql( "SHOW TABLES" ).show(); Independente do fim da execução do programa, a tabela será mantida. Diferente de uma view criada com SparkSQL que é somente mantida em memória. 5. Exemplos de consultas Selecione o banco de dados sparkSession.sql( "USE hive_tutorial" ); Exemplo 1 sparkSession.sql( "SELECT id, nome, idade " +
"FROM hive_tutorial.pessoa " +
"WHERE idade between 10 and 30 " +
"ORDER BY nome desc " ).show(); Resultado Exemplo 2 sparkSession.sql( "SELECT count(nome) " +
"FROM hive_tutorial.pessoa " +
"WHERE idade > 45 " ).show(); Resultado 6. Exemplos de consultas mais complexas Agora vamos criar duas novas tabelas para explorar melhor os recursos do Hive. Crie o arquivo JSON data/produto.json { "id" :1, "desc": "video game" ,"preco" :1800.0, "tipo": "eletronico" }
{ "id" :2, "desc": "geladeira" ,"preco" :1600.0, "tipo": "eletronico" }
{ "id" :3, "desc": "cama" ,"preco" :2000.0, "tipo": "quarto" }
{ "id" :4, "desc": "armário" ,"preco" :750.0, "tipo": "sala" }
{ "id" :5, "desc": "notebook" ,"preco" :4500.0, "tipo": "eletronico" }
{ "id" :6, "desc": "mesa" ,"preco" :2500.0, "tipo": "sala" }
{ "id" :7, "desc": "cadeira" ,"preco" :110.0, "tipo": "sala" }
{ "id" :8, "desc": "TV" ,"preco" :1500.0, "tipo": "eletronico" }
{ "id" :9, "desc": "fogão" ,"preco" :900.0, "tipo": "cozinha" } Crie os parquets para Produto Dataset<Row> dfP = sparkSession.read().json( "data/produto.json" );
dfProd.write().parquet( "data/parquet/produto/" ); Crie o arquivo JSON data/item.json { "id" :1, "id_produto" :2, "qtde" :1}
{ "id" :2, "id_produto" :1, "qtde" :2}
{ "id" :3, "id_produto" :3, "qtde" :3}
{ "id" :4, "id_produto" :4, "qtde" :2}
{ "id" :5, "id_produto" :5, "qtde" :5} Crie os parquets para Item Dataset<Row> dfItem = sparkSession.read().json( "data/item.json" );
dfItem.write().parquet( "data/parquet/item/" ); Com base nos parquets criado, agora vamos criar a tabelas Produto e Item sparkSession.sql( "USE hive_tutorial" );
sparkSession.sql( "CREATE TABLE IF NOT EXISTS produto " +
"(id BIGINT, desc STRING, " +
"preco BIGINT, " +
"tipo STRING) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/produto'" );
sparkSession.sql( "CREATE TABLE IF NOT EXISTS item " +
"(id BIGINT, " +
"id_produto BIGINT, " +
"qtde BIGINT) " +
"STORED AS PARQUET " +
"LOCATION 'data/parquet/item/'" ); Tabelas criadas sparkSession.sql( "SHOW TABLES" ).show(); Consultas utilizando JOIN Exemplo 1 sparkSession.sql( "SELECT prod.id, " +
"prod.desc, " +
"prod.preco, " +
"prod.tipo, " +
"item.qtde " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"order by prod.id " ).show(); Resultado Exemplo 2 sparkSession.sql( " SELECT " +
"prod.tipo, " +
"sum(item.qtde) " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"group by prod.tipo" ).show(); Resultado Exemplo 3 sparkSession.sql( " SELECT " +
"prod.tipo, " +
"sum(item.qtde), " +
"sum(item.qtde * prod.preco) " +
"FROM produto prod inner join item item " +
"on (prod.id = item.id_produto) " +
"group by prod.tipo" ).show(); Resultado Pra finalizar, dê uma olhada na documentação oficial para mais detalhes: https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html https://hive.apache.org/ É isso, espero ter ajudado!