Project: Cleaning an Orders Dataset with PySpark
As a Data Engineer at Voltmart, an electronics e-commerce company, you have been asked by a peer Machine Learning team to clean a dataset of orders placed last year. They plan to use the cleaned data to build a demand forecasting model, and they have shared their requirements for the format of the output table.
An analyst has shared a Parquet file called "orders_data.parquet" for you to clean and preprocess.
The dataset schema and the cleaning requirements for each column are listed below:
orders_data.parquet
| column | data type | description | cleaning requirements |
|---|---|---|---|
| order_date | timestamp | Date and time when the order was made | Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date |
| time_of_day | string | Period of the day when the order was made | New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for orders placed 12pm-6pm, and "evening" for orders placed 6pm-12am |
| order_id | long | Order ID | N/A |
| product | string | Name of a product ordered | Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase |
| product_ean | double | Product ID | N/A |
| category | string | Broader category of a product | Ensure all values are lowercase |
| purchase_address | string | Address line where the order was made ("House Street, City, State Zipcode") | N/A |
| purchase_state | string | US State of the purchase address | New column containing: the State that the purchase was ordered from |
| quantity_ordered | long | Number of product units ordered | N/A |
| price_each | double | Price of a product unit | N/A |
| cost_price | double | Cost of production per product unit | N/A |
| turnover | double | Total amount paid for a product (quantity x price) | N/A |
| margin | double | Profit made by selling a product (turnover - cost) | N/A |
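The time_of_day boundaries in the table are easy to get wrong by one hour, so a minimal pure-Python sketch of the bucketing rule can help check them before writing the Spark version. The function name `time_of_day` and the sample timestamps are hypothetical and used only for illustration. The sketch returns `None` for hours 0 to 4, for which the table defines no bucket.

```python
from datetime import datetime
from typing import Optional


def time_of_day(ts: datetime) -> Optional[str]:
    """Bucket a timestamp by hour: lower bound inclusive, upper bound exclusive."""
    h = ts.hour
    if 5 <= h < 12:
        return "morning"
    if 12 <= h < 18:
        return "afternoon"
    if 18 <= h < 24:
        return "evening"
    return None  # hours 0-4: the requirements table defines no bucket here


# Hypothetical sample timestamps chosen to sit on the bucket boundaries.
samples = [
    datetime(2023, 1, 1, 4, 59),
    datetime(2023, 1, 1, 5, 0),
    datetime(2023, 1, 1, 11, 59),
    datetime(2023, 1, 1, 12, 0),
    datetime(2023, 1, 1, 17, 59),
    datetime(2023, 1, 1, 18, 0),
    datetime(2023, 1, 1, 23, 59),
]

for ts in samples:
    print(ts.time(), time_of_day(ts))
```

Comparing this output with the time_of_day values produced by the Spark job for the same hours is a quick way to confirm that both apply the same inclusive and exclusive bounds.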
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)
spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)
orders_data = spark.read.parquet('orders_data.parquet')
orders_data.toPandas().head()
from pyspark.sql.functions import (
    col, to_timestamp, to_date, hour, when, lower, regexp_extract
)
# Load the original Parquet file
orders_data = spark.read.parquet("orders_data.parquet")
# Step 0: Clean column names (if necessary)
orders_data = orders_data.toDF(*[c.strip().lower().replace(" ", "_") for c in orders_data.columns])
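# Step 1: Make sure order_date is a timestamp (the schema already lists it as one, so this is a safeguard)
orders_data = orders_data.withColumn("order_date", to_timestamp("order_date"))
# Step 2: Remove orders placed from 00:00 through 05:59 (between() is inclusive on both ends, so hours 0 to 5 are dropped)
orders_data = orders_data.filter(~(hour("order_date").between(0, 5)))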
# Step 3: Extract hour
orders_data = orders_data.withColumn("order_hour", hour("order_date"))
# Step 4: Create time_of_day column
orders_data = orders_data.withColumn(
    "time_of_day",
    when((col("order_hour") >= 5) & (col("order_hour") < 12), "morning")
    .when((col("order_hour") >= 12) & (col("order_hour") < 18), "afternoon")
    .when((col("order_hour") >= 18) & (col("order_hour") < 24), "evening")
)
# Step 5: Strip time to keep only date
orders_data = orders_data.withColumn("order_date", to_date("order_date"))
# Step 6: Clean product and category columns
orders_data = orders_data.withColumn("product", lower(col("product")))
orders_data = orders_data.withColumn("category", lower(col("category")))
# Step 7: Remove rows with 'tv' in product name
orders_data = orders_data.filter(~col("product").contains("tv"))
# Step 8: Extract purchase_state from purchase_address
orders_data = orders_data.withColumn(
    "purchase_state",
    regexp_extract(col("purchase_address"), r",\s([A-Z]{2})\s\d{5}$", 1)
)
# Step 9: Drop helper column
orders_data = orders_data.drop("order_hour")
# Step 10: Save cleaned data
orders_data.write.mode("overwrite").parquet("orders_data_clean.parquet")
print("✅ Cleaned file saved as orders_data_clean.parquet")
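The state extraction in Step 8 depends on every address ending in ", ST 12345". A small sketch with Python's `re` module, using the same pattern, shows what the expression captures and what happens when an address does not fit. The helper `extract_state` and the sample addresses are hypothetical. The helper returns an empty string on a failed match to mirror Spark's `regexp_extract`, which also returns an empty string when the pattern does not match.

```python
import re

# Same pattern as in Step 8: comma, whitespace, two capitals, whitespace, five digits at the end.
STATE_PATTERN = re.compile(r",\s([A-Z]{2})\s\d{5}$")


def extract_state(address: str) -> str:
    """Return the two-letter state code, or "" when the address does not match."""
    match = STATE_PATTERN.search(address)
    return match.group(1) if match else ""


# Hypothetical addresses in the "House Street, City, State Zipcode" layout.
addresses = [
    "136 Church St, New York City, NY 10001",
    "562 2nd St, San Francisco, CA 94016",
    "1 Main St, Austin, TX 73301 ",  # trailing space: the $ anchor fails
    "no state here",
]

for address in addresses:
    print(repr(address), "->", repr(extract_state(address)))
```

Because a failed match yields an empty string rather than a null, counting rows where purchase_state equals "" after the Spark job is one way to spot addresses that break the expected layout.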