Saltar al contenido principal

Construir un ETL Pipeline con Airflow

Domina los fundamentos de la extracción, transformación y carga de datos con Apache Airflow.
Actualizado 30 jul 2024  · 15 min de lectura

Bienvenido al mundo de las canalizaciones ETL con Apache Airflow. En este tutorial, nos centraremos en extraer datos bursátiles mediante la API Polygon, transformar estos datos y, a continuación, cargarlos en una base de datos SQLite para facilitar su acceso y manipulación. ¡Empecemos!

¿Qué es Apache Airflow y Airflow ETL?

Apache Airflow se considera un estándar del sector para la orquestación de datos y la gestión de canalizaciones. Se ha hecho popular entre los científicos de datos, los ingenieros de aprendizaje automático y los profesionales de la IA por su capacidad para orquestar flujos de trabajo complejos, gestionar dependencias entre tareas, reintentar tareas fallidas y proporcionar un registro exhaustivo.

Airflow ETL se refiere al uso de Apache Airflow para gestionar procesos ETL. Para repasar, ETL es un tipo de integración de datos que consiste en extraer datos de varias fuentes, transformarlos en un formato adecuado para el análisis y cargarlos en un destino final, como un almacén de datos. 

Configurar nuestro entorno de desarrollo Airflow 

Primero debemos configurar nuestro entorno de desarrollo antes de poder empezar a construir una canalización ETL con Airflow. Para obtener información detallada sobre cómo configurar nuestro entorno de desarrollo, consulta nuestro tutorial Introducción a Apache Airflow.

También tendremos que instalar la CLI de Astro. Consulta las instrucciones en Astronomer, que mantiene la CLI de Astro y proporciona documentación detallada.

Crear un proyecto de flujo de aire

Tras configurar nuestro entorno e instalar la CLI de Astro, creamos un proyecto Airflow. Para ello, abrimos un intérprete de comandos de terminal y creamos un nuevo directorio con la ruta elegida.

 ~/Documents/data-engineering/ETL-pipeline/ 

Desde la raíz de este directorio, ejecutamos el siguiente comando para crear los recursos necesarios:

astro dev init

El contenido del directorio será algo parecido a lo siguiente. El resultado exacto puede variar.

├── dags/
├── include/
├── plugins/
├── tests/
├── airflow_settings.yaml
├── Dockerfile
├── packages.txt
└── requirements.txt

Para poner en marcha tu proyecto, ejecuta el siguiente comando: 

astro dev start

Tu entorno Airflow tardará aproximadamente un minuto en girar. Una vez que lo haga, ve a localhost: 8080 en tu navegador web, y te aparecerá la interfaz de usuario de Airflow.

¡Ya estás listo para empezar a desarrollar tu propio ETL pipeline con Airflow!

Diseñar un pipeline ETL

Es importante tomarse tiempo y planificar cada componente de tu pipeline antes de escribir ni una sola línea de código. En particular, es un buen hábito identificar primero tu fuente de datos y el destino al que se cargarán tus datos. Al dedicar algún tiempo a identificar las fuentes y el destino de los datos, también aprendemos cómo se transformarán los datos por el camino.

En nuestro ejemplo, diseñaremos una canalización de datos para extraer datos bursátiles de la API Polygon, antes de transformarlos y cargarlos en una base de datos SQLite. En este caso, el sistema de origen es la API Polygon, y el destino es una base de datos SQLite. Ilustremos esto con una imagen: 

Un diagrama que incluye un origen (API Polygon) y un destino (base de datos SQLite).Diagrama de origen y destino

Sabemos por nuestra experiencia como ingenieros de datos que, para preparar los datos que se van a cargar en una base de datos SQLite, es necesario transformarlos de JSON a un formato tabular. Creamos un plan para transformar nuestros datos una vez extraídos de la API Polygon, utilizando Python nativo y la biblioteca pandas. Actualicemos nuestro visual para mostrar este cambio:

Un diagrama de arquitectura que incluye la API Polygon, la lógica de transformación y una base de datos SQLite.Diagrama de arquitectura


Al añadir esta información adicional, hemos creado un diagrama de arquitectura, que es una amplia representación visual de nuestro sistema. Vemos los tres pasos lógicos de este pipeline, que corresponden a la E, la T y la L de nuestro proceso.

También podemos traducir cada una de estas tareas en un grafo acíclico dirigido, de DAG, que es una configuración específica que define todo el conjunto de tareas que ejecutará Airflow, su secuencia y sus dependencias entre sí. Aprende más sobre los grafos acíclicos dirigidos siguiendo nuestro Curso de Introducción a la Ingeniería de Datos, que repasa en detalle los DAG de Airflow.

En un entorno de datos empresarial, muchos equipos utilizan otro tipo de documentos llamados especificaciones técnicas, o tech specs, para acordar y documentar las opciones de diseño. En nuestro caso, basta con crear una tabla para documentar las opciones de diseño de nuestro conducto de datos. 


Tipo de operador

ID de tarea

Notas

Extraer

hit_polygon_api

Utiliza la API TaskFlow y construye una función Python para autenticar y golpear la API Polygon, antes de devolver la respuesta

Transforma

flatten_market_data

Aplana los datos devueltos por la tarea hit_polygon_api, prepáralos para cargarlos en SQLite

Carga

load_market_data

Carga los datos aplanados en SQLite

Opciones de canalización de datos


Ten en cuenta que nuestro DAG se centra en los detalles de las tareas y puede que no abarque toda la información. Podemos utilizar una segunda tabla para documentar detalles adicionales que tengamos que limar. Nuestras preguntas incluyen:

  • ¿Con qué frecuencia se ejecutará este DAG? 
  • ¿Qué ocurre si falla una tarea de la cadena de producción? 
  • ¿Y si queremos recoger datos sobre poblaciones adicionales?

Parámetro

Valor

ID DAG

market_etl

Fecha de inicio

1 de enero de 2024 (9:00 AM UTC)

Intervalo

Diario

¿Ponte al día?

Verdadero (cargar todos los datos desde el 1 de enero de 2024)

Concurrencia

1 DAG funcionando a la vez

Reintentos de tarea, retardo de reintentos

3 reintentos, 5 minutos de retraso en cada reintento

¿Múltiples tickers?

DAG de generación dinámica

Cuestiones por resolver

Hagamos balance de todo lo que hemos conseguido: Hemos creado un diagrama de arquitectura, documentado cómo se puede desglosar el pipeline en tareas Airflow, e identificado la información de alto nivel necesaria para configurar el DAG.

Si quieres saber más sobre cómo diseñar, desarrollar y probar canalizaciones de datos, consulta la Introducción a las canalizaciones de datos de DataCamp. Aquí dominarás los fundamentos de la construcción de canalizaciones ETL con Python, así como las mejores prácticas para garantizar que tu solución sea sólida, resistente y reutilizable.

Construir un ETL Pipeline con Airflow

Organizaremos cómo construimos nuestro canal ETL siguiendo los pasos en orden. Adoptar un enfoque estructurado garantiza que cada fase se ejecute con precisión. 

Extraer datos con Airflow

Antes de extraer datos de la API de Polygon, tendremos que crear un token de API visitando Polygon y seleccionando el botón Crear clave de API. Ten en cuenta que para este tutorial no es necesario suscribirse a una suscripción de pago: el nivel gratuito generará una clave API y proporcionará toda la funcionalidad que necesitamos. Sólo recuerda copiar y guardar tu clave API.

Después de haber creado una clave API, ya estamos listos para empezar a extraer datos de la API Polygon con Airflow. Utilizamos la tabla de especificaciones técnicas que hemos creado, que incluye los detalles de la configuración de nuestro DAG, para ayudarnos a codificar. 

from airflow import DAG 
from datetime import datetime, timedelta

with DAG(
    dag_id="market_etl",
    start_date=datetime(2024, 1, 1, 9),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5)
    }
) as dag:

Vamos a añadir nuestra primera tarea. Utilizaremos la API TaskFlow y el módulo requests para extraer datos de apertura-cierre de existencias de la API Polygon. 

import requests
...
@task()

def hit_polygon_api(**context):
    # Instantiate a list of tickers that will be pulled and looped over
    stock_ticker = "AMZN"
    # Set variables
    polygon_api_key= "<your-api-key>"
  ds = context.get("ds")

  # Create the URL
    url= f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
    response = requests.get(url)
    # Return the raw data
    return response.json()

Hay algunas cosas que notarás en este código. 

  • La función hit_polygon_api está decorada con @task. Este decorador convierte la función en una tarea Airflow que puede ejecutarse como parte de un DAG.
  • El parámetro context se define en la firma hit_polygon_api. Posteriormente se utiliza para extraer el valor almacenado en la clave ds.
  • context es un diccionario que contiene metadatos sobre la tarea y el DAG.
  • Al extraer ds del diccionario context, obtenemos la fecha de data_interaval_end en el formato YYYY-mm-dd.  
  • Para asegurarnos de que nuestra nueva tarea se ejecuta cuando se ejecuta el DAG, tendremos que añadir una llamada a la página hit_polygon_api.

Juntando todo esto, el código para crear la primera parte de nuestro canal ETL tiene este aspecto.

from airflow import DAG
from airflow.decorators import task
from datetime import datetime, timedelta import requests

with DAG(
    dag_id="market_etl",
    start_date=datetime(2024, 1, 1, 9),
    schedule="@daily",
    catchup=False,
    max_active_runs=1,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5)
    }
) as dag:
    # Create a task using the TaskFlow API
    @task()
    def hit_polygon_api(**context):
        # Instantiate a list of tickers that will be pulled and looped over
        stock_ticker = "AMZN"
        # Set variables
        polygon_api_key = "<your-api-key>"
        ds = context.get("ds")
        # Create the URL
        url = f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
        response = requests.get(url)
        # Return the raw data
        return response.json()
    hit_polygon_api()


Observarás que la respuesta contendrá un error cuando se ejecute para el 1 de enero de 2024. Como el mercado está cerrado ese día, Polygon devuelve una respuesta señalando la excepción. Nos ocuparemos de esto en el siguiente paso.

Transformar datos con Airflow

Una vez extraídos los datos de la API de Polígonos, estamos listos para transformarlos.

Para ello, crearemos otra tarea utilizando la API TaskFlow. Esta tarea se llamará flatten_market_data y tiene los parámetros polygon_response, que son los datos brutos devueltos por la función hit_polygon_api, y **context. Vamos a echar un vistazo más de cerca a polygon_response dentro de un momento.

La transformación real que vamos a hacer es bastante sencilla. Vamos a aplanar el JSON devuelto por la API Polygon en una lista. El truco es que proporcionaremos valores por defecto únicos para cada clave si no existe en la respuesta.

Por ejemplo, si la clave from no existe en la respuesta, proporcionaremos un valor por defecto utilizando el contexto de Airflow. Esto soluciona el problema que vimos antes, cuando una respuesta contenía un conjunto limitado de claves (debido a que el mercado estaba cerrado). A continuación, convertiremos la lista en un DataFrame pandas, y lo devolveremos. La tarea resultante tiene este aspecto:

@task
def flatten_market_data(polygon_response, **context):
    # Create a list of headers and a list to store the normalized data in
    columns = {
        "status": "closed",
        "from": context.get("ds"),
        "symbol": "AMZN",
        "open": None,
        "high": None,
        "low": None,
        "close": None,
        "volume": None
    }
    # Create a list to append the data to
    flattened_record = []
    for header_name, default_value in columns.items():
        # Append the data
        flattened_record.append(polygon_response.get(header_name, default_value))
    # Convert to a pandas DataFrame
    flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
    return flattened_dataframe

Tendremos que añadir una dependencia entre las tareas hit_polygon_api y flatten_market_data. Para ello, actualizaremos el código de nuestro DAG para que coincida con lo que hay a continuación:

import pandas as pd
with DAG(
    dag_id="market_etl",
    start_date=datetime(2024, 1, 1, 9),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5)
    }
) as dag:
    @task()
    def hit_polygon_api(**context):
        ...
    @task
    def flatten_market_data(polygon_response, **context):
        # Create a list of headers and a list to store the normalized data in
        columns = {
            "status": None,
            "from": context.get("ds"),
            "symbol": "AMZN",
            "open": None,
            "high": None,
            "low": None,
            "close": None,
            "volume": None
        }
        # Create a list to append the data to
        flattened_record = []
        for header_name, default_value in columns.items():
            # Append the data
            flattened_record.append(polygon_response.get(header_name, default_value))
        # Convert to a pandas DataFrame
        flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
        return flattened_dataframe
# Set dependencies
    raw_market_data = hit_polygon_api()
    flatten_market_data(raw_market_data)

Aquí, el valor de retorno de la tarea hit_polygon_api se almacena en raw_market_data. A continuación, raw_market_data se pasa como argumento a la tarea flatten_market_data mediante el parámetro polygon_response. Este código no sólo establece una dependencia entre las tareas hit_polygon_api y flatten_market_data, sino que también permite compartir datos entre estas dos tareas.

Aunque la transformación que realizamos fue relativamente sencilla, Airflow ofrece la capacidad de ejecutar una amplia gama de manipulaciones de datos más complicadas. Además de utilizar tareas nativas, es fácil aprovechar la amplia colección de ganchos y operadores creados por proveedores de Airflow para orquestar la transformación utilizando herramientas como AWS Lambda y DBT.

Cargar datos con Airflow

Hemos llegado al último paso de nuestro ETL pipeline. Hemos planeado hacerlo utilizando una base de datos SQLite y una tarea final definida con la API TaskFlow.

Como antes, definiremos un único parámetro al crear nuestra tarea. A éste le llamaremos flattened_dataframe. Esto permite pasar los datos devueltos por la tarea flatten_market_data a nuestra nueva tarea.

Antes de escribir el código para cargar nuestros datos en una base de datos SQLite, tendremos que crear una conexión en la interfaz de usuario de Airflow. Para abrir la página de conexiones, sigue estos pasos:

  • Abre la interfaz de usuario de Airflow
  • Pasa el ratón por encima de la opción Admin
  • Selecciona Conexiones
  • Pulsa el icono + para crear una nueva conexión.

Accederás a una pantalla parecida a ésta:

La página de conexiones de Airflow al iniciar una nueva conexión.Página de conexiones de Airflow

Para rellenar la página de conexiones, sigue estos pasos:

  • Cambia el Tipo de Conexión a Sqlite.
  • Proporciona el valor "base_de_mercado_conn" para el Id. de conexión.
  • Añade "/usr/local/airflow/market_data.db" al campo Host.

La configuración de esta conexión debe parecerse a la imagen siguiente. Cuando lo haga, haz clic en Guardar.

Unaconexión Airflow a una base de datos SQLite Conexión Airflow a base de datos SQLite no guardada

Ahora que hemos creado una conexión, podemos recuperar esta información en nuestra tarea utilizando la dirección SqliteHook. Echa un vistazo al código que aparece a continuación.

from airflow.providers.sqlite.hooks.sqlite import SqliteHook
@task
def load_market_data(flattened_dataframe):
    # Pull the connection
    market_database_hook = SqliteHook("market_database_conn")
    market_database_conn = market_database_hook.get_sqlalchemy_engine()
    # Load the table to Postgres, replace if it exists
    flattened_dataframe.to_sql(
        name="market_data",
        con=market_database_conn,
        if_exists="append",
        index=False
    )
    # print(market_database_hook.get_records("SELECT * FROM market_data;"))

Con este código, estamos creando una conexión a la base de datos SQLite que especificamos en el último paso. A continuación, el motor de conexión se recupera del gancho utilizando el método .get_sqlalchemy_engine(). A continuación, se pasa como argumento al parámetro con cuando se llama al método .to_sql() sobre el flattened_dataframe.

Ten en cuenta que el nombre de la tabla en la que se escriben estos datos es market_data, y si la tabla existe, se añade a ella. Cuando hago pruebas, me gusta comprobar que los datos se escriben recuperando e imprimiendo estos registros. Puedes hacerlo descomentando la última línea de esta tarea.

Juntando todo esto, nuestro código debería parecerse un poco a esto:

from airflow import DAG
from airflow.decorators import task
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from datetime import datetime, timedelta
import requests
import pandas as pd

with DAG(
    dag_id="market_etl",
    start_date=datetime(2024, 1, 1, 9),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5)
    }
) as dag:
    # Create a task using the TaskFlow API
    @task()
    def hit_polygon_api(**context):
        # Instantiate a list of tickers that will be pulled and looped over
        stock_ticker = "AMZN"
        # Set variables
        polygon_api_key = "<your-api-key>"
        ds = context.get("ds")
        # Create the URL
        url = f"<https://api.polygon.io/v1/open-close/{stock_ticker}/{ds}?adjusted=true&apiKey={polygon_api_key}>"
        response = requests.get(url)
        # Return the raw data
        return response.json()
    @task
    def flatten_market_data(polygon_response, **context):
        # Create a list of headers and a list to store the normalized data in
        columns = {
            "status": None,
            "from": context.get("ds"),
            "symbol": "AMZN",
            "open": None,
            "high": None,
            "low": None,
            "close": None,
            "volume": None
        }
        # Create a list to append the data to
        flattened_record = []
        for header_name, default_value in columns.items():
            # Append the data
            flattened_record.append(polygon_response.get(header_name, default_value))
        # Convert to a pandas DataFrame
        flattened_dataframe = pd.DataFrame([flattened_record], columns=columns.keys())
        return flattened_dataframe
    @task
    def load_market_data(flattened_dataframe):
        # Pull the connection
        market_database_hook = SqliteHook("market_database_conn")
        market_database_conn = market_database_hook.get_sqlalchemy_engine()
        # Load the table to SQLite, append if it exists
        flattened_dataframe.to_sql(
            name="market_data",
            con=market_database_conn,
            if_exists="append",
            index=False
        )
    # Set dependencies between tasks
    raw_market_data = hit_polygon_api()
    transformed_market_data = flatten_market_data(raw_market_data)
    load_market_data(transformed_market_data)

De nuevo, actualizamos nuestras dependencias para permitir que los datos devueltos por la tarea flatten_market_data se pasen a la tarea load_market_data. La vista del gráfico resultante para nuestro DAG tiene este aspecto:

.Vista gráfica de un canal ETLVista gráfica de un canal ETL

Prueba

Ahora que has construido tu primer Airflow DAG, es el momento de asegurarte de que funciona. Hay varias formas de hacerlo, pero una de las más comunes es ejecutar el DAG de extremo a extremo.

Para ello, en la interfaz de usuario de Airflow, ve a tu DAG y cambia el interruptor de azul a activo. Dado que catchup se ha establecido en Verdadero, la ejecución de un DAG debería ponerse en cola y comenzar a ejecutarse. Si una tarea se ejecuta correctamente, la casilla asociada a ella en la interfaz de usuario se volverá verde. Si todas las tareas del DAG tienen éxito, el DAG se marcará como exitoso y se activará la siguiente ejecución del DAG.

Si una tarea falla, el estado será de reintento y estará marcado en amarillo. Cuando esto ocurra, lo mejor es echar un vistazo a los registros de esa tarea. Para ello, haz clic en el recuadro amarillo de la vista en cuadrícula y selecciona Registros. Aquí encontrarás el mensaje de excepción y podrás empezar a triar. Si una tarea falla más veces que el número de reintentos especificado, el estado de esa tarea y del DAG se establecerá como fallido.

Además de probar un DAG de extremo a extremo, Airflow facilita la escritura de pruebas unitarias. Cuando creas inicialmente tu entorno utilizando astro dev start, se crea para ti un directorio tests/. Aquí puedes añadir pruebas unitarias tanto para tu DAG como para los componentes de tu DAG.

A continuación se muestra una prueba unitaria de la configuración de nuestro DAG. Esta prueba valida varios de los parámetros establecidos al definir, como start_date, schedule, y catchup. Una vez que hayas escrito tu prueba, navega hasta la raíz de tu proyecto y ejecútala:

from airflow.models.dagbag import DagBag
from datetime import datetime
import pytz

def test_market_etl_config():
    # Pull the DAG
    market_etl_dag = DagBag().get_dag("market_etl")
    # Assert start date, schedule, and catchup
    assert market_etl_dag.start_date == datetime(2024, 3, 25, 9, tzinfo=pytz.UTC)
    assert market_etl_dag.schedule_interval == "@daily"
    assert market_etl_dag.catchup
astro dev pytest

Este comando ejecutará todas las pruebas unitarias de tu directorio tests/. Si sólo quieres ejecutar una única prueba, puedes proporcionar la ruta al archivo como argumento del comando anterior. Además de utilizar la CLI de Astro para ejecutar pruebas, se puede utilizar cualquier ejecutor de pruebas de Python para escribir y ejecutar pruebas unitarias.

Para proyectos personales, escribir pruebas unitarias te ayudará a asegurarte de que tu código funciona como esperas. En un entorno empresarial, las pruebas unitarias son casi siempre necesarias. La mayoría de los equipos de datos utilizarán algún tipo de herramienta CI/CD para desplegar su proyecto Airflow. Este proceso suele implicar la ejecución de pruebas unitarias y la validación de sus resultados para garantizar que el DAG que has escrito está listo para la producción. Para obtener más información sobre las pruebas unitarias, consulta nuestro tutorial Cómo utilizar Pytest para pruebas unitarias, así como la Introducción a las pruebas en Python.

Consejos y técnicas avanzadas de flujo de aire

Hemos construido una sencilla canalización de datos que funciona, e incluso hemos utilizado técnicas de transformación y persistencia. En otros casos, Airflow está equipado para orquestar flujos de trabajo complejos mediante operadores personalizados y creados por el proveedor, procesando terabytes de datos.

Algunos ejemplos son S3ToSnowflakeOperator, y DatabricksRunNowOperator, que permiten fácilmente la integración con una pila de datos mayor. Trabajar con este tipo de operadores es delicado en un entorno de afición. Por ejemplo, para utilizar S3ToSnowflakeOperator, necesitarías tener tanto cuentas de AWS como de Snowflake y la configuración del recurso entre el que transferirías los datos.

Además de los flujos de trabajo ETL, Airflow es compatible con los flujos de trabajo ELT, que se están convirtiendo ampliamente en la norma del sector para los equipos que aprovechan los almacenes de datos en la nube. Asegúrate de tener esto en cuenta cuando diseñes canalizaciones de datos.

En el componente de carga de nuestro pipeline, creamos una conexión a una base de datos SQLite, que posteriormente se recuperó y utilizó para persistir los datos. Las Conexiones, a veces llamadas Secretos, son una función de Airflow diseñada para simplificar las interacciones con los sistemas de origen y destino. Al utilizar estas conexiones para almacenar información sensible, como tu clave API Polygon, mejoras la seguridad de tu código. Este enfoque también te permite gestionar las credenciales por separado de tu código base. Siempre que sea posible, es aconsejable utilizar ampliamente Conexiones para mantener tu flujo de trabajo seguro y organizado.

Te habrás dado cuenta de que el teletipo de la acción "AMZN" estaba codificado en nuestras tareas hit_polygon_api y flatten_market_data. Esto nos permitió extraer, transformar y cargar los datos de este único teletipo de bolsa. Pero, ¿y si quisieras utilizar este mismo código para varios teletipos de bolsa? Por suerte, es fácil generar DAGs dinámicamente. Con una refactorización mínima de nuestro código, podríamos hacer un bucle sobre una lista de teletipos de bolsa y parametrizar los valores de los teletipos de bolsa. Esto ayuda a que tus DAG sean más modulares y portátiles. Para obtener información sobre la generación dinámica de DAGs, consulta la documentación Generar DAGs dinámicamente en Airflow de Astronomer.

Conclusión

¡Enhorabuena! Has construido un DAG Airflow para extraer, transformar y cargar datos bursátiles de la API Polygon utilizando Python, pandas y SQLite. Por el camino, perfeccionaste tus habilidades para construir diagramas de arquitectura y especificaciones técnicas, crear conexiones Airflow y probar tus DAG. A medida que continúes tu viaje por Airflow, experimenta con técnicas más avanzadas que te ayuden a hacer que tus canalizaciones sean robustas, resistentes y reutilizables. 

Mucha suerte y ¡feliz codificación!

Temas

Aprende Ingeniería de Datos con Datacamp

programa

Data Engineer

57hrs hr
Gain in-demand skills to efficiently ingest, clean, manage data, and schedule and monitor pipelines, setting you apart in the data engineering field.
Ver detallesRight Arrow
Comienza El Curso
Ver másRight Arrow
Relacionado

blog

Lista de las 19 mejores herramientas ETL y por qué elegirlas

Esta entrada de blog cubre las 19 mejores herramientas ETL (Extraer, Transformar, Cargar) para organizaciones, como Talend Open Studio, Oracle Data Integrate y Hadoop.
DataCamp Team's photo

DataCamp Team

12 min

Data Engineering Vector Image

blog

Cómo convertirse en ingeniero de datos en 2023: 5 pasos para el éxito profesional

Descubre cómo convertirte en ingeniero de datos y aprende las habilidades esenciales. Desarrolla tus conocimientos y tu portafolio para prepararte para la entrevista de ingeniero de datos.
Javier Canales Luna's photo

Javier Canales Luna

18 min

tutorial

Construir un transformador con PyTorch

Aprende a construir un modelo Transformer utilizando PyTorch, una potente herramienta del aprendizaje automático moderno.
Arjun Sarkar's photo

Arjun Sarkar

26 min

tutorial

Cómo formar a un LLM con PyTorch

Domine el proceso de entrenamiento de grandes modelos lingüísticos con PyTorch, desde la configuración inicial hasta la implementación final.
Zoumana Keita 's photo

Zoumana Keita

8 min

See MoreSee More