Introdução
Airflow tem sido uma das principais ferramentas de orquestração do mercado e muito falada no mundo Modern Data Stack, por ser uma ferramenta capaz de orquestrar workloads de dados através de ETLs ou ELTs. Mas na verdade, o Airflow não se resume somente a isso, ele pode ser aplicado em diversos casos de usos do dia a dia de um Engenheiro de Dados ou Software.
Neste Tutorial sobre Airflow para iniciantes, iremos apresentar o Airflow da maneira mais simples, sem a necessidade de saber ou criar ETLs.
Mas o que é o Airflow de fato?
O Apache Airflow é uma plataforma de orquestração de fluxo de trabalho amplamente utilizada para agendar, monitorar e gerenciar pipelines de dados. Ele possui vários componentes que trabalham juntos para fornecer suas funcionalidades.
Componentes do Airflow
DAG
O DAG (Directed Acyclic Graph) é o principal componente e a representação de fluxo de trabalho no Airflow. É composto por tarefas (tasks) e dependências entre elas. As tarefas são definidas como operadores (operators), como PythonOperator, BashOperator, SQLOperator e entre outros. O DAG define a ordem de execução das tarefas e as relações de dependências.
Webserver
O componente do Webserver fornece uma interface da web para interagir com o Airflow. Ele permite que você visualize, gerencie e monitore seus fluxos de trabalho, tarefas, DAGs e logs. O Webserver também permite a autenticação de usuários e o controle de acesso com base em funções.
Scheduler
O Scheduler é responsável por agendar a execução das tarefas de acordo com a definição do DAG. Ele verifica periodicamente se há tarefas pendentes para execução e atribui recursos disponíveis para a execução das tarefas no momento apropriado. O Scheduler também lida com a recuperação de falhas e o agendamento de retries de tarefas.
Executor
O Executor é responsável por executar as tarefas definidas nos DAGs. Existem diferentes tipos de executores disponíveis no Airflow, como LocalExecutor, CeleryExecutor, KubernetesExecutor e etc. Cada executor tem suas próprias configurações e comportamentos de execução.
Metadatabase
O Metadatabase é um banco de dados onde o Airflow armazena metadados sobre as tarefas, DAGs, execuções, agendamentos, entre outros. Ele é usado para rastrear o status das tarefas, registrar o histórico de execução e fornecer informações para o monitoramento e a visualização do fluxo de trabalho. É possível utilizar diversos outros bancos de dados para registrar o histórico como MySQL, Postgres e dentre outros.
Workers
Os Workers são os nós de execução em um ambiente distribuído. Eles recebem tarefas atribuídas pelo Scheduler e as executam. Os Workers podem ser dimensionados horizontalmente para lidar com pipelines de dados maiores ou para distribuir a carga de trabalho para vários recursos.
Plugins
Os Plugins são extensões do Airflow que permitem adicionar novos recursos e funcionalidades ao sistema. Eles podem incluir novos operadores, hooks, sensores, conexões com sistemas externos e entre outros. Os Plugins fornecem uma maneira de personalizar e estender as capacidades do Airflow para atender às necessidades específicas de um fluxo de trabalho.
Operadores
Operadores são basicamente a composição de um DAG. Entenda um operador como um bloco de código com responsabilidade própria. Pelo Airflow ser um orquestrador e executar um fluxo de trabalho, podemos ter diferentes tarefas a serem executadas como por exemplo, acessar uma API, enviar um e-mail, acessar uma tabela em um banco de dados e efetuar uma operação, executar um código Python ou até um comando Bash.
Para cada uma das tarefas acima, devemos usar um operador. A seguir, iremos abordar alguns dos principais operadores:
BashOperator
O BashOperator permite executar comandos Bash ou scripts diretamente no sistema operacional onde o Airflow está sendo executado. É útil para tarefas que envolvem a execução de scripts de shell, utilitários ou qualquer ação que possa ser realizada no terminal. Resumindo, quando precisamos abrir o terminal do nosso sistema e executar algum comando para manipular arquivos ou algo relacionado ao próprio sistema, porém dentro de um DAG, é este o operador a ser usado.
PythonOperator
O PythonOperator permite que você execute funções Python como tarefas no Airflow. Você pode escrever suas próprias funções Python personalizadas e usar o PythonOperator para chamar essas funções como parte do fluxo de trabalho.
DummyOperator
O DummyOperator é uma tarefa "falsa" que não realiza nenhuma ação. É útil para criar dependências e fluxos de trabalho complexos sem a necessidade de executar nenhuma ação real.
Sensor
Os Sensores são usados para aguardar a ocorrência de algum evento externo antes de continuar o fluxo de trabalho, pode funcionar como um listener (ouvinte). Por exemplo, o HttpSensor que é um tipo de Sensor, pode validar se uma API externa esteja ativa, caso esteja, o fluxo continua sendo executado. Não necessariamente é um operador HTTP que deveria retornar algo, mas somente um tipo de ouvinte.
HttpOperator
Diferente de um Sensor, o HttpOperator é usado para realizar solicitações HTTP, como GET, POST, PUT, DELETE e etc. Neste caso, permite interagir de forma mais completa com APIs internas ou externas.
SqlOperator
SqlOperator é o operador responsável por executar operações de DML e DDL em um banco de dados, ou seja, desde manipulações de dados como SELECTS, INSERTS, UPDATES até criação de tabelas, alteração e etc.
Executores
Os executores são responsáveis por executar as tarefas definidas em um fluxo de trabalho (DAG). Eles gerenciam a alocação e a execução das tarefas em tempo de execução, garantindo que cada tarefa seja executada de forma eficiente e confiável. O Airflow oferece diferentes tipos de executores, cada um com características e funcionalidades distintas, permitindo que você escolha o mais adequado para suas necessidades específicas. A seguir, abordaremos alguns dos principais executores:
LocalExecutor
O LocalExecutor é o executor padrão no Apache Airflow. Ele é projetado para ser usado em ambientes de desenvolvimento e teste onde a escalabilidade não é uma preocupação. O LocalExecutor executa as tarefas em threads separados no mesmo processo do Airflow. Essa abordagem é simples e eficiente para pipelines menores ou para execuções em um único nó.
CeleryExecutor
Se você precisa de um executor para ambientes distribuídos e com alta escala, o CeleryExecutor é uma excelente opção. Ele utiliza o Celery, uma biblioteca de tarefas em fila, para distribuir as tarefas em nós de execução separados. Essa abordagem torna o Airflow adequado para executar pipelines em clusters de servidores, permitindo dimensionar horizontalmente conforme a demanda.
KubernetesExecutor
Para ambientes que utilizam Kubernetes como plataforma de orquestração de containers, o KubernetesExecutor é uma escolha natural. Ele aproveita a capacidade de orquestração do Kubernetes para executar tarefas em pods separados, o que pode resultar em melhor isolamento de recursos e mais facilidade na execução de tarefas em containers.
DaskExecutor
Se o seu fluxo de trabalho requer processamento paralelo e distribuído, o DaskExecutor pode ser a opção certa. Ele utiliza a biblioteca Dask para realizar a computação paralela em um cluster de recursos. Essa abordagem é ideal para tarefas que podem ser divididas em sub-tasks independentes, permitindo maior aproveitamento dos recursos disponíveis.
Linguagem de programação
Airflow suporta unicamente como linguem de programação, o Python. Para ser sincero, não é um limitador para quem não conhece bem a linguagem. Na prática, o processo de criar DAGs é padrão, o que pode mudar de acordo com as suas necessidades, será lidar com diferentes tipos de operadores, podendo ou não usar Python.
Mãos na massa
Montando o ambiente
Para este tutorial iremos usar o Docker que nos ajudará a provisionar o nosso ambiente sem a necessidade de instalar o Airflow.
Caso não tenha o Docker instalado, recomendo seguir as recomendações deste link e após instalado, volte para seguirmos o tutorial.
Baixando o projeto
Para facilitar, clone o projeto do seguinte repositório e siga os passos para fazer o deploy do Airflow.
Passos para o deploy
Com o docker instalado e após ter baixado o projeto conforme o item anterior, acesse o diretório onde se encontra o projeto e abra o terminal, execute o seguinte comando docker:
docker-compose up
O comando acima irá iniciar os containers do docker onde estão presentes os serviços do próprio Airflow, postgres e entre outras que iremos utilizar.
Se tiver curiosidade de como estes serviços estão mapeados, abra o arquivo docker-compose.yaml do projeto e lá você terá mais detalhes.
Enfim, após a execução do comando acima e os containers já iniciados, acesse via browser o seguinte endereço http://localhost:8080/
Uma tela como abaixo será aberta, basta digitar airflow para o usuário e senha e acesse a UI do Airflow.
Criando a DAG
Criando um simples Hello World
Para este exemplo, iremos criar uma simples DAG onde irá printar o clássico "Hello World". No projeto que você baixou, acesse a pasta /dags e crie o seguinte arquivo python chamado hello_world.py .
O Código acima é um simples exemplo de uma DAG escrita em Python. Percebe-se que começamos importando algumas funções, incluindo a própria DAG, função relacionada a data e o operador Python.
Em seguida, criamos uma função Python que irá printar no console "Hello World" chamada print_hello. Esta função será chamada pela DAG mais a frente.
A declaração de uma DAG inicia-se com with DAG(..) passando alguns argumentos como:
dag_id: Identificador da DAG no contexto do Airflow
start_date: A data definida é apenas um ponto de referência e não necessariamente a data do início de execução e nem da criação da DAG. Normalmente as execuções são feitas a uma data posterior a definida neste parâmetro, e tem uma importância quando precisamos calcular execuções entre o início e o que foi definido no parâmetro schedule_interval.
schedule_interval: Neste parâmetro definimos a periodicidade em que a DAG será executada. É possível definir diferentes formas de execuções através de expressões CRON ou através de Strings já definidas como @daily, @hourly, @once, @weekly e etc. No caso do exemplo, o fluxo será executado apenas uma vez.
catchup: Este parâmetro controla execuções retroativas, ou seja, caso esteja definido como True, o Airflow irá executar o período retroativo a partir da data definida no start_date até a data atual. No exemplo anterior definimos como False por não ter a necessidade da execução retroativa.
Após o preenchimento dos argumentos, criamos a task hello_task dentro da própria DAG através do operador PythonOperator no qual provê formas de executar funções python dentro de uma DAG.
Perceba que declaramos um identificador através do task_id e no argumento python_callable no qual é nativa do operador PythonOperator, passamos a função python print_hello criada anteriormente.
Por último, invoque a task hello_task. Dessa forma, a DAG entenderá que esta será a task a ser executada.
Caso você já tenha feito o deploy, a DAG aparecerá no Airflow em pouco tempo para ser executada conforme na imagem abaixo:
Após a DAG criada, ative-a e a execute clicando em Trigger DAG conforme a imagem acima. Clique na task hello_operator (centro) e em seguida uma janela será aberta conforme imagem abaixo:
Clique no botão Log para ver mais detalhes da execução:
Perceba como é simples a criação de uma DAG, basta pensar nas diferentes possibilidades e cenários de aplicabilidade. A seguir, faremos mais exemplos um pouco mais complexos explorando várias outros cenários.
Conclusão
Com base no simples exemplo mostrado, Airflow apresentou uma abordagem flexível e simples para o controle de fluxos automatizados, desde a criação de DAGs até a navegação do seu componente web. Como citei no início, a sua utilização não se limita somente na orquestração de ETLs como é feito no mercado em geral, mas também na possibilidade do seu uso em tarefas que requer qualquer necessidade em controlar fluxos que possuem dependências entre seus componentes dentro de um contexto escalável ou não.
Repositório GitHub
Espero que tenha curtido!
Comments