It's simple to buy a product with a click and have it delivered to your door. Online shopping has evolved rapidly over the last few years, making our lives easier. Behind the scenes, however, e-commerce companies face a complex challenge.
Uncertainty plays a big role in how supply chains plan and organize their operations to ensure products are delivered on time. These uncertainties can lead to stockouts, delayed deliveries, and increased operational costs.
You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. The team needs your help planning for the upcoming end-of-year sales: they want to use your insights to identify promotional opportunities and manage inventory, so that the right products are in stock when needed and customers receive their orders promptly.
The Data
You are provided with a sales dataset; a summary of its columns is shown below.
Online Retail.csv
| Column | Description |
|---|---|
| 'InvoiceNo' | A 6-digit number uniquely assigned to each transaction |
| 'StockCode' | A 5-digit number uniquely assigned to each distinct product |
| 'Description' | The product name |
| 'Quantity' | The quantity of each product (item) per transaction |
| 'UnitPrice' | Product price per unit |
| 'CustomerID' | A 5-digit number uniquely assigned to each customer |
| 'Country' | The name of the country where each customer resides |
| 'InvoiceDate' | The day and time when each transaction was generated ("DD/MM/YYYY HH:MM", matching the pattern parsed below) |
| 'Year' | The year when each transaction was generated |
| 'Month' | The month when each transaction was generated |
| 'Week' | The week when each transaction was generated (1-52) |
| 'Day' | The day of the month when each transaction was generated (1-31) |
| 'DayOfWeek' | The day of the week when each transaction was generated (0 = Monday, 6 = Sunday) |
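The loading code below infers column types from the data. If you would rather pin the types to the table above, you could pass an explicit schema when reading the CSV. A minimal sketch, assuming the column order shown (adjust it to match your file):

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType)

retail_schema = StructType([
    StructField("InvoiceNo", StringType()),    # string, since cancellations may carry a "C" prefix
    StructField("StockCode", StringType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", StringType()),  # parsed to a date later
    StructField("UnitPrice", DoubleType()),
    StructField("CustomerID", IntegerType()),
    StructField("Country", StringType()),
    StructField("Year", IntegerType()),
    StructField("Month", IntegerType()),
    StructField("Week", IntegerType()),
    StructField("Day", IntegerType()),
    StructField("DayOfWeek", IntegerType()),
])
# Usage: my_spark.read.csv("Online Retail.csv", header=True, schema=retail_schema)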
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_date, to_timestamp, sum as spark_sum
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
# Initialize Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()
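# (Aside: an assumed local-mode variant. Spark defaults to 200 shuffle
#  partitions, which is more than a dataset of this size needs on one machine;
#  something like this could speed up the groupBy/aggregations locally.)
# my_spark = (SparkSession.builder
#             .appName("SalesForecast")
#             .config("spark.sql.shuffle.partitions", "8")
#             .getOrCreate())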
# Import the sales data
sales_data = my_spark.read.csv(
    "Online Retail.csv", header=True, inferSchema=True, sep=",")

# Parse InvoiceDate and keep only the date part
sales_data = sales_data.withColumn(
    "InvoiceDate",
    to_date(to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))

# View the first 5 rows of the data
sales_data.show(5)
# View the schema (data types) of the DataFrame
sales_data.printSchema()
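# (Aside: the public Online Retail data marks cancellations with a "C"-prefixed
#  InvoiceNo and a negative Quantity. If your extract still contains them, decide
#  explicitly whether returns should count against demand; if InvoiceNo was
#  inferred as a numeric type, there are no "C" rows and this check is moot.)
cancellation_count = sales_data.filter(
    col("InvoiceNo").cast("string").startswith("C")).count()
print(f"Cancellation rows: {cancellation_count}")
# To forecast gross demand you might exclude them before aggregating:
# sales_data = sales_data.filter(~col("InvoiceNo").cast("string").startswith("C"))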
# Aggregate the data to daily totals per country and product
aggregated_data = sales_data.groupBy(
    "Country",
    "StockCode",
    "InvoiceDate",
    "Year",
    "Month",
    "Day",
    "Week",
    "DayOfWeek"
).agg(
    F.sum("Quantity").alias("Quantity"),
    F.avg("UnitPrice").alias("AvgUnitPrice")
)
# Show the aggregated data
aggregated_data.show(5)
# Check the count
print(f"Original data count: {sales_data.count()}")
print(f"Aggregated data count: {aggregated_data.count()}")# Split the data into training and test sets using a date cut
split_date = '2011-09-01'
train_data = aggregated_data.filter(col("InvoiceDate") <= split_date)
test_data = aggregated_data.filter(col("InvoiceDate") > split_date)
# Count the rows in the train and test sets (compute each count once)
total_count = aggregated_data.count()
train_count = train_data.count()
test_count = test_data.count()
print(f"Train data has {train_count} rows, corresponding to {train_count / total_count:.2f} of the dataset")
print(f"Test data has {test_count} rows, corresponding to {test_count / total_count:.2f} of the dataset")
# Convert the Spark DataFrames to pandas DataFrames with selected columns
pd_daily_train_data = train_data.select(
    "Country", "StockCode", "InvoiceDate", "Quantity").toPandas()
pd_daily_test_data = test_data.select(
    "Country", "StockCode", "InvoiceDate", "Quantity").toPandas()
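# (Aside: the pandas copies are convenient for quick plotting on the driver.
#  A sketch, assuming matplotlib is installed.)
import matplotlib.pyplot as plt
import pandas as pd

daily_totals = (pd_daily_train_data
                .assign(InvoiceDate=pd.to_datetime(pd_daily_train_data["InvoiceDate"]))
                .groupby("InvoiceDate")["Quantity"].sum())
daily_totals.plot(figsize=(10, 4), title="Daily units sold (training period)")
plt.ylabel("Quantity")
plt.show()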
# StringIndexers for the categorical columns
# (handleInvalid="keep" lets the fitted indexers handle categories that
#  appear in the test data but not in the training data)
stockcode_indexer = StringIndexer(
    inputCol="StockCode", outputCol="StockCode_index", handleInvalid="keep")
country_indexer = StringIndexer(
    inputCol="Country", outputCol="Country_index", handleInvalid="keep")

# Assemble selected columns into a single features vector
assembler = VectorAssembler(
    inputCols=["StockCode_index", "Country_index", "Quantity"],
    outputCol="features"
)
# Pipeline for train data
pipeline = Pipeline(stages=[stockcode_indexer, country_indexer, assembler])
train_model = pipeline.fit(train_data)
train_data_with_features = train_model.transform(train_data)
# Pipeline for test data (use the same indexers as train)
test_data_with_features = train_model.transform(test_data)
# Show resulting DataFrames with features column
train_data_with_features.select("InvoiceDate", "StockCode", "Country", "Quantity", "features").show(5)
test_data_with_features.select("InvoiceDate", "StockCode", "Country", "Quantity", "features").show(5)
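# (Aside: to see how an indexer mapped raw values to indices, the fitted
#  StringIndexerModel exposes its labels, ordered by frequency; stage 0 of the
#  fitted pipeline above is the fitted stockcode_indexer.)
fitted_stockcode_indexer = train_model.stages[0]
print(fitted_stockcode_indexer.labels[:10])  # the 10 most frequent StockCodes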
# Build the regression model
# Create StringIndexers for the categorical columns
country_indexer = StringIndexer(
    inputCol="Country",
    outputCol="CountryIndex",
    handleInvalid="keep"
)
stockcode_indexer = StringIndexer(
    inputCol="StockCode",
    outputCol="StockCodeIndex",
    handleInvalid="keep"
)
# Define the feature columns (all numeric + indexed categorical)
feature_columns = [
    "CountryIndex",
    "StockCodeIndex",
    "Year",
    "Month",
    "Day",
    "Week",
    "DayOfWeek",
    "AvgUnitPrice"
]

# Create a VectorAssembler to combine the features
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
    handleInvalid="skip"  # drop rows with nulls in any feature column
)
# Define the RandomForestRegressor
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="Quantity",
    predictionCol="prediction",
    seed=42,
    numTrees=20,  # number of trees in the ensemble
    maxDepth=5,   # maximum depth of each tree
    maxBins=4000  # must be >= the number of categories in the largest categorical feature (StockCode)
)
# Create the Pipeline
pipeline = Pipeline(stages=[
    country_indexer,
    stockcode_indexer,
    assembler,
    rf
])
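# (Aside: numTrees/maxDepth above are fixed guesses. A sketch of how they could
#  be tuned with Spark's CrossValidator; note that random k-fold ignores the
#  temporal order of sales, so a rolling-origin split would be more faithful.)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

param_grid = (ParamGridBuilder()
              .addGrid(rf.numTrees, [20, 50])
              .addGrid(rf.maxDepth, [5, 10])
              .build())
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=RegressionEvaluator(
        labelCol="Quantity", predictionCol="prediction", metricName="mae"),
    numFolds=3,
    seed=42)
# cv_model = cv.fit(train_data)  # uncomment to run; trains 4 x 3 = 12 models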
# Fit the model on the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
test_predictions = model.transform(test_data)
# Show some predictions
test_predictions.select(
    "InvoiceDate", "StockCode", "Country", "Quantity", "prediction"
).show(5)
# Evaluate the model using Mean Absolute Error (MAE)
evaluator = RegressionEvaluator(
    labelCol="Quantity",
    predictionCol="prediction",
    metricName="mae"
)
mae = float(evaluator.evaluate(test_predictions))
print(f"\nMean Absolute Error (MAE): {mae:.2f}")# Identify the quantity sold at specific week
# Identify the quantity sold in a specific week:
# filter the predictions for week 39 of 2011
week_39_predictions = test_predictions.filter(
    (col("Year") == 2011) & (col("Week") == 39)
)
# Show some of the week 39 predictions
print("Sample predictions for Week 39 of 2011:")
week_39_predictions.select(
    "InvoiceDate", "Country", "StockCode", "Quantity", "prediction"
).show(10)
# Sum all predicted quantities for week 39 globally (across all countries and products)
quantity_sold_w39 = int(
    week_39_predictions.agg(spark_sum("prediction")).collect()[0][0]
)
print(f"\nExpected units to be sold during week 39 of 2011: {quantity_sold_w39}")
# Compare with actual quantities
actual_quantity_w39 = int(
    week_39_predictions.agg(spark_sum("Quantity")).collect()[0][0]
)
print(f"Actual units sold during week 39 of 2011: {actual_quantity_w39}")
print(f"Difference: {quantity_sold_w39 - actual_quantity_w39} units")
print(f"Accuracy: {(1 - abs(quantity_sold_w39 - actual_quantity_w39) / actual_quantity_w39) * 100:.2f}%")