
You are a Data Engineer at Voltmart, an electronics e-commerce company. A peer Machine Learning team has asked you to clean the data containing information about orders made last year, which they plan to use to build a demand forecasting model. To that end, they have shared their requirements for the desired output table format.

An analyst shared a parquet file called "orders_data.parquet" for you to clean and preprocess.

You can see the dataset schema below along with the cleaning requirements:

orders_data.parquet

| column | data type | description | cleaning requirements |
|---|---|---|---|
| order_date | timestamp | Date and time when the order was made | Modify: remove orders placed between 12am and 5am (inclusive); convert from timestamp to date |
| time_of_day | string | Period of the day when the order was made | New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for 12pm-6pm, and "evening" for 6pm-12am |
| order_id | long | Order ID | N/A |
| product | string | Name of a product ordered | Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase |
| product_ean | double | Product ID | N/A |
| category | string | Broader category of a product | Ensure all values are lowercase |
| purchase_address | string | Address line where the order was made ("House Street, City, State Zipcode") | N/A |
| purchase_state | string | US state of the purchase address | New column containing the state that the purchase was ordered from |
| quantity_ordered | long | Number of product units ordered | N/A |
| price_each | double | Price of a product unit | N/A |
| cost_price | double | Cost of production per product unit | N/A |
| turnover | double | Total amount paid for a product (quantity x price) | N/A |
| margin | double | Profit made by selling a product (turnover - cost) | N/A |

from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)
orders_data = spark.read.parquet('orders_data.parquet')
orders_data.toPandas().head()
### Time-of-day column, and filter out night-time orders

# Extract the hour so orders can be bucketed into periods of the day
orders_data = orders_data.withColumn("order_hour", F.hour(orders_data.order_date))
orders_data = orders_data.withColumn(
    "order_period",
    F.when(orders_data.order_hour.between(0, 4), 1)    # night (12am-5am)
     .when(orders_data.order_hour.between(5, 11), 2)   # morning
     .when(orders_data.order_hour.between(12, 17), 3)  # afternoon
     .when(orders_data.order_hour.between(18, 23), 4)  # evening
     .otherwise(0)
)

# Remove night-time orders (period 1) and any unmatched rows (period 0)
orders_data = orders_data.where(F.col('order_period') >= 2)

orders_data = orders_data.withColumn(
    "time_of_day",
    F.when(orders_data.order_period == 1, "night")
     .when(orders_data.order_period == 2, "morning")
     .when(orders_data.order_period == 3, "afternoon")
     .when(orders_data.order_period == 4, "evening")
)

# Convert the order timestamp to a date, per the requirements
orders_data = orders_data.withColumn("order_date", F.col("order_date").cast(types.DateType()))

# orders_data.toPandas().head(1000)
### Lowercase product and category, and filter out 'TV' products
orders_data = orders_data.withColumn("product", F.lower(F.col('product')))
# Lowercasing first makes the "tv" match case-insensitive; note that
# contains() is a substring match, so any product name containing "tv" is dropped
orders_data = orders_data.filter(~F.col('product').contains("tv"))


orders_data = orders_data.withColumn("category", F.lower(F.col('category')))
### Purchase State column

# Split the address into [house + street, city, " State Zipcode"]
orders_data = orders_data.withColumn(
    "purchase_address_split",
    F.split(F.col('purchase_address'), ',')
)

# Take the last chunk and split it on spaces; because of the leading space,
# the state code lands at index 1
orders_data = orders_data.withColumn(
    "purchase_address_split",
    F.split(
        F.col('purchase_address_split').getItem(
            F.size('purchase_address_split') - 1
        ),
        ' '
    )
).withColumn(
    "purchase_state",
    F.col('purchase_address_split').getItem(1)
)
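The same extraction can be sketched in plain Python against the documented address format ("House Street, City, State Zipcode"). The `purchase_state` helper and the sample address are hypothetical, for illustration only:

```python
def purchase_state(address: str) -> str:
    # Take the last comma-separated chunk (" State Zipcode"), strip the
    # leading space, then keep the first whitespace-delimited token
    return address.split(',')[-1].strip().split(' ')[0]

print(purchase_state("917 1st St, Dallas, TX 75001"))  # TX
```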
orders_data.toPandas().head(1000)
distinct_states = orders_data.select('purchase_state').distinct()
n_states = distinct_states.count()

print('n_states: ', n_states)
print(distinct_states.toPandas())

### Write to file

# Drop the intermediate helper columns so the output matches the requested schema
orders_data = orders_data.drop('order_hour', 'order_period', 'purchase_address_split')

orders_data.write.mode("overwrite").parquet("orders_data_clean.parquet")