Skip to content
PySpark Data Cleaning to SQL Server
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, FloatType
from pyspark.sql.functions import col, trim, when, regexp_replace, to_date
# Ruta al driver JDBC
driver_path = r"C:\Jar-spark\sqljdbc_12.10\enu\jars\mssql-jdbc-12.10.1.jre11.jar"
# Parámetros de conexión
jdbc_url = (
"jdbc:sqlserver://DESKTOP-ILGINUM\\SQLEXPRESS:1433;"
"databaseName=db_practica;"
"encrypt=true;"
"trustServerCertificate=true;"
)
usuario = "sa"
contrasena = "xxxxx"
tabla_destino = "frutas_limpias"
file_path = r"C:\Users\Benjamin\Downloads\ventas_frutas_sucio.csv"
# Configurar JAVA_HOME
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-11.0.27.6-hotspot"
# Crear SparkSession
try:
spark = SparkSession.builder \
.appName("LimpiezaVentaFrutas") \
.config("spark.jars", driver_path) \
.getOrCreate()
print("SparkSession creada correctamente")
except Exception as e:
print("Error al crear SparkSession:", e)
raise SystemExit()
# Definir schema del CSV
schema = StructType([
StructField("fecha", StringType(), True),
StructField("producto", StringType(), True),
StructField("categoria", StringType(), True),
StructField("cantidad", FloatType(), True),
StructField("precio_unitario", DoubleType(), True),
StructField("cliente", StringType(), True),
StructField("ciudad", StringType(), True)
])
# Leer archivo CSV
try:
df = spark.read.option("header", True).schema(schema).csv(file_path)
print("CSV leído correctamente")
except Exception as e:
print("Error al leer CSV:", e)
spark.stop()
raise SystemExit()
# Limpieza de datos
try:
for campo in ["producto", "categoria", "cliente", "ciudad"]:
df = df.withColumn(campo, trim(col(campo)))
df = df.withColumn("producto", regexp_replace(col("producto"), "platano", "Plátano"))
df = df.withColumn("categoria", regexp_replace(col("categoria"), "frutas", "Fruta"))
df = df.withColumn("cantidad", col("cantidad").cast("integer"))
df = df.withColumn("cantidad", when(col("cantidad").isNull(), 0).otherwise(col("cantidad")))
df = df.withColumn("cliente", when(col("cliente").isNull(), "Desconocido").otherwise(col("cliente")))
df = df.withColumn("precio_unitario", when((col("producto") == "Naranja") & col("precio_unitario").isNull(), 460.0).otherwise(col("precio_unitario")))
df = df.withColumn("fecha", when(col("fecha").rlike(r"\d{2}/\d{2}/\d{4}"), to_date(col("fecha"),"dd/MM/yyyy")).otherwise(to_date(col("fecha"),"yyyy-MM-dd")))
print("Limpieza de datos completada")
df.show()
except Exception as e:
print("Error durante limpieza:", e)
spark.stop()
raise SystemExit()
# Escribir en SQL Server
try:
df.write.format("jdbc") \
.option("url", jdbc_url) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.option("dbtable", tabla_destino) \
.option("user", usuario) \
.option("password", contrasena) \
.mode("overwrite") \
.save()
print(f"Datos insertados correctamente en la tabla [{tabla_destino}]")
except Exception as e:
print("Error al guardar en SQL Server:", e)
# Cerrar sesión
spark.stop()
print("Proceso finalizado")