
You are a Data Engineer at Voltmart, an electronics e-commerce company. A peer Machine Learning team has asked you to clean the data containing information about orders made last year; they plan to use the cleaned data to build a demand forecasting model. To that end, they have shared their requirements for the desired output table format.

An analyst shared a parquet file called "orders_data.parquet" for you to clean and preprocess.

You can see the dataset schema below along with the cleaning requirements:

orders_data.parquet

| column | data type | description | cleaning requirements |
|---|---|---|---|
| order_date | timestamp | Date and time when the order was made | Modify: remove orders placed between 12am and 5am (inclusive); convert from timestamp to date |
| time_of_day | string | Period of the day when the order was made | New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed between 5am and 12pm, "afternoon" for orders between 12pm and 6pm, and "evening" for orders between 6pm and 12am |
| order_id | long | Order ID | N/A |
| product | string | Name of a product ordered | Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase |
| product_ean | double | Product ID | N/A |
| category | string | Broader category of a product | Ensure all values are lowercase |
| purchase_address | string | Address line where the order was made ("House Street, City, State Zipcode") | N/A |
| purchase_state | string | US State of the purchase address | New column containing: the State that the purchase was ordered from |
| quantity_ordered | long | Number of product units ordered | N/A |
| price_each | double | Price of a product unit | N/A |
| cost_price | double | Cost of production per product unit | N/A |
| margin | double | Profit made by selling a product (turnover - cost) | N/A |
| turnover | double | Total amount paid for a product (quantity x price) | N/A |
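
Before applying the cleaning steps, it can help to confirm that the raw file actually matches this dictionary. A minimal check (illustrative only, assuming the parquet file sits in the working directory and a SparkSession like the one created in the starter code below; the raw variable name is a throwaway) prints the schema and row count:

# Illustrative sanity check of the raw file against the data dictionary
raw = spark.read.parquet('orders_data.parquet')
raw.printSchema()
print(raw.count())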

from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)
orders_data = spark.read.parquet('orders_data.parquet')
orders_data.toPandas().head()
# Start here, using as many cells as you require
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

# Initialize a Spark session
spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

# IMPORT DATA

# Read data from the parquet file
orders_data = spark.read.parquet('orders_data.parquet')

# DATA CLEANING AND PREPROCESSING

orders_data = (
    orders_data
    # Create a new column time_of_day
    .withColumn(
        'time_of_day',
        # when/otherwise chain (similar to SQL CASE WHEN) bucketing the hour extracted from the timestamp
        F.when((F.hour('order_date') >= 0) & (F.hour('order_date') <= 5), 'night')
         .when((F.hour('order_date') >= 6) & (F.hour('order_date') <= 11), 'morning')
         .when((F.hour('order_date') >= 12) & (F.hour('order_date') <= 17), 'afternoon')
         .when((F.hour('order_date') >= 18) & (F.hour('order_date') <= 23), 'evening')
        # Keeping otherwise(None) makes any hour not covered by the conditions above show up as null
         .otherwise(None)
    )
    # Remove orders placed at night (12am-5am inclusive), as required
    .filter(
        F.col('time_of_day') != 'night'
    )
    # Cast order_date to date as it is originally a timestamp
    .withColumn(
        'order_date',
        F.col('order_date').cast(types.DateType())
    )
)
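
A quick throwaway check (not part of the required output) can confirm that the when/otherwise buckets were exhaustive and that the night rows are gone; it should return 0:

# Illustrative check: counts any surviving 'night' rows or unbucketed (null) hours
orders_data.filter(
    (F.col('time_of_day') == 'night') | F.col('time_of_day').isNull()
).count()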


orders_data = (
    orders_data
    # Make product and category columns lowercase
    .withColumn(
        'product',
        F.lower('product')
    )
    .withColumn(
        'category',
        F.lower('category')
    )
    # Remove rows where product column contains "tv" (as you have already made it lowercase)
    .filter(
        ~F.col('product').contains('tv')
    )
)
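
Similarly, a quick check (illustrative only) that the TV rows were really removed should come back as 0:

# Illustrative check: no product names should contain "tv" after the filter above
orders_data.filter(F.col('product').contains('tv')).count()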


orders_data = (
    orders_data
    # First you split the purchase address by space (" ")
    .withColumn(
        'address_split',
        F.split('purchase_address', ' ')
    )
    # If you look at the address lines, you can see that the state abbreviation is always at the 2nd last position
    .withColumn(
        'purchase_state',
        F.col('address_split').getItem(F.size('address_split') - 2)
    )
    # Drop address_split as it is only a temporary helper column
    .drop('address_split')
)
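
Since this extraction rests entirely on the "House Street, City, State Zipcode" format, one way to double-check it (purely illustrative; the alt and state_check names are throwaways) is to derive the state a second way, from the comma-separated parts, and compare:

# Illustrative cross-check of the state extraction, assuming the address format holds
alt = orders_data.withColumn(
    'state_check',
    # Take the last comma-separated chunk ("State Zipcode"), then its first token
    F.split(F.element_at(F.split('purchase_address', ', '), -1), ' ').getItem(0)
)
# Expected to be 0 if both extraction methods agree on every row
alt.filter(F.col('state_check') != F.col('purchase_state')).count()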

# Use distinct and count to calculate the number of unique purchase states
n_states = (
    orders_data
    .select('purchase_state')
    .distinct()
    .count()
)
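
If you also want to see which states sit behind that count (not required by the ML team), a quick look works:

# Illustrative only: list the distinct purchase states behind n_states
orders_data.select('purchase_state').distinct().show(n_states, truncate=False)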


# EXPORT

# Export the resulting table to parquet format with the new name
(
    orders_data
    .write
    .parquet(
        'orders_data_clean.parquet',
        mode='overwrite',
    )
)
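
As an optional final check (assuming the write above succeeded), reading the exported file back confirms the schema and data the Machine Learning team will receive:

# Illustrative read-back of the exported parquet file
orders_data_clean = spark.read.parquet('orders_data_clean.parquet')
orders_data_clean.printSchema()
orders_data_clean.show(5)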