
As a Data Engineer at Voltmart, an electronics e-commerce company, you have been asked by a peer Machine Learning team to clean the data containing information about orders made last year. They plan to use this cleaned data to build a demand forecasting model, and they have shared their requirements for the desired output table format.

An analyst shared a parquet file called "orders_data.parquet" for you to clean and preprocess.

You can see the dataset schema below along with the cleaning requirements:

orders_data.parquet

| column | data type | description | cleaning requirements |
|---|---|---|---|
| order_date | timestamp | Date and time when the order was made | Modify: remove orders placed between 12am and 5am (inclusive); convert from timestamp to date |
| time_of_day | string | Period of the day when the order was made | New column (lower bound inclusive, upper bound exclusive): "morning" for orders placed from 5am to 12pm, "afternoon" for 12pm to 6pm, and "evening" for 6pm to 12am |
| order_id | long | Order ID | N/A |
| product | string | Name of a product ordered | Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase |
| product_ean | double | Product ID | N/A |
| category | string | Broader category of a product | Ensure all values are lowercase |
| purchase_address | string | Address line where the order was made ("House Street, City, State Zipcode") | N/A |
| purchase_state | string | US state of the purchase address | New column containing the state that the purchase was ordered from |
| quantity_ordered | long | Number of product units ordered | N/A |
| price_each | double | Price of a product unit | N/A |
| cost_price | double | Cost of production per product unit | N/A |
| turnover | double | Total amount paid for a product (quantity x price) | N/A |
| margin | double | Profit made by selling a product (turnover - cost) | N/A |
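Before translating the time_of_day requirement into PySpark, the bucketing can be sketched in plain Python. The `bucket_hour` helper below is hypothetical and only illustrates the lower-inclusive, upper-exclusive boundaries stated in the requirements:

```python
def bucket_hour(hour):
    """Map an order hour (0-23) to a period of the day.

    Lower bound inclusive, upper bound exclusive, per the requirements.
    """
    if 5 <= hour < 12:
        return 'morning'
    if 12 <= hour < 18:
        return 'afternoon'
    if 18 <= hour < 24:
        return 'evening'
    return None  # 12am-5am; these orders are removed anyway

print(bucket_hour(5))   # morning
print(bucket_hour(12))  # afternoon
print(bucket_hour(23))  # evening
```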

from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)
orders_data = spark.read.parquet('orders_data.parquet')
# Preview a few rows (limit before toPandas so the full dataset is not collected to the driver)
orders_data.limit(5).toPandas()
# Start here, using as many cells as you require
df = (
    orders_data
    # Extract the order hour for filtering and time-of-day bucketing
    .withColumn('hour', F.hour(F.col('order_date')))
    # Convert the timestamp to a date
    .withColumn('order_date', F.col('order_date').cast(types.DateType()))
    # Lowercase the product and category columns
    .withColumn('product', F.lower(F.col('product')))
    .withColumn('category', F.lower(F.col('category')))
    # Bucket the order hour into periods of the day (hours run 0-23)
    .withColumn(
        'time_of_day',
        F.when(F.col('hour').between(5, 11), 'morning')
         .when(F.col('hour').between(12, 17), 'afternoon')
         .when(F.col('hour').between(18, 23), 'evening')
         .otherwise(None),
    )
    # Isolate the state from "House Street, City, State Zipcode"
    .withColumn(
        'purchase_state',
        F.split(F.split('purchase_address', ',')[2], ' ')[1],
    )
    # Remove orders placed between 12am and 5am (inclusive)
    .filter(F.col('hour') >= 6)
    # Remove orders for TVs, which the company no longer sells
    .filter(~F.array_contains(F.split('product', ' '), 'tv'))
)
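The double-split used for `purchase_state` can be checked on a plain string; the sample address below is made up, following the "House Street, City, State Zipcode" format from the schema:

```python
address = '917 1st St, Dallas, TX 75001'  # hypothetical sample address

# Splitting on ',' gives ['917 1st St', ' Dallas', ' TX 75001']
state_zip = address.split(',')[2]

# The third element keeps its leading space, so splitting on ' ' yields
# ['', 'TX', '75001'] and the state abbreviation lands at index 1,
# matching the indices used in the PySpark chain
state = state_zip.split(' ')[1]
print(state)  # TX
```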
df_final = df.select(
    'order_date', 'time_of_day', 'order_id', 'product', 'category',
    'purchase_address', 'purchase_state', 'quantity_ordered',
    'price_each', 'cost_price', 'turnover', 'margin',
)
df_final.show()
df_final.write.mode('overwrite').parquet('orders_data_clean.parquet')
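The TV filter relies on whole-word matching after lowercasing, so names like "flatscreen tv" are caught without also dropping unrelated products. A quick plain-Python check of that logic (the product names here are made up):

```python
def is_tv(product):
    # Mirrors ~F.array_contains(F.split('product', ' '), 'tv'):
    # split the lowercased name on spaces and look for the token 'tv'
    return 'tv' in product.lower().split(' ')

print(is_tv('Flatscreen TV'))  # True  -> row removed
print(is_tv('USB-C Cable'))    # False -> row kept
```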