Acessando APIs e extraindo dados com Airflow
Introdução Airflow provê diferentes formas de se trabalhar com fluxos automatizados e uma das formas é a possibilidade de acesso a APIs externas utilizando operadores HTTP e extraindo os dados necessários. Mãos na massa Neste tutorial iremos criar uma DAG na qual acessará uma API externa e fará a extração dos dados diretamente para um arquivo local. Se for sua primeira vez utilizando Airflow, recomendo acessar este link para entender mais sobre o Airflow e como montar um ambiente. Criando a DAG Para este tutorial, criaremos uma DAG que será trigada a cada 1 hora (schedule_interval=" 0 * * * *" ) e acessará uma API externa extraindo alguns dados diretamente para um arquivo JSON local. Neste cenário usaremos o operador SimpleHttpOperator onde provê uma API com capacidade de executar requisições para APIs externas. Perceba que utilizamos dois operadores dentro de uma mesma DAG. O operador SimpleHttpOperator que provê formas de acesso a APIs externas que através do campo method definimos métodos HTTPs (GET, POST, PUT, DELETE). O campo endpoint permite especificar o endpoint da API, que no caso é products e por fim, o parâmetro http_conn_id , onde é necessário passar o identificador da conexão que será definida a seguir através da UI do Airflow. Conforme imagem abaixo, acesse o menu Admin > Connections Preencha os dados conforme imagem abaixo e salve em seguida. Sobre o operador PythonOperator , estamos utilizando apenas para executar uma função Python chamada _write_response utilizando XComs onde através da task_id da task write_response , é possível recuperar o resultado do response e utilizar em qualquer parte do código. Neste cenário estamos usando o resultado recuperado da API para escrever no arquivo. XCom é um mecanismo de comunicação entre diferente tasks que torna o Airflow bastante flexível. Muitas das vezes as tasks podem ser executadas em diferentes máquinas e com o uso de XComs, possibilita a comunicação e a troca de informações entre Tasks. Por fim, definimos a execução das tasks e suas dependências, veja que usamos o operador >> , que é basicamente definir a ordem da execução entre as tasks. No nosso caso, o acesso a API e extração deve ser executado antes da escrita no arquivo extract_data >> write_response . Após a execução da DAG, é possível acessar o arquivo que foi gerado com o resultado da extração, basta acessar via terminal um dos workers que no caso vamos ter somente um. Execute o seguinte comando abaixo para listar os containers: docker ps Uma listagem similar a esta abaixo será mostrada. Perceba que uma das linhas na coluna NAMES refere-se ao worker, no caso coffee_and_tips_airflow-worker_1 . Continuando no terminal, digite o seguinte comando para ter acesso ao diretório do Airflow onde arquivo extract_data.json se encontra. docker exec -it coffee_and_tips_airflow-worker_1 /bin/bash Pronto, agora é só abrir o arquivo e conferir o conteúdo. Conclusão Mais uma vez vimos o poder do Airflow para processos automatizados que requer acessos e integrações de APIs externas de forma fácil e com poucas linhas de código. Neste exemplo exploramos o uso de XComs que visa flexibilizar a troca de mensagens entre tasks que podem ser executadas em diferentes máquinas em um ambiente distribuído. Espero que tenha curtido!