As a Data Engineer at Voltmart, an electronics e-commerce company, you have been asked by a peer Machine Learning team to clean a dataset containing information about orders placed last year. They plan to use the cleaned data to build a demand forecasting model and have shared their requirements for the desired output table format.

An analyst shared a parquet file called "orders_data.parquet" for you to clean and preprocess.

You can see the dataset schema below along with the cleaning requirements:

orders_data.parquet

| column | data type | description | cleaning requirements |
| --- | --- | --- | --- |
| order_date | timestamp | Date and time when the order was made | Modify: remove orders placed between 12am and 5am (inclusive); convert from timestamp to date |
| time_of_day | string | Period of the day when the order was made | New column (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for 12pm-6pm, "evening" for 6pm-12am |
| order_id | long | Order ID | N/A |
| product | string | Name of a product ordered | Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase |
| product_ean | double | Product ID | N/A |
| category | string | Broader category of a product | Ensure all values are lowercase |
| purchase_address | string | Address line where the order was made ("House Street, City, State Zipcode") | N/A |
| purchase_state | string | US state of the purchase address | New column: the US state the purchase was ordered from |
| quantity_ordered | long | Number of product units ordered | N/A |
| price_each | double | Price of a product unit | N/A |
| cost_price | double | Cost of production per product unit | N/A |
| turnover | double | Total amount paid for a product (quantity x price) | N/A |
| margin | double | Profit made by selling a product (turnover - cost) | N/A |

from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)
# 1) Read the parquet file
orders = spark.read.parquet("orders_data.parquet")
orders.limit(5).toPandas()
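
# Optional sanity check (a sketch): verify the file exposes the columns
# listed in the requirements table before transforming anything.
expected_cols = {
    "order_date", "order_id", "product", "product_ean", "category",
    "purchase_address", "quantity_ordered", "price_each",
    "cost_price", "turnover", "margin",
}
missing = expected_cols - set(orders.columns)
assert not missing, f"Missing columns: {missing}"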
# Ensure order_date is a TIMESTAMP so the hour can be extracted from it
orders = orders.withColumn("order_timestamp", F.col("order_date").cast("timestamp"))
orders.limit(5).toPandas()
# Extract the hour and remove orders placed between 12am and 5am (hours 0-5 inclusive)
orders = orders.withColumn("hour", F.hour("order_timestamp"))
orders = orders.filter(~((F.col("hour") >= 0) & (F.col("hour") <= 5)))
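
# Quick verification (a sketch): after the filter above, the earliest
# surviving hour should be 6, since hours 0-5 were removed.
orders.agg(F.min("hour").alias("min_hour"), F.max("hour").alias("max_hour")).show()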
# Create time_of_day column (lower bound inclusive, upper bound exclusive)
orders = orders.withColumn(
    "time_of_day",
    F.when((F.col("hour") >= 5) & (F.col("hour") < 12), "morning")
     .when((F.col("hour") >= 12) & (F.col("hour") < 18), "afternoon")
     .otherwise("evening")
)
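
# Inspect the bucket distribution (a sketch) to confirm every surviving row
# landed in one of the three expected time_of_day values.
orders.groupBy("time_of_day").count().show()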
# Clean product/category: lowercase values and remove rows containing "TV"
orders = orders.withColumn("product", F.lower(F.col("product")))
orders = orders.withColumn("category", F.lower(F.col("category")))
orders = orders.filter(~F.col("product").contains("tv"))  # values are already lowercased above
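
# Double-check (a sketch): no product mentioning "tv" should remain.
assert orders.filter(F.col("product").contains("tv")).count() == 0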
orders.limit(5).toPandas()
# Extract purchase_state from purchase_address ("House Street, City, State Zipcode")
# Simple method: split on ", " and take the 3rd part ("State Zipcode"), then keep its first token (the state)
orders = orders.withColumn("purchase_state", F.split(F.col("purchase_address"), ", ")[2])
orders = orders.withColumn("purchase_state", F.split(F.col("purchase_state"), " ").getItem(0))
orders.limit(5).toPandas()
# Convert order_date to DATE, as the requirements ask for the date part only
orders = orders.withColumn("order_date", F.col("order_timestamp").cast("date"))
orders.limit(5).toPandas()
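
# Sanity check (a sketch): order_date should now be a DATE, not a timestamp.
assert dict(orders.dtypes)["order_date"] == "date"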
# Drop temporary columns
orders_clean = orders.drop("order_timestamp", "hour")
orders_clean.limit(5).toPandas()
# Check the schema and row count
orders_clean.printSchema()
print("Number of rows after cleaning:", orders_clean.count())
# Export the cleaned dataset as parquet
orders_clean.write.mode("overwrite").parquet("orders_data_clean.parquet")
print("File saved: orders_data_clean.parquet")
# Read the cleaned parquet file and display a sample in Pandas
orders_clean = spark.read.parquet("orders_data_clean.parquet")

# Limit to 100 rows for safety, convert to Pandas and display
pdf = orders_clean.limit(100).toPandas()
pdf
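
# Release the Spark session's resources once the inspection is done
spark.stop()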