Project: Cleaning an Orders Dataset with PySpark
As a Data Engineer at Voltmart, an electronics e-commerce company, you have been asked by a peer Machine Learning team to clean a dataset containing information about orders placed last year. They plan to use the cleaned data to build a demand forecasting model, and to that end they have shared their requirements for the desired output table format.
An analyst shared a parquet file called "orders_data.parquet" for you to clean and preprocess.
You can see the dataset schema below along with the cleaning requirements:
orders_data.parquet

| column | data type | description | cleaning requirements |
|---|---|---|---|
| order_date | timestamp | Date and time when the order was made | Modify: remove orders placed between 12am (inclusive) and 5am (exclusive); convert from timestamp to date |
| time_of_day | string | Period of the day when the order was made | New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for 12pm-6pm, and "evening" for 6pm-12am |
| order_id | long | Order ID | N/A |
| product | string | Name of a product ordered | Remove rows containing "TV", as the company has stopped selling this product; ensure all values are lowercase |
| product_ean | double | Product ID | N/A |
| category | string | Broader category of a product | Ensure all values are lowercase |
| purchase_address | string | Address line where the order was made ("House Street, City, State Zipcode") | N/A |
| purchase_state | string | US state of the purchase address | New column containing the US state that the purchase was ordered from |
| quantity_ordered | long | Number of product units ordered | N/A |
| price_each | double | Price of a product unit | N/A |
| cost_price | double | Cost of production per product unit | N/A |
| turnover | double | Total amount paid for a product (quantity x price) | N/A |
| margin | double | Profit made by selling a product (turnover - cost) | N/A |
from pyspark.sql import (
SparkSession,
types,
functions as F,
)
spark = (
SparkSession
.builder
.appName('cleaning_orders_dataset_with_pyspark')
.getOrCreate()
)

orders_data = spark.read.parquet('orders_data.parquet')
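Before applying any transformations, it is worth confirming that the file's schema matches the table above; printSchema gives a quick view of column names and types without collecting any data:

# Sanity check: the schema should match the requirements table above
orders_data.printSchema()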
orders_data.toPandas().head()

### Time of day column, and filtering out night-time orders
orders_data = orders_data.withColumn("order_hour", F.hour(orders_data.order_date))
orders_data = orders_data.withColumn(
"order_period",
F.when(orders_data.order_hour.between(0,4), 1)
.when(orders_data.order_hour.between(5,11), 2)
.when(orders_data.order_hour.between(12,17), 3)
.when(orders_data.order_hour.between(18,23), 4)
.otherwise(0)
)
orders_data = orders_data.where(F.col('order_period') >= 2)
# Map each remaining period to its label; the "night" branch is kept for
# completeness but can never match after the filter above
orders_data = orders_data.withColumn(
    "time_of_day",
    F.when(orders_data.order_period == 1, "night")
    .when(orders_data.order_period == 2, "morning")
    .when(orders_data.order_period == 3, "afternoon")
    .when(orders_data.order_period == 4, "evening")
)
orders_data = orders_data.withColumn("order_date", F.col("order_date").cast(types.DateType()))
# orders_data.toPandas().head(1000)

### Lowercase products and category, and filter out 'TV' products
orders_data = orders_data.withColumn("product", F.lower(F.col('product')))
orders_data = orders_data.filter(~ F.col('product').contains("tv"))
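Note that contains() matches the substring "tv" anywhere in a product name. That is fine if every TV product literally contains "tv", but if the catalogue had names where those letters appear inside another word, a word-boundary regex would be safer. A hypothetical stricter variant (rlike uses Java regex, which supports \b; the orders_tv_strict name is illustrative, not part of the pipeline):

# Hypothetical stricter variant: drop only rows where "tv" appears as a whole word
orders_tv_strict = orders_data.filter(~ F.col('product').rlike(r'\btv\b'))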
orders_data = orders_data.withColumn("category", F.lower(F.col('category')))### Purchase State column
# Split the address into its comma-separated parts:
# ["House Street", " City", " State Zipcode"]
orders_data = orders_data.withColumn(
    "purchase_address_split",
    F.split(F.col('purchase_address'), ',')
)

# Take the last part and split it on spaces. element_at supports negative
# indexing (unlike getItem, which no longer accepts a Column argument in
# Spark 3.x). The part starts with a space, so the split yields
# ['', STATE, ZIPCODE] and the state code sits at index 1.
orders_data = orders_data.withColumn(
    "purchase_address_split",
    F.split(
        F.element_at(F.col('purchase_address_split'), -1),
        ' '
    )
).withColumn(
    "purchase_state",
    F.col('purchase_address_split').getItem(1)
)

orders_data.toPandas().head(1000)

distinct_states = orders_data.select('purchase_state').distinct()
n_states = distinct_states.count()
print('n_states: ', n_states)
print(distinct_states.toPandas())
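The distinct count doubles as a cheap validation: for US addresses it should not exceed the 50 states (plus DC). For reference, the state code could also be extracted in a single step with a regular expression instead of the two splits above; this sketch assumes every address ends in ", ST 12345", and the orders_state_regex name is illustrative:

# Hypothetical one-step variant: capture the two-letter state code before the zipcode
orders_state_regex = orders_data.withColumn(
    "purchase_state",
    F.regexp_extract(F.col('purchase_address'), r', ([A-Z]{2}) \d{5}', 1),
)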
### Write to file

# Drop the helper columns so the output matches the schema the ML team requested
orders_data = orders_data.drop('order_hour', 'order_period', 'purchase_address_split')

orders_data.write.mode("overwrite").parquet("orders_data_clean.parquet")
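Finally, re-reading the cleaned file and asserting a few of the requirements is a cheap regression check. A minimal sketch; the assertions mirror the table at the top and raise if a rule is violated:

# Minimal validation of the cleaned output against the stated requirements
clean = spark.read.parquet("orders_data_clean.parquet")
assert clean.filter(F.col('product').contains('tv')).count() == 0, "TV products remain"
assert clean.filter(F.col('time_of_day') == 'night').count() == 0, "night orders remain"
assert clean.filter(F.col('product') != F.lower(F.col('product'))).count() == 0, "non-lowercase product names remain"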