Skip to content

📄 Enunciado Tienes dos archivos CSV:

books.csv — catálogo de libros

reviews.csv — reseñas hechas por usuarios

Debes construir un pipeline de ETL que:

Extraiga los datos desde los archivos CSV.

Transforme los datos:

Asegúrate de que las columnas rating sean tipo numérico.

Elimina las filas donde rating sea menor a 3.

Agregue el promedio de calificación (rating) por genre.

Cargue los datos limpios y agregados a dos archivos CSV.

Valide que los archivos de salida existan.

Usa logging y try-except para registrar cada paso del proceso.

import pandas as pd
import os 
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

#file_reviews = r"C:\Users\Benjamin\Downloads\reviews.csv"
#file_books = r"C:\Users\Benjamin\Downloads\books.csv"

#Extraer

def  extract(reviews_path, books_path):
    try:

        logging.info("Extrayendo Datasets")
        reviews = pd.read_csv(reviews_path)
        books = pd.read_csv(books_path)
        df = reviews.merge(books, on="book_id", how="left")
        return df
    except Exception as e:
        logging.error(f"Error en extracción {e}")
        raise

#Transformar

def transform(df):
    try:
        logging.info("Transformando datos")
        df["rating"] = pd.to_numeric(df["rating"], errors="coerce")
        df.dropna(subset=["rating"], inplace=True)
        df = df[df["rating"] >= 3]
        logging.info("Transformacion completada")
        return df
    
    except Exception as e:
        logging.error(f"Error en transformacion {e}")
        raise

def aggregate(df):
    try:
        logging.info("Calculando promedio de rating por genero")
        promedio = df.groupby("genre").agg(avg_rating=("rating", "mean")).reset_index().round(2)
        return promedio
    
    except Exception as e:
        logging.error(f"Error en el calculo de promedio {e}")
        raise

def load(clean_df, agg_df):
    try:
        logging.info("Guardando archivos")
        clean_df.to_csv("clean_path.csv", index=False)
        agg_df.to_csv("agg_path.csv", index=False)
        logging.info("Archivos guardados")

    except Exception as e:
        logging.error(f"El archivo no se pudo cargar correctamente {e}")


def validate(path):
    if not os.path.exists(path):
        raise FileNotFoundError(f"Archivo no encontrado {path}")
    else:
        logging.info(f"Validación Exitosa {path}")

#Ejecución de pipeline

df = extract(r"C:\Users\Benjamin\Downloads\reviews.csv", r"C:\Users\Benjamin\Downloads\books.csv")
clean_df = transform(df)
agg_df = aggregate(clean_df)
load(clean_df, agg_df)


validate("clean_path.csv")
validate("agg_path.csv")