Saltar al contenido principal

Las 5 mejores alternativas a Airflow para la orquestación de datos (ejemplos de código incluidos)

Explora cinco alternativas de orquestación de datos a Airflow con ejemplos de código para construir, ejecutar y visualizar una canalización ETL sencilla.
Actualizado 29 jul 2024  · 25 min de lectura

Elige una plantilla de meme Alternativas al flujo de aire

Imagen del autor.

Apache Airflow es una popular herramienta de orquestación de datos de código abierto diseñada para construir, programar y supervisar canalizaciones de datos. Cuenta con un panel de control que ayuda a gestionar el estado de los flujos de trabajo, lo que la convierte en una herramienta perfecta para la mayoría de las necesidades de los flujos de trabajo.

Sin embargo, Airflow carece de algunas funciones importantes que pueden ser vitales para los requisitos de orquestación de datos complejos y modernos.

En este tutorial, exploraremos cinco alternativas a Airflow que ofrecen capacidades mejoradas y abordan algunas de sus limitaciones. Además, aprenderemos a construir una canalización ETL sencilla utilizando cada herramienta, ejecutarla y visualizarla en su panel de control.

¿Por qué elegir una alternativa de flujo de aire? 

Airflow es una potente herramienta para diversos flujos de trabajo de datos, pero tiene varias limitaciones que pueden hacer que las empresas se planteen alternativas. 

He aquí algunas razones por las que podrías elegir una alternativa:

  1. Curva de aprendizaje pronunciada: Airflow puede resultar difícil de aprender, sobre todo para quienes no conocen las herramientas de gestión de flujos de trabajo.
  2. Mantenimiento: Requiere un mantenimiento importante, sobre todo en despliegues a gran escala.
  3. Documentación insuficiente: Los usuarios han informado de múltiples problemas de documentación que dificultan la resolución de problemas o el aprendizaje de nuevas funciones. 
  4. Requiere muchos recursos: El flujo de aire puede consumir muchos recursos, ya que requiere una cantidad considerable de ordenadores y memoria para funcionar con eficacia.
  5. Flexibilidad limitada para usuarios que no utilicen Python: La filosofía del flujo de trabajo como código se basa en gran medida en Python, que puede excluir a expertos en la materia que no dominen la programación.
  6. Escalabilidad: Algunos usuarios informan de dificultades para escalar Airflow para grandes flujos de trabajo.
  7. Procesamiento limitado en tiempo real: Airflow está diseñado principalmente para el procesamiento por lotes, no para flujos de datos en tiempo real.

Antes de sumergirnos en la parte de codificación de otras herramientas de orquestación de datos, es importante que aprendas a escribir la canalización de datos utilizando Apache Airflow siguiendo el tutorial Introducción a Apache Airflow para que puedas comparar las alternativas.

Si eres completamente nuevo en Airflow, considera la posibilidad de realizar el breve Introducción a Airflow en Python para aprender los fundamentos de la construcción y programación de canalizaciones de datos.

5 Mejores Alternativas a Airflow para la Orquestación de Datos

Ahora vamos a describir las 5 mejores alternativas a Airflow y a mostrar cómo utilizarlas con ejemplos prácticos de código.

1. Prefecto

Prefect es una herramienta de orquestación de flujos de trabajo en Python de código abierto creada para los ingenieros modernos de datos y aprendizaje automático. Ofrece una API sencilla que te permite construir una canalización de datos rápidamente y gestionarla a través de un panel interactivo. 

Perfect ofrece un modelo de ejecución híbrido, lo que significa que puedes desplegar el flujo de trabajo en la nube y ejecutarlo allí o utilizar el repositorio local.

Comparado con Airflow, Prefect viene con funciones avanzadas como dependencias automatizadas de tareas, activadores basados en eventos, notificaciones integradas, infraestructura específica de flujos de trabajo e intercambio de datos entre tareas. Estas capacidades la convierten en una potente solución para gestionar flujos de trabajo complejos de forma eficiente y eficaz.

Prefect es sencillo y tiene funciones potentes. Básicamente tardé 5 minutos en ejecutar el código de ejemplo. Me gusta especialmente cómo está diseñada la interfaz de usuario del panel de control, cómo puedes configurar notificaciones, volver a ejecutar canalizaciones, gestionar y supervisar todo a través del panel de control.

Abid Ali AwanAuthor

Lee el Airflow vs Prefect: Decidir cuál es la adecuada para tu flujo de trabajo de datos blog para conocer la comparación detallada entre estas dos herramientas de orquestación de datos .

Primeros pasos con Prefect

Comenzaremos nuestro proyecto Prefect instalando el paquete Python. Ejecuta el siguiente comando en un terminal.

$ pip install -U prefect

Después, crearemos un script de Python llamado prefect_etl.py y escribiremos el siguiente código.

from prefect import task, flow
import pandas as pd

# Extract data
@task
def extract_data():
    # Simulating data extraction
    data = {
        "name": ["Alice", "Bob", "Charlie"],
        "age": [25, 30, 35],
        "city": ["New York", "Los Angeles", "Chicago"]
    }
    df = pd.DataFrame(data)
    return df

# Transform data
@task
def transform_data(df: pd.DataFrame):
    # Example transformation: adding a new column
    df["age_plus_ten"] = df["age"] + 10
    return df

# Load data
@task
def load_data(df: pd.DataFrame):
    # Simulating data load
    print("Loading data to target destination:")
    print(df)

# Defining the flow
@flow(log_prints=True)
def etl():
    raw_data = extract_data()
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)

# Running the flow
if __name__ == "__main__":
    etl()

El código anterior define las funciones de tarea extract_data(),transform_data(), y load_data()y las ejecuta en serie en una función de flujo llamada etl(). Estas funciones se crean utilizando decoradores Prefect Python. 

En resumen, estamos creando un DataFrame de pandas, transformándolo y mostrando el resultado final mediante print. Esta es una forma sencilla de simular un canal ETL.

Para ejecutar el flujo de trabajo, basta con ejecutar el script de Python mediante el siguiente comando.

$ python prefect_etl.py 

Como podemos ver, la ejecución de nuestro flujo de trabajo se ha completado con éxito.

Registros de ejecución de flujo prefijado

Registros de ejecución de flujo prefecto.

Desplegar el flujo

Ahora desplegaremos nuestro flujo de trabajo para que podamos ejecutarlo según una programación o activarlo en función de un evento. Desplegar el flujo también nos permite supervisar y gestionar varios flujos de trabajo de forma centralizada.

Para desplegar el flujo, utilizaremos la CLI de Prefect. La función deploy requiere el nombre del archivo Python, el nombre de la función de flujo en el archivo y el nombre de la implantación. En este caso, llamaremos a esta implantación "simple_etl".

$ prefect deploy prefect_etl.py:etl -n 'simple_etl'

Tras ejecutar el script anterior en el terminal, puede que recibamos el mensaje de que no tenemos un grupo de trabajadores para ejecutar el despliegue. Para crear el pool de trabajadores, utiliza el siguiente comando.

$ prefect worker start --pool 'datacamp'

Ahora que tenemos un grupo de trabajadores, lanzaremos otra ventana de terminal y ejecutaremos el despliegue. El comando prefect deployment run requiere "<nombre-función-flujo>/<nombre-despliegue>" como argumento, como se muestra en el comando siguiente.

$ prefect deployment run 'etl/simple_etl

Como resultado de la ejecución del despliegue, recibirás el mensaje de que el flujo de trabajo se está ejecutando. Normalmente, al flujo que se crea se le asigna un nombre aleatorio, en mi caso witty-lorikeet.

Creating flow run for deployment 'etl/simple_etl'...
Created flow run 'witty-lorikeet'.
└── UUID: 4e0495b0-9c7e-4ed8-b9ab-5160994dc7f0
└── Parameters: {}
└── Job Variables: {}
└── Scheduled start time: 2024-06-22 14:05:01 PKT (now)
└── URL: <no dashboard available>

Para ver el registro completo, vuelve a la ventana del terminal donde iniciaste el pool de trabajadores.

Resumen de la ejecución del flujo prefecto

Resumen de la ejecución del flujo prefecto.

Debes iniciar el servidor web de Prefect para visualizar la ejecución del flujo de forma más sencilla y gestionar otros flujos de trabajo.

$ prefect server start 

Tras ejecutar el comando anterior, se te redirigirá al panel de Prefect. También puedes ir directamente a http://127.0.0.1:4200 en tu navegador.

Prefecta interfaz de usuario del servidor web

Prefecta interfaz de usuario del servidor web

El panel de control te permite volver a ejecutar el flujo de trabajo, ver los registros, comprobar los pools de trabajo, establecer notificaciones y seleccionar otras opciones avanzadas. Es una solución completa para tus necesidades modernas de orquestación de datos.

Para aprender a construir y ejecutar pipelines de aprendizaje automático utilizando Prefect, puedes seguir el tutorial Uso de Prefect para flujos de trabajo de aprendizaje automático de aprendizaje automático.

2. Dagster

Dasgter es un marco de trabajo de código abierto diseñado para que los ingenieros de datos definan, programen y supervisen canalizaciones de datos. Es muy escalable y facilita la colaboración entre varios equipos de datos. 

Dagster permite a los usuarios definir sus activos de datos como funciones Python utilizando decoradores. Una vez definidos estos activos, los usuarios pueden ejecutarlos sin problemas mediante la programación o activadores basados en eventos.

Comparado con Airflow, Dagster nos permite desarrollar, probar y revisar la canalización localmente, proporciona un enfoque de la orquestación basado en activos, y es nativo de la nube y los contenedores.

En lugar de pensar en el flujo de trabajo en términos de pasos y flujos, tuve que cambiar mi forma de pensar y construir una canalización utilizando activos de datos. Aparte de eso, construir y ejecutar una simple canalización ETL fue bastante sencillo. Además, el servidor web es relativamente mínimo, pero proporciona toda la información para supervisar activos, ejecuciones y despliegues.

Abid Ali AwanAuthor

Empezar con Dagster

Crearemos una sencilla canalización ETL, la ejecutaremos y la visualizaremos utilizando el servidor web Dagster. De forma similar al panel de Prefect, el servidor web de Dagster proporciona formas centralizadas de supervisar múltiples flujos de trabajo y programar ejecuciones y activos.

Empezaremos instalando el paquete Python.

$ pip install dagster -q

A continuación, crearemos tres funciones Python para extraer, transformar y cargar los datos. Estas funciones se denominan create_dirty_data(),clean_data(), y load_cleaned_data()en el código. Utilizando el decorador @asset, declararemos las funciones como activos de datos en Dagster.

A continuación, crearemos el trabajo de activos (job variable) utilizando todos los activos (all_assets variable) y, a continuación, crearemos la definición del activo (defs variable).

Puedes saltarte la parte de definición de activos, pero es importante si quieres programar tu ejecución, ejecutar varios trabajos y configurar sensores.

import pandas as pd
import numpy as np
from dagster import asset, Definitions, define_asset_job, materialize

@asset
def create_dirty_data():
    # Create a sample DataFrame with dirty data
    data = {
        'Name': [' John Doe ', 'Jane Smith', 'Bob Johnson ', '  Alice Brown'],
        'Age': [30, np.nan, 40, 35],
        'City': ['New York', 'los angeles', 'CHICAGO', 'Houston'],
        'Salary': ['50,000', '60000', '75,000', 'invalid']
    }
    df = pd.DataFrame(data)
    
    # Save the DataFrame to a CSV file
    dirty_file_path = 'dag_data/dirty_data.csv'
    df.to_csv(dirty_file_path, index=False)
    
    return dirty_file_path

@asset
def clean_data(create_dirty_data):
    # Read the dirty CSV file
    df = pd.read_csv(create_dirty_data)
    
    # Clean the data
    df['Name'] = df['Name'].str.strip()
    df['Age'] = pd.to_numeric(df['Age'], errors='coerce').fillna(df['Age'].mean())
    df['City'] = df['City'].str.upper()
    df['Salary'] = df['Salary'].replace('[\$,]', '', regex=True)
    df['Salary'] = pd.to_numeric(df['Salary'], errors='coerce').fillna(0)
    
    # Calculate average salary
    avg_salary = df['Salary'].mean()
    
    # Save the cleaned DataFrame to a new CSV file
    cleaned_file_path = 'dag_data/cleaned_data.csv'
    df.to_csv(cleaned_file_path, index=False)
    
    return {
        'cleaned_file_path': cleaned_file_path,
        'avg_salary': avg_salary
    }

@asset
def load_cleaned_data(clean_data):
    cleaned_file_path = clean_data['cleaned_file_path']
    avg_salary = clean_data['avg_salary']
    
    # Read the cleaned CSV file to verify
    df = pd.read_csv(cleaned_file_path)
    
    print({
        'num_rows': len(df),
        'num_columns': len(df.columns),
        'avg_salary': avg_salary
    })

# Define all assets
all_assets = [create_dirty_data, clean_data, load_cleaned_data]

# Create a job that will materialize all assets
job = define_asset_job("all_assets_job", selection=all_assets)

# Create Definitions object
defs = Definitions(
    assets=all_assets,
    jobs=[job]
)

if __name__ == "__main__":
    result = materialize(all_assets)
    print("Pipeline execution result:", result.success)

Puedes ejecutar el código anterior en un cuaderno Jupyter o crear el archivo Python y ejecutarlo. 

Como resultado de la ejecución del código, obtendremos un registro completo de la ejecución del flujo de trabajo. 

Resumen de la ejecución de Dagster

Servidor web Dagster

Para visualizar los activos y las ejecuciones de trabajo, tenemos que instalar y ejecutar el servidor web Dagster. El servidor web te permite ejecutar los trabajos, materializar activos individuales y supervisar varios trabajos a la vez.

$ pip install dagster-webserver

Para iniciar el servidor Dagster, utilizaremos la CLI de Daster y le proporcionaremos la ubicación del archivo Python. En este caso, he llamado al archivo dagster_pipe.py.

$ dagster dev -f dagster_pipe.py  

El comando anterior iniciará automáticamente el servidor web en tu navegador. También puedes ir directamente a http://127.0.0.1:3000 en tu navegador.

Servidor web Dagster

IU del servidor web Dagster.

Sólo hemos desplegado el trabajo hasta ahora. Para ejecutar el flujo de trabajo, ve a la pestaña "Ejecuciones" y haz clic en el botón "Lanzar una nueva ejecución". 

¡La ejecución debería completarse con éxito! Para ver los registros, haz clic en el ID de la tirada que te interese.

Dagster corre vista detallada

Registros de ejecución de Dagster.

3. Mago IA

Mage AI es un marco de orquestación de datos híbridos de código abierto. Híbrido significa que obtienes la flexibilidad de un Cuaderno Jupyter y el control del código modular. 

Cualquiera, incluso con conocimientos limitados de Python, puede construir, ejecutar y supervisar canalizaciones de datos. En lugar de escribir y ejecutar directamente un archivo Python, crearás un proyecto Mage AI y lo lanzarás en el panel de control, donde podrás construir, ejecutar y gestionar tus canalizaciones de datos.

Comparado con Airflow, Mage AI ofrece una interfaz fácil de usar y de utilizar, lo que lo convierte en una opción excelente para los que se inician en la ingeniería de datos. Está diseñado teniendo en cuenta la escalabilidad y es capaz de manejar grandes volúmenes de datos y estructuras de canalización complejas de forma eficaz.

Me sentí extraña porque era completamente diferente a lo que estoy acostumbrada. Tuve que instalar e iniciar la interfaz web de Mage AI. Se suponía que iba a ser fácil, pero me resultó difícil construir y ejecutar la canalización ETL. Por otro lado, puedo entender por qué este diseño único puede resultar atractivo para las personas que se inician en este campo, ya que básicamente consiste en arrastrar y soltar y pulsar botones.

Abid Ali AwanAuthor

Primeros pasos con la IA de Mago

Iniciar la IA Mago es bastante sencillo. Sólo tenemos que instalar el paquete Mage AI Python.

$ pip install mage-ai

Y pon en marcha el proyecto Mago AI. 

$ mage start mage_ai_etl 

El comando anterior iniciará el servidor web. Como se ha mencionado antes, toda la edición de código, ejecución de trabajos y supervisión de trabajos se realiza a través de la interfaz de usuario de Mage AI.

IU de la IA de los magos

IU de la IA de mago.

Haz clic en "+ Nueva canalización" para crear tu primera canalización ETL. Yo llamé al mío "simple_etl".

Creación de la nueva canalización en Mage AI

Creación de la nueva canalización en la IA de Mago.

A continuación, la interfaz te pedirá que añadas un módulo para empezar a codificar. Selecciona el módulo "Cargador de datos" y escribe el siguiente código Python. 

Aquí declaramos una funcióncreate_sample_csv(), que es el primer paso de nuestro pipeline. Utilizamos el decorador Mage AI @data_loader. También definimos una función test_output() que afirma si existe la salida. Esto ayuda a gestionar la dependencia de las tareas.

import io
import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def create_sample_csv() -> pd.DataFrame:
    """
    Create a sample CSV file with duplicates and missing values
    """
    csv_data = """
category,product,quantity,price
Electronics,Laptop,5,1000
Electronics,Smartphone,10,500
Clothing,T-shirt,50,20
Clothing,Jeans,30,50
Books,Novel,100,15
Books,Textbook,20,80
Electronics,Laptop,5,1000
Clothing,T-shirt,,20
Electronics,Tablet,,300
Books,Magazine,25,
"""
    return pd.read_csv(io.StringIO(csv_data.strip()))
   
@test
def test_output(df) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'

Creación del bloque cargador de datos en Mage AI

Crear el bloque cargador de datos en Mago AI.

A continuación, crea otro módulo llamado "Transformador" y añade la función limpiar_datos() como se muestra en el código siguiente.

Puedes ignorar la función test() función; sólo tienes que añadir la función principal del transformador, clean_data().

import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@transformer
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean and transform the data
    """
    # Remove duplicates
    df = df.drop_duplicates()
    # Fill missing values with 0
    df = df.fillna(0)
    return df

@test
def test_output(df) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'

Del mismo modo, crea un módulo "Exportador de datos" y añade el siguiente código. El código declara una función de carga de datos, export_data_to_csv(), que guarda los datos transformados en un archivo CSV .

import pandas as pd

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_data_to_csv(df: pd.DataFrame) -> None:
    """
    Export the processed data to a CSV file
    """
    df.to_csv('output_data.csv', index=False)
    print("Data exported successfully to output_data.csv")

Para ejecutar la canalización, ve a la pestaña "Activador" y haz clic en "Ejecutar@una vez".

Ejecutar el pipeline en Mage AI

Ejecutar la canalización en la IA de Mago.

Para ver los registros de ejecución, ve a la pestaña "Ejecuciones" y haz clic en el botón "Registros" de la tubería ejecutada recientemente.

Registros de ejecución de flujo de IA de mago

Registros de ejecución de flujo de IA de mago.

4. Kedro

Kedro es otro popular marco de orquestación de datos de código abierto que es ligeramente diferente de las otras herramientas. Se creó para ingenieros de aprendizaje automático y toma prestados muchos conceptos de la ingeniería de software, aplicándolos a proyectos de aprendizaje automático.

Kedro está diseñado para ser altamente modular, lo que significa que incluso para exportar un conjunto de datos, tienes que crear un catálogo de datos que especifique la ubicación y el tipo de datos, garantizando una gestión de datos estandarizada y eficaz en todo el proceso.

Para comprender cómo encaja Kedro en el ecosistema del aprendizaje automático, puedes explorar varias herramientas de MLOps leyendo el artículo Las 25 principales herramientas MLOps que debes conocer en 2024.

Comparada con Airflow, la API de Kedro es más sencilla para construir una canalización de datos. Se centra más en la ingeniería de aprendizaje automático y ofrece categorización y versionado de datos.

La parte de codificación es bastante sencilla, pero surgen problemas cuando quieres ejecutar tu canalización. Tienes que crear un catálogo de datos, registrar la canalización y averiguar la estructura del proyecto Kedro. Yo diría que es más desafiante en comparación con Dagster y Prefect. Sin embargo, entiendo por qué está diseñado así: para que tu canal de datos sea fiable y no contenga errores.

Abid Ali AwanAuthor

Primeros pasos con Kedro

Construir una canalización de datos Kedro es un juego diferente. El marco es modular, y necesitas comprender la estructura del proyecto y los distintos pasos que implica para ejecutar el flujo de trabajo con éxito. 

Empieza instalando el paquete Kedro Python. 

$ pip install kedro

Inicializa el proyecto Kedro. 

$ kedro new --name=kedro_etl --tools=none --example=n 

Desplázate al directorio del proyecto. 

$ cd kedro-etl  

Crea una carpeta dentro de la carpeta pipelines carpeta llamada data_processing.

$ mkdir -p src/kedro_etl/pipelines/data_processing  

Crea un archivo Python llamado kedro_pipe.py y ábrelo en tu IDE favorito, por ejemplo, puedes utilizar Visual Studio Code.

$ code src/kedro_etl/pipelines/data_processing/kedro_pipe.py

El script de Python debe contener las funciones de extracción, transformación y carga, que son nodos de la canalización. En este caso, se trata de las funciones create_sample_data(),clean_data(), y load_and_process_data().

A continuación, unimos estos nodos utilizando la clase Kedro Pipeline dentro de la funcióncreate_pipeline(). En la función pipeline, definimos nodos, y cada nodo tiene inputs,outputs, y un nodo name.

import pandas as pd
import numpy as np
from kedro.pipeline import Pipeline, node

def create_sample_data():
    data = {
        'id': range(1, 101),
        'name': [f'Person_{i}' for i in range(1, 101)],
        'age': np.random.randint(18, 80, 100),
        'salary': np.random.randint(20000, 100000, 100),
        'missing_values': [np.nan if i % 10 == 0 else i for i in range(100)]
    }
    return pd.DataFrame(data)

def clean_data(df: pd.DataFrame):
    # Remove rows with missing values
    df_cleaned = df.dropna()

    # Convert salary to thousands
    df_cleaned['salary'] = df_cleaned['salary'] / 1000

    # Capitalize names
    df_cleaned['name'] = df_cleaned['name'].str.upper()

    return df_cleaned

def load_and_process_data(df: pd.DataFrame):
    # Calculate average salary
    avg_salary = df['salary'].mean()

    # Add a new column for salary category
    df['salary_category'] = df['salary'].apply(
        lambda x: 'High' if x > avg_salary else 'Low')

    # Calculate age groups
    df['age_group'] = pd.cut(df['age'], bins=[0, 30, 50, 100], labels=[
                             'Young', 'Middle', 'Senior'])

    print(df)
    return df

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=create_sample_data,
                inputs=None,
                outputs="raw_data",
                name="create_sample_data_node",
            ),
            node(
                func=clean_data,
                inputs="raw_data",
                outputs="cleaned_data",
                name="clean_data_node",
            ),
            node(
                func=load_and_process_data,
                inputs="cleaned_data",
                outputs="processed_data",
                name="load_and_process_data_node",
            ),
        ]
    )

Si ejecutamos la canalización sin crear el catálogo de datos, no exportará nuestros datos. Por tanto, tenemos que ir al archivo conf/base/catalog.yml y editarlo proporcionando la configuración del conjunto de datos.

raw_data:
  type: pandas.CSVDataset
  filepath: ./data/kedro/sample_data.csv

cleaned_data:
  type: pandas.CSVDataset
  filepath: ./data/kedro/cleaned_data.csv

processed_data:
  type: pandas.CSVDataset
  filepath: ./data/kedro/processed_data.csv

También debemos incluir nuestro archivo Python recién creado dentro del registro de la tubería. Para ello, ve al archivo Python src/simple_etl/pipeline_registry.py e incluye el siguiente código .

"""Project pipelines."""
from __future__ import annotations

from kedro.pipeline import Pipeline
from kedro_etl.pipelines.data_processing import kedro_pipe

def register_pipelines() -> Dict[str, Pipeline]:
    data_processing_pipeline = kedro_pipe.create_pipeline()

    return {
        "__default__": data_processing_pipeline,
        "data_processing": data_processing_pipeline,
    }

Ejecuta la tubería y visualiza los registros en vivo dentro del terminal ejecutando el siguiente comando.

$ kedro run

Registros de la ejecución del oleoducto Kedro

Registros de la ejecución del oleoducto Kedro.

Tras ejecutar el pipeline, tus archivos se almacenarán en formato CSV en la ubicación definida en el catálogo de datos.

Archivos de salida de la ejecución de la tubería Kedro

Ficheros de salida de la ejecución de la tubería Kedro.

Si tienes problemas para ejecutar la tubería, considera la posibilidad de instalar Kedro con todas las extensiones. 

$ pip install "kedro[all]"

Visualización Kedro

Podemos visualizar y compartir nuestros pipelines instalando la herramienta kedro-viz .

$ pip install kedro-viz

A continuación, ejecutar el siguiente comando nos permitirá visualizar todas las canalizaciones de datos y nodos de datos. También proporciona una opción para el rastreo de experimentos y la posibilidad de compartir la visualización de la tubería.

$ kedro viz run

Kedro Visualization

Visualización de la tubería Kedro.

5. Luigi

Luigi es un framework de código abierto basado en Python desarrollado por Spotify que destaca en la gestión de procesos por lotes de larga duración y canalizaciones de datos complejas. Es bueno en la resolución de dependencias, la gestión de flujos de trabajo, la visualización y la recuperación de fallos, lo que lo convierte en una potente herramienta para orquestar flujos de trabajo de datos. 

En comparación con Airflow, Luigi tiene una API mínima, programación por calendario y una base de usuarios leales que te ayudarán con cualquier problema relacionado con el canal de orquestación de datos. 

Si eres principiante en Python, puede que te resulte difícil construir y ejecutar las canalizaciones. Sin embargo, la documentación y las guías pueden ayudarte a empezar rápidamente. Los registros proporcionan información limitada, y el panel de control es sólo una herramienta de visualización de DAGs y dependencias.

Abid Ali AwanAuthor

Primeros pasos con Luigi

La creación de una canalización de datos Luigi requiere conocimientos de programación orientada a objetos. Empecemos por instalar el paquete Luigi Python. 

$ pip install luigi

Para desarrollar un canal ETL sencillo en Luigi, crearemos tareas interconectadas. En lugar de crear funciones Python como tareas, crearemos una clase Python para cada paso dentro de la tubería, FetchData,ProcessData y GenerateReport. Cada clase tendrá tres funciones llamadas requires(), output(), y run().

Las funciones requires() y output() conectarán las tareas, y la funciónrun() ejecutará el código de procesamiento. Al final, construiremos la tubería utilizando la última tarea de la tubería. 

import luigi
import pandas as pd
import numpy as np

class FetchData(luigi.Task):
    def output(self):
        return luigi.LocalTarget('data/fetch_data.csv')
    
    def run(self):
        # Simulate fetching data by creating a sample CSV file
        data = {
            'column1': [1, 2, np.nan, 4],
            'column2': ['A', 'B', 'C', np.nan]
        }
        df = pd.DataFrame(data)
        df.to_csv(self.output().path, index=False)

class ProcessData(luigi.Task):
    def requires(self):
        return FetchData()
    
    def output(self):
        return luigi.LocalTarget('data/process_data.csv')
    
    def run(self):
        df = pd.read_csv(self.input().path)
        # Fill missing values
        df['column1'].fillna(df['column1'].mean(), inplace=True)
        df['column2'].fillna('B', inplace=True)
        df.to_csv(self.output().path, index=False)

class GenerateReport(luigi.Task):
    def requires(self):
        return ProcessData()
    
    def output(self):
        return luigi.LocalTarget('data/generate_report.txt')
    
    def run(self):
        df = pd.read_csv(self.input().path)
        # Simple data analysis: calculate mean of column1 and value counts of column2
        mean_column1 = df['column1'].mean()
        value_counts_column2 = df['column2'].value_counts()
        
        with self.output().open('w') as out_file:
            out_file.write(f'Mean of column1: {mean_column1}\n')
            out_file.write('Value counts of column2:\n')
            out_file.write(value_counts_column2.to_string())

if __name__ == '__main__':
    luigi.build([GenerateReport()], local_scheduler=True)

Ejecuta el código anterior en el Cuaderno Jupyter o crea el archivo Python y ejecútalo utilizando el terminal. 

Resumen de la ejecución de Luigi

De forma similar a Luigi, también puedes aprender a construir un ETL pipeline con Apache Airflow. El tutorial cubre los aspectos básicos de la extracción, transformación y carga de datos con Apache Airflow.

Luigi planificador central

Necesitamos inicializar el planificador central de Luigi para programar las ejecuciones de la tubería o activarlas con un evento.

Inicia el programador escribiendo el siguiente comando en el terminal.

$ luigid  
2024-06-22 13:35:18,636 luigi[25056] INFO: logging configured by default settings
2024-06-22 13:35:18,636 luigi.scheduler[25056] INFO: No prior state file exists at /var/lib/luigi-server/state.pickle. Starting with empty state
2024-06-22 13:35:18,640 luigi.server[25056] INFO: Scheduler starting up

Para ejecutar el pipeline, abre un nuevo terminal y escribe el siguiente comando. El comando Luigi requiere un nombre de archivo Python y la última tarea que queremos ejecutar. En este caso, el nombre del archivo es luigi_pipe.py, y nuestra última tarea Luigi es GenerateReport.

$ python -m luigi --module luigi_pipe GenerateReport

Si quieres visualizar la ejecución del pipeline y el estado de las tareas, sólo tienes que ir a http://localhost:8082 en tu navegador.

Planificador Central Luigi webUI

Planificador Central Luigi webUI.

¡Con esto terminamos nuestro recorrido por las 5 mejores alternativas a Airflow! Si quieres profundizar en alguno de los ejemplos presentados en este artículo, aquí tienes algunos recursos a tener en cuenta:

  • Para obtener el código fuente y los datos de Prefect, Dagster y Luigi, consulta el espacio de trabajo de espacio de trabajo DataLab.
  • Para el código fuente y los datos de Mage AI y Kedro, consulta el repositorio GitHub.

Reflexiones finales

En este tutorial, hemos hablado de las mejores alternativas gratuitas y de código abierto a Airflow. También hemos aprendido sobre cada herramienta de orquestación de datos, y hemos construido y ejecutado una sencilla canalización ETL. Ver ejemplos de código te ayudará a decidir cuál funciona mejor para tu caso de uso.

Si eres principiante, te sugiero que empieces con Prefect o Mage AI, ya que son fáciles de usar y vienen con una configuración sencilla. Sin embargo, si buscas herramientas más avanzadas que se adhieran a las prácticas de la ingeniería del software, te recomiendo que explores Dagster, Kedro y Luigi.

Después de explorar este artículo, el siguiente paso natural en tu viaje por la ingeniería de datos es obtener una certificación como la de DataCamp Ingeniero de Datos en Python para aprender sobre otras herramientas y construir una canalización de datos de extremo a extremo que puedas desplegar en producción.

Temas

¡Aprende más sobre ingeniería de datos con estos cursos!

curso

Introduction to Data Engineering

4 hr
114.5K
Learn about the world of data engineering in this short course, covering tools and topics like ETL and cloud computing.
Ver detallesRight Arrow
Comienza El Curso
Ver másRight Arrow
Relacionado

blog

Lista de las 19 mejores herramientas ETL y por qué elegirlas

Esta entrada de blog cubre las 19 mejores herramientas ETL (Extraer, Transformar, Cargar) para organizaciones, como Talend Open Studio, Oracle Data Integrate y Hadoop.
DataCamp Team's photo

DataCamp Team

12 min

blog

11 técnicas de visualización de datos para cada caso de uso con ejemplos

Descubra los análisis, técnicas y herramientas más populares para dominar el arte de la visualización de datos.

blog

Procesamiento por lotes frente a procesamiento por flujos: Cuándo utilizar cada uno y por qué es importante

Una mirada en profundidad a las diferencias entre el procesamiento por lotes y por flujos para los conductos de datos. Aprende las ventajas y desventajas únicas de cada enfoque para aplicar las técnicas adecuadas a tu canalización de datos.
Tim Lu's photo

Tim Lu

21 min

blog

20 proyectos de análisis de datos para todos los niveles

Explora nuestra lista de proyectos de análisis de datos para principiantes, estudiantes de último curso y profesionales. La lista consta de proyectos guiados/no guiados y tutoriales con código fuente.
Abid Ali Awan's photo

Abid Ali Awan

17 min

blog

Explorando 12 de las mejores herramientas de visualización de datos en 2023 con ejemplos

Existen muchas herramientas de visualización de datos. En este artículo, hemos preparado una lista exhaustiva de algunas de las herramientas de visualización de datos más útiles en la ciencia de datos.
Javier Canales Luna's photo

Javier Canales Luna

17 min

See MoreSee More