Project: Cleaning an Orders Dataset with PySpark
As a Data Engineer at Voltmart, an electronics e-commerce company, you have been asked by a peer Machine Learning team to clean the data containing information about orders made last year. They plan to use the cleaned data to build a demand forecasting model, and to that end they have shared their requirements for the desired output table format.
An analyst shared a parquet file called "orders_data.parquet" for you to clean and preprocess.
You can see the dataset schema below along with the cleaning requirements:
orders_data.parquet

| column | data type | description | cleaning requirements |
|---|---|---|---|
| order_date | timestamp | Date and time when the order was made | Modify: remove orders placed between 12am and 5am (inclusive); convert from timestamp to date |
| time_of_day | string | Period of the day when the order was made | New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed between 5am and 12pm, "afternoon" for 12pm to 6pm, and "evening" for 6pm to 12am |
| order_id | long | Order ID | N/A |
| product | string | Name of a product ordered | Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase |
| product_ean | double | Product ID | N/A |
| category | string | Broader category of a product | Ensure all values are lowercase |
| purchase_address | string | Address line where the order was made ("House Street, City, State Zipcode") | N/A |
| purchase_state | string | US State of the purchase address | New column containing: the State that the purchase was ordered from |
| quantity_ordered | long | Number of product units ordered | N/A |
| price_each | double | Price of a product unit | N/A |
| cost_price | double | Cost of production per product unit | N/A |
| margin | double | Profit made by selling a product (turnover - cost) | N/A |
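Before writing any transformations, it can help to sketch the schema the cleaned table should end up with. The snippet below is only an assumption inferred from the requirements table above (column names and types from the schema, order_date converted to a date, plus the two new columns); the ML team has not shared an explicit schema file.

from pyspark.sql import types

# Hypothetical target schema for the cleaned table, inferred from the
# requirements table above; useful as a reference when validating the output
expected_schema = types.StructType([
    types.StructField('order_date', types.DateType()),
    types.StructField('time_of_day', types.StringType()),
    types.StructField('order_id', types.LongType()),
    types.StructField('product', types.StringType()),
    types.StructField('product_ean', types.DoubleType()),
    types.StructField('category', types.StringType()),
    types.StructField('purchase_address', types.StringType()),
    types.StructField('purchase_state', types.StringType()),
    types.StructField('quantity_ordered', types.LongType()),
    types.StructField('price_each', types.DoubleType()),
    types.StructField('cost_price', types.DoubleType()),
    types.StructField('turnover', types.DoubleType()),
    types.StructField('margin', types.DoubleType()),
])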
from pyspark.sql import (
SparkSession,
types,
functions as F,
)
spark = (
SparkSession
.builder
.appName('cleaning_orders_dataset_with_pyspark')
.getOrCreate()
)

orders_data = spark.read.parquet('orders_data.parquet')
orders_data.toPandas().head()

# Start here, using as many cells as you require
from pyspark.sql import (
SparkSession,
types,
functions as F,
)
# Initiate a Spark session
spark = (
SparkSession
.builder
.appName('cleaning_orders_dataset_with_pyspark')
.getOrCreate()
)
# IMPORT DATA
# Read data from the parquet file
orders_data = spark.read.parquet('orders_data.parquet')
# DATA CLEANING AND PREPROCESSING
orders_data = (
orders_data
# Create a new column time_of_day
.withColumn(
'time_of_day',
        # When/otherwise (similar to SQL CASE WHEN) conditions on the hour extracted from the timestamp;
        # hours 0-5 are labelled 'night' so the filter below removes orders placed between 12am and 5am
F.when((F.hour('order_date') >= 0) & (F.hour('order_date') <= 5), 'night')
.when((F.hour('order_date') >= 6) & (F.hour('order_date') <= 11), 'morning')
.when((F.hour('order_date') >= 12) & (F.hour('order_date') <= 17), 'afternoon')
.when((F.hour('order_date') >= 18) & (F.hour('order_date') <= 23), 'evening')
# You can keep the otherwise statement as None to validate whether the conditions are exhaustive
.otherwise(None)
)
# Filter by time of day
.filter(
F.col('time_of_day') != 'night'
)
# Cast order_date to date as it is originally a timestamp
.withColumn(
'order_date',
F.col('order_date').cast(types.DateType())
)
)
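A quick, optional sanity check for this step (not part of the required output): after the filter there should be no 'night' rows left, and order_date should show up as a date in the schema.

# Optional check: the filter should have removed all 'night' rows,
# and order_date should now be a DateType rather than a timestamp
orders_data.groupBy('time_of_day').count().show()
orders_data.printSchema()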
orders_data = (
orders_data
# Make product and category columns lowercase
.withColumn(
'product',
F.lower('product')
)
.withColumn(
'category',
F.lower('category')
)
# Remove rows where product column contains "tv" (as you have already made it lowercase)
.filter(
~F.col('product').contains('tv')
)
)
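Another optional check, assuming the lowercasing above has already run: counting rows whose product still contains "tv" should return zero, and a peek at the distinct values confirms everything is lowercase.

# Optional check: no products containing 'tv' should remain,
# and product/category values should all be lowercase
print(orders_data.filter(F.col('product').contains('tv')).count())  # expected: 0
orders_data.select('product', 'category').distinct().show(5, truncate=False)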
orders_data = (
orders_data
# First you split the purchase address by space (" ")
.withColumn(
'address_split',
F.split('purchase_address', ' ')
)
# If you look at the address lines, you can see that the state abbreviation is always at the 2nd last position
.withColumn(
'purchase_state',
F.col('address_split').getItem(F.size('address_split') - 2)
)
    # Drop the temporary address_split helper column
.drop('address_split')
)
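An equivalent, slightly more concise way to pick the second-to-last token is element_at with a negative index, which also avoids passing a Column into getItem (a pattern deprecated since Spark 3.0). This is an optional alternative that produces the same purchase_state values; it is not needed if the cell above has already run.

# Alternative (same result, assuming the "House Street, City, State Zipcode"
# format holds): element_at supports negative indices, so -2 is the
# second-to-last token of the split address
orders_data = orders_data.withColumn(
    'purchase_state',
    F.element_at(F.split('purchase_address', ' '), -2),
)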
# Use distinct and count to calculate the number of unique values
n_states = (
orders_data
.select('purchase_state')
.distinct()
.count()
)
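Since these are US addresses, n_states should be small (at most 50 states plus DC); printing it together with the per-state order counts is an optional, cheap way to catch address-parsing mistakes.

# Optional check: an unexpectedly large n_states would suggest the
# address splitting above went wrong
print(n_states)
orders_data.groupBy('purchase_state').count().orderBy('count', ascending=False).show()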
# EXPORT
# Export the resulting table to parquet format with the new name
(
orders_data
.write
.parquet(
'orders_data_clean.parquet',
mode='overwrite',
)
)
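Finally, an optional verification of the export: reading the file back confirms the write succeeded and lets you compare the schema and row count against what the ML team expects.

# Optional check: read the cleaned parquet back and inspect it
orders_data_clean = spark.read.parquet('orders_data_clean.parquet')
orders_data_clean.printSchema()
print(orders_data_clean.count())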