Skip to content
Project: Building a Demand Forecasting Model
  • AI Chat
  • Code
  • Report
  • It's simple to buy any product with a click and have it delivered to your door. Online shopping has been rapidly evolving over the last few years, making our lives easier. But behind the scenes, e-commerce companies face a complex challenge that needs to be addressed.

    Uncertainty plays a big role in how the supply chains plan and organize their operations to ensure that the products are delivered on time. These uncertainties can lead to challenges such as stockouts, delayed deliveries, and increased operational costs.

    You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. They need your help to assist in planning for the upcoming end-of-the-year sales. They want to use your insights to plan for promotional opportunities and manage their inventory. This effort is to ensure they have the right products in stock when needed and ensure their customers are satisfied with the prompt delivery to their doorstep.

    The Data

    You are provided with a sales dataset to use. A summary and preview are provided below.

    Online Retail.csv

    ColumnDescription
    'InvoiceNo'A 6-digit number uniquely assigned to each transaction
    'StockCode'A 5-digit number uniquely assigned to each distinct product
    'Description'The product name
    'Quantity'The quantity of each product (item) per transaction
    'UnitPrice'Product price per unit
    'CustomerID'A 5-digit number uniquely assigned to each customer
    'Country'The name of the country where each customer resides
    'InvoiceDate'The day and time when each transaction was generated "MM/DD/YYYY"
    'Year'The year when each transaction was generated
    'Month'The month when each transaction was generated
    'Week'The week when each transaction was generated (1-52)
    'Day'The day of the month when each transaction was generated (1-31)
    'DayOfWeek'The day of the weeke when each transaction was generated
    (0 = Monday, 6 = Sunday)
    # Import required libraries
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml import Pipeline
    from pyspark.ml.regression import RandomForestRegressor
    from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
    from pyspark.ml.feature import StringIndexer
    from pyspark.ml.evaluation import RegressionEvaluator
    
    # Initialize Spark session
    my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()
    
    # Importing sales data
    sales_data = my_spark.read.csv(
        "Online Retail.csv", header=True, inferSchema=True, sep=",")
    
    # Convert InvoiceDate to datetime 
    sales_data = sales_data.withColumn("InvoiceDate", to_date(
        to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))
    # Aggregate data into daily intervals
    daily_sales_data = sales_data.groupBy("Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", "DayOfWeek").agg({"Quantity": "sum", "UnitPrice": "avg"})
    # Rename the target column
    daily_sales_data = daily_sales_data.withColumnRenamed(
        "sum(Quantity)", "Quantity")
    
    # Split the data into two sets based on the spliting date, "2011-09-25". All data up to and including this date should be in the training set, while data after this date should be in the testing set. Return a pandas Dataframe, pd_daily_train_data, containing, at least, the columns ["Country", "StockCode", "InvoiceDate", "Quantity"].
    
    split_date_train_test = "2011-09-25"
    
    # Creating the train and test datasets
    train_data = daily_sales_data.filter(
        col("InvoiceDate") <= split_date_train_test)
    test_data = daily_sales_data.filter(col("InvoiceDate") > split_date_train_test)
    
    pd_daily_train_data = train_data.toPandas()
    
    # Creating indexer for categorical columns
    country_indexer = StringIndexer(
        inputCol="Country", outputCol="CountryIndex").setHandleInvalid("keep")
    stock_code_indexer = StringIndexer(
        inputCol="StockCode", outputCol="StockCodeIndex").setHandleInvalid("keep")
    
    # Selectiong features columns
    feature_cols = ["CountryIndex", "StockCodeIndex", "Month", "Year",
                    "DayOfWeek", "Day", "Week"]
    
    # Using vector assembler to combine features
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    
    # Initializing a Random Forest model
    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol="Quantity",
        maxBins=4000
    )
    
    # Create a pipeline for staging the processes
    pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, rf])
    
    # Training the model
    model = pipeline.fit(train_data)
    
    # Getting test predictions
    test_predictions = model.transform(test_data)
    test_predictions = test_predictions.withColumn(
        "prediction", col("prediction").cast("double"))
    
    # Provide the Mean Absolute Error (MAE) for your forecast? Return a double/floar "mae"
    
    # Initializing the evaluator
    mae_evaluator = RegressionEvaluator(
        labelCol="Quantity", predictionCol="prediction", metricName="mae")
    
    # Obtaining MAE
    mae = mae_evaluator.evaluate(test_predictions)
    
    # How many units will be sold during the  week 39 of 2011? Return an integer `quantity_sold_w39`.
    
    # Getting the weekly sales of all countries
    weekly_test_predictions = test_predictions.groupBy("Year", "Week").agg({"prediction": "sum"})
    
    # Finding the quantity sold on the 39 week. 
    promotion_week = weekly_test_predictions.filter(col('Week')==39)
    
    # Storing prediction as quantity_sold_w30
    quantity_sold_w39 = int(promotion_week.select("sum(prediction)").collect()[0][0])
    
    # Stop the Spark session
    my_spark.stop()