programa
Construir un ETL Pipeline con Airflow
Bienvenido al mundo de las canalizaciones ETL con Apache Airflow. En este tutorial, nos centraremos en extraer datos bursátiles mediante la API Polygon, transformar estos datos y, a continuación, cargarlos en una base de datos SQLite para facilitar su acceso y manipulación. ¡Empecemos!
¿Qué es Apache Airflow y Airflow ETL?
Apache Airflow se considera un estándar del sector para la orquestación de datos y la gestión de canalizaciones. Se ha hecho popular entre los científicos de datos, los ingenieros de aprendizaje automático y los profesionales de la IA por su capacidad para orquestar flujos de trabajo complejos, gestionar dependencias entre tareas, reintentar tareas fallidas y proporcionar un registro exhaustivo.
Airflow ETL se refiere al uso de Apache Airflow para gestionar procesos ETL. Para repasar, ETL es un tipo de integración de datos que consiste en extraer datos de varias fuentes, transformarlos en un formato adecuado para el análisis y cargarlos en un destino final, como un almacén de datos.
Configurar nuestro entorno de desarrollo Airflow
Primero debemos configurar nuestro entorno de desarrollo antes de poder empezar a construir una canalización ETL con Airflow. Para obtener información detallada sobre cómo configurar nuestro entorno de desarrollo, consulta nuestro tutorial Introducción a Apache Airflow.
También tendremos que instalar la CLI de Astro. Consulta las instrucciones en Astronomer, que mantiene la CLI de Astro y proporciona documentación detallada.
Crear un proyecto de flujo de aire
Tras configurar nuestro entorno e instalar la CLI de Astro, creamos un proyecto Airflow. Para ello, abrimos un intérprete de comandos de terminal y creamos un nuevo directorio con la ruta elegida.
~/Documents/data-engineering/ETL-pipeline/
Desde la raíz de este directorio, ejecutamos el siguiente comando para crear los recursos necesarios:
astro dev init
El contenido del directorio será algo parecido a lo siguiente. El resultado exacto puede variar.
├── dags/
├── include/
├── plugins/
├── tests/
├── airflow_settings.yaml
├── Dockerfile
├── packages.txt
└── requirements.txt
Para poner en marcha tu proyecto, ejecuta el siguiente comando:
astro dev start
Tu entorno Airflow tardará aproximadamente un minuto en girar. Una vez que lo haga, ve a localhost: 8080
en tu navegador web, y te aparecerá la interfaz de usuario de Airflow.
¡Ya estás listo para empezar a desarrollar tu propio ETL pipeline con Airflow!
Diseñar un pipeline ETL
Es importante tomarse tiempo y planificar cada componente de tu pipeline antes de escribir ni una sola línea de código. En particular, es un buen hábito identificar primero tu fuente de datos y el destino al que se cargarán tus datos. Al dedicar algún tiempo a identificar las fuentes y el destino de los datos, también aprendemos cómo se transformarán los datos por el camino.
En nuestro ejemplo, diseñaremos una canalización de datos para extraer datos bursátiles de la API Polygon, antes de transformarlos y cargarlos en una base de datos SQLite. En este caso, el sistema de origen es la API Polygon, y el destino es una base de datos SQLite. Ilustremos esto con una imagen:
Diagrama de origen y destino
Sabemos por nuestra experiencia como ingenieros de datos que, para preparar los datos que se van a cargar en una base de datos SQLite, es necesario transformarlos de JSON a un formato tabular. Creamos un plan para transformar nuestros datos una vez extraídos de la API Polygon, utilizando Python nativo y la biblioteca pandas
. Actualicemos nuestro visual para mostrar este cambio:
Diagrama de arquitectura
Al añadir esta información adicional, hemos creado un diagrama de arquitectura, que es una amplia representación visual de nuestro sistema. Vemos los tres pasos lógicos de este pipeline, que corresponden a la E, la T y la L de nuestro proceso.
También podemos traducir cada una de estas tareas en un grafo acíclico dirigido, de DAG, que es una configuración específica que define todo el conjunto de tareas que ejecutará Airflow, su secuencia y sus dependencias entre sí. Aprende más sobre los grafos acíclicos dirigidos siguiendo nuestro Curso de Introducción a la Ingeniería de Datos, que repasa en detalle los DAG de Airflow.
En un entorno de datos empresarial, muchos equipos utilizan otro tipo de documentos llamados especificaciones técnicas, o tech specs, para acordar y documentar las opciones de diseño. En nuestro caso, basta con crear una tabla para documentar las opciones de diseño de nuestro conducto de datos.
Tipo de operador |
ID de tarea |
Notas |
---|---|---|
Extraer |
hit_polygon_api |
Utiliza la API TaskFlow y construye una función Python para autenticar y golpear la API Polygon, antes de devolver la respuesta |
Transforma |
flatten_market_data |
Aplana los datos devueltos por la tarea hit_polygon_api, prepáralos para cargarlos en SQLite |
Carga |
load_market_data |
Carga los datos aplanados en SQLite |
Opciones de canalización de datos
Ten en cuenta que nuestro DAG se centra en los detalles de las tareas y puede que no abarque toda la información. Podemos utilizar una segunda tabla para documentar detalles adicionales que tengamos que limar. Nuestras preguntas incluyen:
- ¿Con qué frecuencia se ejecutará este DAG?
- ¿Qué ocurre si falla una tarea de la cadena de producción?
- ¿Y si queremos recoger datos sobre poblaciones adicionales?
Parámetro |
Valor |
---|---|
ID DAG |
market_etl |
Fecha de inicio |
1 de enero de 2024 (9:00 AM UTC) |
Intervalo |
Diario |
¿Ponte al día? |
Verdadero (cargar todos los datos desde el 1 de enero de 2024) |
Concurrencia |
1 DAG funcionando a la vez |
Reintentos de tarea, retardo de reintentos |
3 reintentos, 5 minutos de retraso en cada reintento |
¿Múltiples tickers? |
DAG de generación dinámica |
Cuestiones por resolver
Hagamos balance de todo lo que hemos conseguido: Hemos creado un diagrama de arquitectura, documentado cómo se puede desglosar el pipeline en tareas Airflow, e identificado la información de alto nivel necesaria para configurar el DAG.
Si quieres saber más sobre cómo diseñar, desarrollar y probar canalizaciones de datos, consulta la Introducción a las canalizaciones de datos de DataCamp. Aquí dominarás los fundamentos de la construcción de canalizaciones ETL con Python, así como las mejores prácticas para garantizar que tu solución sea sólida, resistente y reutilizable.
Construir un ETL Pipeline con Airflow
Organizaremos cómo construimos nuestro canal ETL siguiendo los pasos en orden. Adoptar un enfoque estructurado garantiza que cada fase se ejecute con precisión.
Extraer datos con Airflow
Antes de extraer datos de la API de Polygon, tendremos que crear un token de API visitando Polygon y seleccionando el botón Crear clave de API. Ten en cuenta que para este tutorial no es necesario suscribirse a una suscripción de pago: el nivel gratuito generará una clave API y proporcionará toda la funcionalidad que necesitamos. Sólo recuerda copiar y guardar tu clave API.
Después de haber creado una clave API, ya estamos listos para empezar a extraer datos de la API Polygon con Airflow. Utilizamos la tabla de especificaciones técnicas que hemos creado, que incluye los detalles de la configuración de nuestro DAG, para ayudarnos a codificar.
from airflow import DAG
from datetime import datetime, timedelta
with DAG(
dag_id="market_etl",
start_date=datetime(2024, 1, 1, 9),
schedule="@daily",
catchup=True,
max_active_runs=1,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
) as dag:
Vamos a añadir nuestra primera tarea. Utilizaremos la API TaskFlow y el módulo requests
para extraer datos de apertura-cierre de existencias de la API Polygon.
import requests
...
@task()
def hit_polygon_api(**context):
# Instantiate a list of tickers that will be pulled and looped over
stock_ticker = "AMZN"
# Set variables
polygon_api_key= "<your-api-key>"
ds = context.get("ds")
# Create the URL
url= f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
response = requests.get(url)
# Return the raw data
return response.json()
Hay algunas cosas que notarás en este código.
- La función
hit_polygon_api
está decorada con@task
. Este decorador convierte la función en una tarea Airflow que puede ejecutarse como parte de un DAG. - El parámetro
context
se define en la firmahit_polygon_api
. Posteriormente se utiliza para extraer el valor almacenado en la claveds
. context
es un diccionario que contiene metadatos sobre la tarea y el DAG.- Al extraer
ds
del diccionariocontext
, obtenemos la fecha dedata_interaval_end
en el formatoYYYY-mm-dd
. - Para asegurarnos de que nuestra nueva tarea se ejecuta cuando se ejecuta el DAG, tendremos que añadir una llamada a la página
hit_polygon_api
.
Juntando todo esto, el código para crear la primera parte de nuestro canal ETL tiene este aspecto.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime, timedelta import requests
with DAG(
dag_id="market_etl",
start_date=datetime(2024, 1, 1, 9),
schedule="@daily",
catchup=False,
max_active_runs=1,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
) as dag:
# Create a task using the TaskFlow API
@task()
def hit_polygon_api(**context):
# Instantiate a list of tickers that will be pulled and looped over
stock_ticker = "AMZN"
# Set variables
polygon_api_key = "<your-api-key>"
ds = context.get("ds")
# Create the URL
url = f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
response = requests.get(url)
# Return the raw data
return response.json()
hit_polygon_api()
Observarás que la respuesta contendrá un error cuando se ejecute para el 1 de enero de 2024. Como el mercado está cerrado ese día, Polygon devuelve una respuesta señalando la excepción. Nos ocuparemos de esto en el siguiente paso.
Transformar datos con Airflow
Una vez extraídos los datos de la API de Polígonos, estamos listos para transformarlos.
Para ello, crearemos otra tarea utilizando la API TaskFlow. Esta tarea se llamará flatten_market_data
y tiene los parámetros polygon_response
, que son los datos brutos devueltos por la función hit_polygon_api
, y **context
. Vamos a echar un vistazo más de cerca a polygon_response
dentro de un momento.
La transformación real que vamos a hacer es bastante sencilla. Vamos a aplanar el JSON devuelto por la API Polygon en una lista. El truco es que proporcionaremos valores por defecto únicos para cada clave si no existe en la respuesta.
Por ejemplo, si la clave from no existe en la respuesta, proporcionaremos un valor por defecto utilizando el contexto de Airflow. Esto soluciona el problema que vimos antes, cuando una respuesta contenía un conjunto limitado de claves (debido a que el mercado estaba cerrado). A continuación, convertiremos la lista en un DataFrame pandas
, y lo devolveremos. La tarea resultante tiene este aspecto:
@task
def flatten_market_data(polygon_response, **context):
# Create a list of headers and a list to store the normalized data in
columns = {
"status": "closed",
"from": context.get("ds"),
"symbol": "AMZN",
"open": None,
"high": None,
"low": None,
"close": None,
"volume": None
}
# Create a list to append the data to
flattened_record = []
for header_name, default_value in columns.items():
# Append the data
flattened_record.append(polygon_response.get(header_name, default_value))
# Convert to a pandas DataFrame
flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
return flattened_dataframe
Tendremos que añadir una dependencia entre las tareas hit_polygon_api
y flatten_market_data
. Para ello, actualizaremos el código de nuestro DAG para que coincida con lo que hay a continuación:
import pandas as pd
with DAG(
dag_id="market_etl",
start_date=datetime(2024, 1, 1, 9),
schedule="@daily",
catchup=True,
max_active_runs=1,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
) as dag:
@task()
def hit_polygon_api(**context):
...
@task
def flatten_market_data(polygon_response, **context):
# Create a list of headers and a list to store the normalized data in
columns = {
"status": None,
"from": context.get("ds"),
"symbol": "AMZN",
"open": None,
"high": None,
"low": None,
"close": None,
"volume": None
}
# Create a list to append the data to
flattened_record = []
for header_name, default_value in columns.items():
# Append the data
flattened_record.append(polygon_response.get(header_name, default_value))
# Convert to a pandas DataFrame
flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
return flattened_dataframe
# Set dependencies
raw_market_data = hit_polygon_api()
flatten_market_data(raw_market_data)
Aquí, el valor de retorno de la tarea hit_polygon_api
se almacena en raw_market_data
. A continuación, raw_market_data
se pasa como argumento a la tarea flatten_market_data
mediante el parámetro polygon_response
. Este código no sólo establece una dependencia entre las tareas hit_polygon_api
y flatten_market_data
, sino que también permite compartir datos entre estas dos tareas.
Aunque la transformación que realizamos fue relativamente sencilla, Airflow ofrece la capacidad de ejecutar una amplia gama de manipulaciones de datos más complicadas. Además de utilizar tareas nativas, es fácil aprovechar la amplia colección de ganchos y operadores creados por proveedores de Airflow para orquestar la transformación utilizando herramientas como AWS Lambda y DBT.
Cargar datos con Airflow
Hemos llegado al último paso de nuestro ETL pipeline. Hemos planeado hacerlo utilizando una base de datos SQLite y una tarea final definida con la API TaskFlow.
Como antes, definiremos un único parámetro al crear nuestra tarea. A éste le llamaremos flattened_dataframe
. Esto permite pasar los datos devueltos por la tarea flatten_market_data
a nuestra nueva tarea.
Antes de escribir el código para cargar nuestros datos en una base de datos SQLite, tendremos que crear una conexión en la interfaz de usuario de Airflow. Para abrir la página de conexiones, sigue estos pasos:
- Abre la interfaz de usuario de Airflow
- Pasa el ratón por encima de la opción Admin
- Selecciona Conexiones.
- Pulsa el icono + para crear una nueva conexión.
Accederás a una pantalla parecida a ésta:
Página de conexiones de Airflow
Para rellenar la página de conexiones, sigue estos pasos:
- Cambia el Tipo de Conexión a Sqlite.
- Proporciona el valor "base_de_mercado_conn" para el Id. de conexión.
- Añade "/usr/local/airflow/market_data.db" al campo Host.
La configuración de esta conexión debe parecerse a la imagen siguiente. Cuando lo haga, haz clic en Guardar.
conexión Airflow a una base de datos SQLite Conexión Airflow a base de datos SQLite no guardada
Ahora que hemos creado una conexión, podemos recuperar esta información en nuestra tarea utilizando la dirección SqliteHook
. Echa un vistazo al código que aparece a continuación.
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
@task
def load_market_data(flattened_dataframe):
# Pull the connection
market_database_hook = SqliteHook("market_database_conn")
market_database_conn = market_database_hook.get_sqlalchemy_engine()
# Load the table to Postgres, replace if it exists
flattened_dataframe.to_sql(
name="market_data",
con=market_database_conn,
if_exists="append",
index=False
)
# print(market_database_hook.get_records("SELECT * FROM market_data;"))
Con este código, estamos creando una conexión a la base de datos SQLite que especificamos en el último paso. A continuación, el motor de conexión se recupera del gancho utilizando el método .get_sqlalchemy_engine()
. A continuación, se pasa como argumento al parámetro con
cuando se llama al método .to_sql()
sobre el flattened_dataframe
.
Ten en cuenta que el nombre de la tabla en la que se escriben estos datos es market_data
, y si la tabla existe, se añade a ella. Cuando hago pruebas, me gusta comprobar que los datos se escriben recuperando e imprimiendo estos registros. Puedes hacerlo descomentando la última línea de esta tarea.
Juntando todo esto, nuestro código debería parecerse un poco a esto:
from airflow import DAG
from airflow.decorators import task
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from datetime import datetime, timedelta
import requests
import pandas as pd
with DAG(
dag_id="market_etl",
start_date=datetime(2024, 1, 1, 9),
schedule="@daily",
catchup=True,
max_active_runs=1,
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
) as dag:
# Create a task using the TaskFlow API
@task()
def hit_polygon_api(**context):
# Instantiate a list of tickers that will be pulled and looped over
stock_ticker = "AMZN"
# Set variables
polygon_api_key = "<your-api-key>"
ds = context.get("ds")
# Create the URL
url = f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
response = requests.get(url)
# Return the raw data
return response.json()
@task
def flatten_market_data(polygon_response, **context):
# Create a list of headers and a list to store the normalized data in
columns = {
"status": None,
"from": context.get("ds"),
"symbol": "AMZN",
"open": None,
"high": None,
"low": None,
"close": None,
"volume": None
}
# Create a list to append the data to
flattened_record = []
for header_name, default_value in columns.items():
# Append the data
flattened_record.append(polygon_response.get(header_name, default_value))
# Convert to a pandas DataFrame
flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
return flattened_dataframe
@task
def load_market_data(flattened_dataframe):
# Pull the connection
market_database_hook = SqliteHook("market_database_conn")
market_database_conn = market_database_hook.get_sqlalchemy_engine()
# Load the table to SQLite, append if it exists
flattened_dataframe.to_sql(
name="market_data",
con=market_database_conn,
if_exists="append",
index=False
)
# Set dependencies between tasks
raw_market_data = hit_polygon_api()
transformed_market_data = flatten_market_data(raw_market_data)
load_market_data(transformed_market_data)
De nuevo, actualizamos nuestras dependencias para permitir que los datos devueltos por la tarea flatten_market_data
se pasen a la tarea load_market_data
. La vista del gráfico resultante para nuestro DAG tiene este aspecto:
Vista gráfica de un canal ETLVista gráfica de un canal ETL
Prueba
Ahora que has construido tu primer Airflow DAG, es el momento de asegurarte de que funciona. Hay varias formas de hacerlo, pero una de las más comunes es ejecutar el DAG de extremo a extremo.
Para ello, en la interfaz de usuario de Airflow, ve a tu DAG y cambia el interruptor de azul a activo. Dado que catchup
se ha establecido en Verdadero, la ejecución de un DAG debería ponerse en cola y comenzar a ejecutarse. Si una tarea se ejecuta correctamente, la casilla asociada a ella en la interfaz de usuario se volverá verde. Si todas las tareas del DAG tienen éxito, el DAG se marcará como exitoso y se activará la siguiente ejecución del DAG.
Si una tarea falla, el estado será de reintento y estará marcado en amarillo. Cuando esto ocurra, lo mejor es echar un vistazo a los registros de esa tarea. Para ello, haz clic en el recuadro amarillo de la vista en cuadrícula y selecciona Registros. Aquí encontrarás el mensaje de excepción y podrás empezar a triar. Si una tarea falla más veces que el número de reintentos especificado, el estado de esa tarea y del DAG se establecerá como fallido.
Además de probar un DAG de extremo a extremo, Airflow facilita la escritura de pruebas unitarias. Cuando creas inicialmente tu entorno utilizando astro dev start
, se crea para ti un directorio tests/
. Aquí puedes añadir pruebas unitarias tanto para tu DAG como para los componentes de tu DAG.
A continuación se muestra una prueba unitaria de la configuración de nuestro DAG. Esta prueba valida varios de los parámetros establecidos al definir, como start_date
, schedule
, y catchup
. Una vez que hayas escrito tu prueba, navega hasta la raíz de tu proyecto y ejecútala:
from airflow.models.dagbag import DagBag
from datetime import datetime
import pytz
def test_market_etl_config():
# Pull the DAG
market_etl_dag = DagBag().get_dag("market_etl")
# Assert start date, schedule, and catchup
assert market_etl_dag.start_date == datetime(2024, 3, 25, 9, tzinfo=pytz.UTC)
assert market_etl_dag.schedule_interval == "@daily"
assert market_etl_dag.catchup
astro dev pytest
Este comando ejecutará todas las pruebas unitarias de tu directorio tests/
. Si sólo quieres ejecutar una única prueba, puedes proporcionar la ruta al archivo como argumento del comando anterior. Además de utilizar la CLI de Astro para ejecutar pruebas, se puede utilizar cualquier ejecutor de pruebas de Python para escribir y ejecutar pruebas unitarias.
Para proyectos personales, escribir pruebas unitarias te ayudará a asegurarte de que tu código funciona como esperas. En un entorno empresarial, las pruebas unitarias son casi siempre necesarias. La mayoría de los equipos de datos utilizarán algún tipo de herramienta CI/CD para desplegar su proyecto Airflow. Este proceso suele implicar la ejecución de pruebas unitarias y la validación de sus resultados para garantizar que el DAG que has escrito está listo para la producción. Para obtener más información sobre las pruebas unitarias, consulta nuestro tutorial Cómo utilizar Pytest para pruebas unitarias, así como la Introducción a las pruebas en Python.
Consejos y técnicas avanzadas de flujo de aire
Hemos construido una sencilla canalización de datos que funciona, e incluso hemos utilizado técnicas de transformación y persistencia. En otros casos, Airflow está equipado para orquestar flujos de trabajo complejos mediante operadores personalizados y creados por el proveedor, procesando terabytes de datos.
Algunos ejemplos son S3ToSnowflakeOperator
, y DatabricksRunNowOperator
, que permiten fácilmente la integración con una pila de datos mayor. Trabajar con este tipo de operadores es delicado en un entorno de afición. Por ejemplo, para utilizar S3ToSnowflakeOperator
, necesitarías tener tanto cuentas de AWS como de Snowflake y la configuración del recurso entre el que transferirías los datos.
Además de los flujos de trabajo ETL, Airflow es compatible con los flujos de trabajo ELT, que se están convirtiendo ampliamente en la norma del sector para los equipos que aprovechan los almacenes de datos en la nube. Asegúrate de tener esto en cuenta cuando diseñes canalizaciones de datos.
En el componente de carga de nuestro pipeline, creamos una conexión a una base de datos SQLite, que posteriormente se recuperó y utilizó para persistir los datos. Las Conexiones, a veces llamadas Secretos, son una función de Airflow diseñada para simplificar las interacciones con los sistemas de origen y destino. Al utilizar estas conexiones para almacenar información sensible, como tu clave API Polygon, mejoras la seguridad de tu código. Este enfoque también te permite gestionar las credenciales por separado de tu código base. Siempre que sea posible, es aconsejable utilizar ampliamente Conexiones para mantener tu flujo de trabajo seguro y organizado.
Te habrás dado cuenta de que el teletipo de la acción "AMZN" estaba codificado en nuestras tareas hit_polygon_api
y flatten_market_data
. Esto nos permitió extraer, transformar y cargar los datos de este único teletipo de bolsa. Pero, ¿y si quisieras utilizar este mismo código para varios teletipos de bolsa? Por suerte, es fácil generar DAGs dinámicamente. Con una refactorización mínima de nuestro código, podríamos hacer un bucle sobre una lista de teletipos de bolsa y parametrizar los valores de los teletipos de bolsa. Esto ayuda a que tus DAG sean más modulares y portátiles. Para obtener información sobre la generación dinámica de DAGs, consulta la documentación Generar DAGs dinámicamente en Airflow de Astronomer.
Conclusión
¡Enhorabuena! Has construido un DAG Airflow para extraer, transformar y cargar datos bursátiles de la API Polygon utilizando Python, pandas y SQLite. Por el camino, perfeccionaste tus habilidades para construir diagramas de arquitectura y especificaciones técnicas, crear conexiones Airflow y probar tus DAG. A medida que continúes tu viaje por Airflow, experimenta con técnicas más avanzadas que te ayuden a hacer que tus canalizaciones sean robustas, resistentes y reutilizables.
Mucha suerte y ¡feliz codificación!
Aprende Ingeniería de Datos con Datacamp
curso
Introduction to Airflow in Python
curso
Understanding Data Engineering
blog
Lista de las 19 mejores herramientas ETL y por qué elegirlas
DataCamp Team
12 min
blog
Cómo convertirse en ingeniero de datos en 2023: 5 pasos para el éxito profesional
tutorial
Construir un transformador con PyTorch
tutorial