programa
Criando um pipeline de ETL com o Airflow
Bem-vindo ao mundo dos pipelines de ETL usando o Apache Airflow. Neste tutorial, vamos nos concentrar em extrair dados do mercado de ações usando a API Polygon, transformando esses dados e, em seguida, carregando-os em um banco de dados SQLite para facilitar o acesso e a manipulação. Vamos começar!
O que é o Apache Airflow e o Airflow ETL?
O Apache Airflow é considerado um padrão do setor para orquestração de dados e gerenciamento de pipeline. Ele se tornou popular entre os cientistas de dados, engenheiros de machine learning e profissionais de IA por sua capacidade de orquestrar fluxos de trabalho complexos, gerenciar dependências entre tarefas, repetir tarefas que falharam e fornecer registro extensivo.
Airflow ETL refere-se ao uso do Apache Airflow para gerenciar processos de ETL. Para revisar, ETL é um tipo de integração de dados que envolve a extração de dados de várias fontes, transformando-os em um formato adequado para análise e carregando-os em um destino final, como um data warehouse.
Configurando nosso ambiente de desenvolvimento do Airflow
Primeiro, precisamos configurar nosso ambiente de desenvolvimento antes de começarmos a criar um pipeline de ETL com o Airflow. Para obter informações detalhadas sobre como configurar nosso ambiente de desenvolvimento, consulte nosso tutorial Getting Started with Apache Airflow.
Você também terá que instalar o Astro CLI. Consulte o Astronomer, que mantém a CLI do Astro e fornece documentação detalhada, para obter instruções.
Criando um projeto de fluxo de ar
Depois de configurar nosso ambiente e instalar o Astro CLI, criamos um projeto Airflow. Para isso, abrimos um terminal shell e criamos um novo diretório com o caminho escolhido.
~/Documents/data-engineering/ETL-pipeline/
Na raiz desse diretório, executamos o seguinte comando para criar os recursos necessários:
astro dev init
O conteúdo do diretório terá a aparência abaixo. O resultado exato pode variar.
├── dags/
├── include/
├── plugins/
├── tests/
├── airflow_settings.yaml
├── Dockerfile
├── packages.txt
└── requirements.txt
Para colocar seu projeto em funcionamento, execute o seguinte comando:
astro dev start
O ambiente do Airflow levará cerca de um minuto para girar. Depois disso, navegue até localhost: 8080
no navegador da Web e você será recebido pela interface do usuário do Airflow.
Agora você está pronto para começar a desenvolver seu próprio pipeline de ETL com o Airflow!
Projetando um pipeline de ETL
É importante reservar um tempo e planejar cada componente do pipeline antes de você escrever uma única linha de código. Em particular, é um bom hábito identificar primeiro a fonte de dados e o destino para o qual os dados serão carregados. Ao dedicar algum tempo para identificar as fontes e o destino dos dados, também aprendemos sobre como os dados serão transformados ao longo do caminho.
Para o nosso exemplo, projetaremos um pipeline de dados para extrair dados do mercado de ações da API Polygon, antes de transformar e carregar esses dados em um banco de dados SQLite. Nesse caso, o sistema de origem é a API Polygon, e o destino é um banco de dados SQLite. Vamos ilustrar isso com uma imagem:
.
Com base em nossa experiência como engenheiros de dados, sabemos que, para preparar os dados a serem carregados em um banco de dados SQLite, eles precisam ser transformados de JSON para um formato tabular. Criamos um plano para transformar nossos dados depois que eles forem extraídos da API do Polygon, usando Python nativo e a biblioteca pandas
. Vamos atualizar nosso visual para mostrar essa alteração:
.
Ao acrescentar essas informações adicionais, criamos um diagrama de arquitetura, que é uma representação visual ampla do nosso sistema. Vemos as três etapas lógicas nesse pipeline, que correspondem a E, T e L do nosso processo.
Também podemos traduzir cada uma dessas tarefas em um gráfico acíclico direcionado, ou DAG, que é uma configuração específica que define todo o conjunto de tarefas a serem executadas pelo Airflow, sua sequência e suas dependências entre si. Saiba mais sobre gráficos acíclicos direcionados fazendo nosso curso de Introdução à engenharia de dados, que analisa detalhadamente os DAGs do Airflow.
Em uma configuração de dados corporativos, muitas equipes usam outros tipos de documentos chamados especificações técnicas, ou tech specs, para chegar a um acordo e documentar as escolhas de design. Para o nosso caso, basta criar uma tabela para documentar as escolhas de design do nosso pipeline de dados.
Tipo de operador |
ID da tarefa |
Notas |
---|---|---|
Extrair |
hit_polygon_api |
Use a API do TaskFlow e crie uma função Python para autenticar e acessar a API do Polygon, antes de retornar a resposta. |
Transformar |
flatten_market_data |
Achate os dados retornados da tarefa hit_polygon_api e prepare-os para serem carregados no SQLite |
Carga |
load_market_data |
Carregar os dados achatados no SQLite |
Opções de pipeline de dados
Lembre-se de que nosso DAG se concentra nas especificidades de nível de tarefa e pode não abranger todas as informações. Você pode usar uma segunda tabela para documentar detalhes adicionais que precisam ser resolvidos. Nossas perguntas incluem:
- Com que frequência esse DAG será executado?
- O que acontece se uma tarefa no pipeline falhar?
- E se quisermos coletar dados sobre outros estoques?
Parâmetro |
Valor |
---|---|
ID DO DAG |
market_etl |
Data de início |
1º de janeiro de 2024 (9:00 AM UTC) |
Intervalo |
Diariamente |
Você está em dia? |
True (carregar todos os dados desde 1º de janeiro de 2024) |
Concorrência |
1 DAG em execução por vez |
Tarefas repetidas, atraso de repetição |
3 tentativas, atraso de 5 minutos em cada tentativa |
Múltiplos marcadores de ações? |
DAGs de criação dinâmica |
Questões a serem resolvidas
Vamos fazer um balanço de tudo o que realizamos: Criamos um diagrama de arquitetura, documentamos como o pipeline pode ser dividido em tarefas do Airflow e identificamos as informações de alto nível necessárias para configurar o DAG.
Se você quiser saber mais sobre como projetar, desenvolver e testar pipelines de dados, confira o curso Introduction to Data Pipelines do DataCamp. Aqui, você dominará os conceitos básicos da criação de pipelines de ETL com Python, bem como as práticas recomendadas para garantir que sua solução seja robusta, resiliente e reutilizável.
Criando um pipeline de ETL com o Airflow
Organizaremos o modo como criamos o pipeline de ETL, seguindo as etapas em ordem. A adoção de uma abordagem estruturada garante que cada fase seja executada com precisão.
Extração de dados com o Airflow
Antes de extrair dados da API do Polygon, precisaremos criar um token de API visitando o Polygon e selecionando o botão Create API Key (Criar chave de API ). Observe que, para este tutorial, não há necessidade de se inscrever em uma assinatura paga - o nível gratuito gerará uma chave de API e fornecerá toda a funcionalidade de que precisamos. Lembre-se apenas de copiar e salvar sua chave de API.
Depois de criarmos uma chave de API, estamos prontos para começar a extrair dados da API do Polygon com o Airflow. Usamos a tabela de especificações técnicas que criamos, que inclui os detalhes da nossa configuração de DAG, para nos ajudar a codificar.
from airflow import DAG
from datetime import datetime, timedelta
with DAG(
dag_id="market_etl",
start_date=datetime(2024, 1, 1, 9),
schedule="@daily",
catchup=True,
max_active_runs=1,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
) as dag:
Vamos adicionar nossa primeira tarefa. Usaremos a API TaskFlow e o módulo requests
para extrair dados de abertura e fechamento de ações da API Polygon.
import requests
...
@task()
def hit_polygon_api(**context):
# Instantiate a list of tickers that will be pulled and looped over
stock_ticker = "AMZN"
# Set variables
polygon_api_key= "<your-api-key>"
ds = context.get("ds")
# Create the URL
url= f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
response = requests.get(url)
# Return the raw data
return response.json()
Há algumas coisas que você notará sobre esse código.
- A função
hit_polygon_api
é decorada com@task
. Esse decorador converte a função em uma tarefa do Airflow que pode ser executada como parte de um DAG. - O parâmetro
context
é definido na assinaturahit_polygon_api
. Posteriormente, ele é usado para extrair o valor armazenado na chaveds
. context
é um dicionário que contém metadados sobre a tarefa e o DAG.- Ao extrair
ds
do dicionáriocontext
, obtemos a data dodata_interaval_end
no formatoYYYY-mm-dd
. - Para garantir que nossa nova tarefa seja executada quando o DAG for executado, precisaremos adicionar uma chamada ao site
hit_polygon_api
.
Juntando tudo isso, o código para criar a primeira parte do nosso pipeline de ETL tem a seguinte aparência.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime, timedelta import requests
with DAG(
dag_id="market_etl",
start_date=datetime(2024, 1, 1, 9),
schedule="@daily",
catchup=False,
max_active_runs=1,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
) as dag:
# Create a task using the TaskFlow API
@task()
def hit_polygon_api(**context):
# Instantiate a list of tickers that will be pulled and looped over
stock_ticker = "AMZN"
# Set variables
polygon_api_key = "<your-api-key>"
ds = context.get("ds")
# Create the URL
url = f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
response = requests.get(url)
# Return the raw data
return response.json()
hit_polygon_api()
Você perceberá que a resposta conterá um erro quando isso for executado para 1º de janeiro de 2024. Como o mercado está fechado nesse dia, o Polygon retorna uma resposta observando a exceção. Trataremos disso na próxima etapa.
Transformando dados com o Airflow
Depois que os dados foram extraídos da API Polygon, estamos prontos para transformá-los.
Para fazer isso, criaremos outra tarefa usando a API TaskFlow. Essa tarefa será denominada flatten_market_data
e terá os parâmetros polygon_response
, que são os dados brutos retornados pela função hit_polygon_api
, e **context
. Vamos dar uma olhada mais de perto em polygon_response
em breve.
A transformação real que faremos é bastante simples. Vamos achatar o JSON retornado pela API Polygon em uma lista. O problema é que forneceremos valores padrão exclusivos para cada chave se ela não existir na resposta.
Por exemplo, se a chave from não existir na resposta, forneceremos um valor padrão usando o contexto do Airflow. Isso resolve o problema que vimos anteriormente, em que uma resposta continha um conjunto limitado de chaves (devido ao fato de o mercado estar fechado). Em seguida, converteremos a lista em um DataFrame pandas
e a retornaremos. A tarefa resultante tem a seguinte aparência:
@task
def flatten_market_data(polygon_response, **context):
# Create a list of headers and a list to store the normalized data in
columns = {
"status": "closed",
"from": context.get("ds"),
"symbol": "AMZN",
"open": None,
"high": None,
"low": None,
"close": None,
"volume": None
}
# Create a list to append the data to
flattened_record = []
for header_name, default_value in columns.items():
# Append the data
flattened_record.append(polygon_response.get(header_name, default_value))
# Convert to a pandas DataFrame
flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
return flattened_dataframe
Teremos que adicionar uma dependência entre as tarefas hit_polygon_api
e flatten_market_data
. Para fazer isso, atualizaremos o código em nosso DAG para que corresponda ao que está abaixo:
import pandas as pd
with DAG(
dag_id="market_etl",
start_date=datetime(2024, 1, 1, 9),
schedule="@daily",
catchup=True,
max_active_runs=1,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
) as dag:
@task()
def hit_polygon_api(**context):
...
@task
def flatten_market_data(polygon_response, **context):
# Create a list of headers and a list to store the normalized data in
columns = {
"status": None,
"from": context.get("ds"),
"symbol": "AMZN",
"open": None,
"high": None,
"low": None,
"close": None,
"volume": None
}
# Create a list to append the data to
flattened_record = []
for header_name, default_value in columns.items():
# Append the data
flattened_record.append(polygon_response.get(header_name, default_value))
# Convert to a pandas DataFrame
flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
return flattened_dataframe
# Set dependencies
raw_market_data = hit_polygon_api()
flatten_market_data(raw_market_data)
Aqui, o valor de retorno da tarefa hit_polygon_api
é armazenado em raw_market_data
. Em seguida, raw_market_data
é passado como um argumento para a tarefa flatten_market_data
por meio do parâmetro polygon_response
. Esse código não apenas define uma dependência entre as tarefas hit_polygon_api
e flatten_market_data
, mas também permite que os dados sejam compartilhados entre essas duas tarefas.
Embora a transformação que realizamos tenha sido relativamente simples, o Airflow oferece a capacidade de executar uma ampla gama de manipulações de dados mais complicadas. Além de usar tarefas nativas, é fácil aproveitar a ampla coleção de ganchos e operadores criados pelo provedor do Airflow para orquestrar a transformação usando ferramentas como o AWS Lambda e o DBT.
Carregando dados com o Airflow
Chegamos à última etapa do nosso pipeline de ETL. Planejamos fazer isso usando um banco de dados SQLite e uma tarefa final definida com a API TaskFlow.
Como antes, definiremos um único parâmetro quando criarmos nossa tarefa. Vamos chamá-lo de flattened_dataframe
. Isso permite que os dados retornados pela tarefa flatten_market_data
sejam transmitidos à nossa nova tarefa.
Antes de escrevermos o código para carregar nossos dados em um banco de dados SQLite, primeiro precisamos criar uma conexão na interface do usuário do Airflow. Para abrir a página de conexões, siga estas etapas:
- Abra a interface do usuário do Airflow
- Passe o mouse sobre a opção Admin
- Selecione Conexões.
- Clique no ícone + para criar uma nova conexão.
Você será direcionado para uma tela semelhante a esta:
Página de conexões do Airflow
Para preencher a página de conexões, siga estas etapas:
- Altere o tipo de conexão para Sqlite.
- Forneça o valor "market_database_conn" para o ID da conexão.
- Adicione "/usr/local/airflow/market_data.db" ao campo Host.
A configuração para essa conexão deve ser semelhante à imagem abaixo. Quando isso acontecer, clique em Salvar.
. Conexão do Airflow não salva com o banco de dados SQLite
Agora que criamos uma conexão, podemos recuperar essas informações em nossa tarefa usando o SqliteHook
. Dê uma olhada no código abaixo.
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
@task
def load_market_data(flattened_dataframe):
# Pull the connection
market_database_hook = SqliteHook("market_database_conn")
market_database_conn = market_database_hook.get_sqlalchemy_engine()
# Load the table to Postgres, replace if it exists
flattened_dataframe.to_sql(
name="market_data",
con=market_database_conn,
if_exists="append",
index=False
)
# print(market_database_hook.get_records("SELECT * FROM market_data;"))
Com esse código, estamos criando uma conexão com o banco de dados SQLite que especificamos na última etapa. Em seguida, o mecanismo de conexão é recuperado do gancho usando o método .get_sqlalchemy_engine()
. Em seguida, isso é passado como um argumento para o parâmetro con
quando o método .to_sql()
é chamado no flattened_dataframe
.
Observe que o nome da tabela em que esses dados estão sendo gravados é market_data
e, se a tabela existir, ela será anexada a ela. Ao testar, gosto de verificar se os dados estão sendo gravados, recuperando e imprimindo esses registros. Você pode fazer isso descomentando a última linha dessa tarefa.
Juntando tudo isso, nosso código deve ficar mais ou menos assim:
from airflow import DAG
from airflow.decorators import task
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from datetime import datetime, timedelta
import requests
import pandas as pd
with DAG(
dag_id="market_etl",
start_date=datetime(2024, 1, 1, 9),
schedule="@daily",
catchup=True,
max_active_runs=1,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
) as dag:
# Create a task using the TaskFlow API
@task()
def hit_polygon_api(**context):
# Instantiate a list of tickers that will be pulled and looped over
stock_ticker = "AMZN"
# Set variables
polygon_api_key = "<your-api-key>"
ds = context.get("ds")
# Create the URL
url = f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
response = requests.get(url)
# Return the raw data
return response.json()
@task
def flatten_market_data(polygon_response, **context):
# Create a list of headers and a list to store the normalized data in
columns = {
"status": None,
"from": context.get("ds"),
"symbol": "AMZN",
"open": None,
"high": None,
"low": None,
"close": None,
"volume": None
}
# Create a list to append the data to
flattened_record = []
for header_name, default_value in columns.items():
# Append the data
flattened_record.append(polygon_response.get(header_name, default_value))
# Convert to a pandas DataFrame
flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
return flattened_dataframe
@task
def load_market_data(flattened_dataframe):
# Pull the connection
market_database_hook = SqliteHook("market_database_conn")
market_database_conn = market_database_hook.get_sqlalchemy_engine()
# Load the table to SQLite, append if it exists
flattened_dataframe.to_sql(
name="market_data",
con=market_database_conn,
if_exists="append",
index=False
)
# Set dependencies between tasks
raw_market_data = hit_polygon_api()
transformed_market_data = flatten_market_data(raw_market_data)
load_market_data(transformed_market_data)
Novamente, atualizamos nossas dependências para permitir que os dados retornados da tarefa flatten_market_data
sejam passados para a tarefa load_market_data
. A visualização do gráfico resultante para o nosso DAG é semelhante a esta:
Visualização de gráfico para pipeline de ETL
Testes
Agora que você construiu seu primeiro Airflow DAG, é hora de garantir que ele funcione. Há algumas maneiras de fazer isso, mas uma das mais comuns é executar o DAG de ponta a ponta.
Para fazer isso na interface do usuário do Airflow, você navegará até o DAG e alternará a chave de azul para ativo. Como catchup
foi definido como True, uma execução de DAG deve ser enfileirada e começar a ser executada. Se uma tarefa for executada com êxito, a caixa associada a ela na interface do usuário ficará verde. Se todas as tarefas no DAG forem bem-sucedidas, o DAG será marcado como bem-sucedido e a próxima execução do DAG será acionada.
Se uma tarefa falhar, o estado será ativado para nova tentativa e marcado em amarelo. Quando isso acontece, é melhor você dar uma olhada nos logs dessa tarefa. Isso pode ser feito clicando na caixa amarela na exibição de grade e selecionando Logs. Aqui, você encontrará a mensagem de exceção e poderá começar a fazer a triagem. Se uma tarefa falhar mais do que o número de tentativas especificadas, o estado dessa tarefa e do DAG será definido como failed.
Além de testar um DAG de ponta a ponta, o Airflow facilita a criação de testes unitários. Quando você cria inicialmente o ambiente usando astro dev start
, um diretório tests/
é criado para você. Aqui, você pode adicionar testes de unidade para o seu DAG e para os componentes do seu DAG.
Abaixo está um teste de unidade para a configuração do nosso DAG. Esse teste valida vários dos parâmetros definidos ao definir, como start_date
, schedule
e catchup
. Depois que você tiver escrito o teste, navegue até a raiz do projeto e execute-o:
from airflow.models.dagbag import DagBag
from datetime import datetime
import pytz
def test_market_etl_config():
# Pull the DAG
market_etl_dag = DagBag().get_dag("market_etl")
# Assert start date, schedule, and catchup
assert market_etl_dag.start_date == datetime(2024, 3, 25, 9, tzinfo=pytz.UTC)
assert market_etl_dag.schedule_interval == "@daily"
assert market_etl_dag.catchup
astro dev pytest
Esse comando executará todos os testes de unidade em seu diretório tests/
. Se quiser executar apenas um único teste, você pode fornecer o caminho para o arquivo como um argumento para o comando acima. Além de usar a CLI do Astro para executar testes, qualquer executor de testes Python pode ser usado para escrever e executar testes de unidade.
Para projetos pessoais, escrever testes de unidade ajudará a garantir que o código funcione como você espera. Em um ambiente empresarial, os testes de unidade são quase sempre necessários. A maioria das equipes de dados utilizará algum tipo de ferramenta de CI/CD para implantar seu projeto Airflow. Normalmente, esse processo envolve a execução de testes unitários e a validação de seus resultados para garantir que o DAG que você escreveu esteja pronto para a produção. Para obter mais informações sobre testes de unidade, consulte nosso tutorial Como usar o Pytest para testes de unidade, bem como a Introdução aos testes em Python.
Dicas e técnicas avançadas de fluxo de ar
Criamos um pipeline de dados simples que funciona e até usamos técnicas de transformação e persistência. Em outros casos, o Airflow está equipado para orquestrar fluxos de trabalho complexos usando operadores personalizados e criados pelo provedor, processando terabytes de dados.
Alguns exemplos disso são o S3ToSnowflakeOperator
e o DatabricksRunNowOperator
, que permitem facilmente a integração com uma pilha de dados maior. Trabalhar com esses tipos de operadores é complicado em um ambiente de hobby. Por exemplo, para usar o S3ToSnowflakeOperator
, você precisaria ter contas e configurações do AWS e do Snowflake para o recurso entre o qual estaria transferindo dados.
Além dos fluxos de trabalho ETL, o Airflow oferece suporte a fluxos de trabalho ELT, que estão se tornando o padrão do setor para equipes que utilizam data warehouses na nuvem. Lembre-se disso ao projetar pipelines de dados.
No componente de carga do nosso pipeline, criamos uma conexão com um banco de dados SQLite, que foi recuperado posteriormente e usado para manter os dados. As conexões, às vezes chamadas de segredos, são um recurso do Airflow projetado para simplificar as interações com os sistemas de origem e destino. Ao usar essas conexões para armazenar informações confidenciais, como sua chave de API do Polygon, você aumenta a segurança do seu código. Essa abordagem também permite que você gerencie as credenciais separadamente da sua base de código. Sempre que possível, é aconselhável utilizar o Connections extensivamente para manter seu fluxo de trabalho seguro e organizado.
Você deve ter notado que o ticker de ações "AMZN" foi codificado em nossas tarefas hit_polygon_api
e flatten_market_data
. Isso nos permitiu extrair, transformar e carregar dados para esse único ticker de ações. Mas e se você quisesse usar esse mesmo código para vários tickers de ações? Felizmente, é fácil gerar DAGs dinamicamente. Com um mínimo de refatoração em nosso código, poderíamos fazer um loop em uma lista de tickers de ações e parametrizar os valores dos tickers de ações. Isso ajuda a tornar seus DAGs mais modulares e portáteis. Para obter informações sobre a geração dinâmica de DAGs, consulte a documentação Dynamically Generate DAGs in Airflow do Astronomer.
Conclusão
Parabéns! Você criou um DAG do Airflow para extrair, transformar e carregar dados do mercado de ações da API Polygon usando Python, pandas e SQLite. Ao longo do caminho, você aprimorou suas habilidades na criação de diagramas de arquitetura e especificações técnicas, criando conexões Airflow e testando seus DAGs. À medida que você continuar sua jornada no Airflow, experimente técnicas mais avançadas para ajudar a tornar seus pipelines robustos, resilientes e reutilizáveis.
Boa sorte e boa codificação!
Aprenda engenharia de dados com a Datacamp
curso
Introduction to Airflow in Python
curso
Understanding Data Engineering
blog
Uma lista das 19 melhores ferramentas de ETL e por que escolhê-las
DataCamp Team
12 min
blog
O que é o Alteryx? Um guia introdutório
tutorial
Tutorial de Pipes em R para iniciantes
tutorial
Criando um transformador com o PyTorch
tutorial