Skip to content
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, FloatType
from pyspark.sql.functions import col, trim, when, regexp_replace, to_date

# Ruta al driver JDBC
driver_path = r"C:\Jar-spark\sqljdbc_12.10\enu\jars\mssql-jdbc-12.10.1.jre11.jar"

# Parámetros de conexión
jdbc_url = (
    "jdbc:sqlserver://DESKTOP-ILGINUM\\SQLEXPRESS:1433;"
    "databaseName=db_practica;"
    "encrypt=true;"
    "trustServerCertificate=true;"
)
usuario = "sa"
contrasena = "xxxxx"
tabla_destino = "frutas_limpias"
file_path = r"C:\Users\Benjamin\Downloads\ventas_frutas_sucio.csv"

# Configurar JAVA_HOME
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-11.0.27.6-hotspot"

# Crear SparkSession
try:
    spark = SparkSession.builder \
        .appName("LimpiezaVentaFrutas") \
        .config("spark.jars", driver_path) \
        .getOrCreate()
    print("SparkSession creada correctamente")
except Exception as e:
    print("Error al crear SparkSession:", e)
    raise SystemExit()

# Definir schema del CSV
schema = StructType([
    StructField("fecha", StringType(), True),
    StructField("producto", StringType(), True),
    StructField("categoria", StringType(), True),
    StructField("cantidad", FloatType(), True),
    StructField("precio_unitario", DoubleType(), True),
    StructField("cliente", StringType(), True),
    StructField("ciudad", StringType(), True)
])

# Leer archivo CSV
try:
    df = spark.read.option("header", True).schema(schema).csv(file_path)
    print("CSV leído correctamente")
except Exception as e:
    print("Error al leer CSV:", e)
    spark.stop()
    raise SystemExit()

# Limpieza de datos
try:
    for campo in ["producto", "categoria", "cliente", "ciudad"]:
        df = df.withColumn(campo, trim(col(campo)))

    df = df.withColumn("producto", regexp_replace(col("producto"), "platano", "Plátano"))
    df = df.withColumn("categoria", regexp_replace(col("categoria"), "frutas", "Fruta"))
    df = df.withColumn("cantidad", col("cantidad").cast("integer"))
    df = df.withColumn("cantidad", when(col("cantidad").isNull(), 0).otherwise(col("cantidad")))
    df = df.withColumn("cliente", when(col("cliente").isNull(), "Desconocido").otherwise(col("cliente")))
    df = df.withColumn("precio_unitario", when((col("producto") == "Naranja") & col("precio_unitario").isNull(), 460.0).otherwise(col("precio_unitario")))
    df = df.withColumn("fecha", when(col("fecha").rlike(r"\d{2}/\d{2}/\d{4}"), to_date(col("fecha"),"dd/MM/yyyy")).otherwise(to_date(col("fecha"),"yyyy-MM-dd")))

    print("Limpieza de datos completada")
    df.show()
except Exception as e:
    print("Error durante limpieza:", e)
    spark.stop()
    raise SystemExit()

# Escribir en SQL Server
try:
    df.write.format("jdbc") \
        .option("url", jdbc_url) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("dbtable", tabla_destino) \
        .option("user", usuario) \
        .option("password", contrasena) \
        .mode("overwrite") \
        .save()
    print(f"Datos insertados correctamente en la tabla [{tabla_destino}]")
except Exception as e:
    print("Error al guardar en SQL Server:", e)

# Cerrar sesión
spark.stop()
print("Proceso finalizado")