Walmart is the largest retailer in the United States. Like other retailers, it has been expanding the e-commerce side of its business. By the end of 2022, e-commerce accounted for $80 billion in sales, or 13% of Walmart's total sales. One of the main factors affecting its sales is public holidays, such as the Super Bowl, Labour Day, Thanksgiving, and Christmas.

In this project, you have been tasked with creating a data pipeline for analyzing supply and demand around the holidays, and with running a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in a PostgreSQL database and the extra_data.parquet file, which contains the complementary data.

Here is information about all the available columns in the two data files:

  • "index" - unique ID of the row
  • "Store_ID" - the store number
  • "Date" - the week of sales
  • "Weekly_Sales" - sales for the given store
  • "IsHoliday" - Whether the week contains a public holiday - 1 if yes, 0 if no.
  • "Temperature" - Temperature on the day of sale
  • "Fuel_Price" - Cost of fuel in the region
  • "CPI" - Prevailing consumer price index
  • "Unemployment" - The prevailing unemployment rate
  • "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4" - number of promotional markdowns
  • "Dept" - Department Number in each store
  • "Size" - size of the store
  • "Type" - type of the store (depends on Size column)

You will need to merge these two sources for further data manipulation and save the merged, cleaned data to the clean_data.csv file, which should contain the following columns:

  • "Store_ID"
  • "Month"
  • "Dept"
  • "IsHoliday"
  • "Weekly_Sales"
  • "CPI"
  • "Unemployment"
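
As a quick way to confirm that a cleaned DataFrame matches the column list above, the following is a minimal sketch. The helper name check_columns and the toy frame are hypothetical and not part of the project; only the column names come from the list.

```python
import pandas as pd

# Column names copied from the clean_data.csv list above.
EXPECTED_COLUMNS = [
    "Store_ID", "Month", "Dept", "IsHoliday",
    "Weekly_Sales", "CPI", "Unemployment",
]

def check_columns(df, expected=EXPECTED_COLUMNS):
    """Hypothetical helper: return (missing, unexpected) column names."""
    missing = [c for c in expected if c not in df.columns]
    unexpected = [c for c in df.columns if c not in expected]
    return missing, unexpected

# Toy frame (invented for illustration): lacks "Unemployment", still has "Date".
toy = pd.DataFrame(
    columns=["Store_ID", "Month", "Dept", "IsHoliday", "Weekly_Sales", "CPI", "Date"]
)
missing, unexpected = check_columns(toy)
print("missing:", missing)
print("unexpected:", unexpected)
```

Two empty lists would indicate that the frame has exactly the columns the brief asks for.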

After merging and cleaning the data, you will analyze Walmart's monthly sales and store the results of your analysis in the agg_data.csv file, which should look like this:

Month    Weekly_Sales
1.0      33174.178494
2.0      34333.326579
...      ...
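
To illustrate the shape of this table, here is a small sketch on toy data. The values are invented and not taken from the project files. It also shows one plausible reason, stated here as an assumption, why the months appear as 1.0 and 2.0 rather than 1 and 2: a missing date makes the derived month column floating-point.

```python
import pandas as pd

# Toy data invented for illustration; the last row has a missing date.
toy = pd.DataFrame(
    {
        "Date": ["2010-01-08", "2010-01-15", "2010-02-05", None],
        "Weekly_Sales": [20000.0, 30000.0, 40000.0, 50000.0],
    }
)

# A missing date is parsed as NaT, so the derived month column holds floats.
toy["Month"] = pd.to_datetime(toy["Date"], format="%Y-%m-%d").dt.month

# groupby() drops the NaN month by default, so only months 1.0 and 2.0 remain.
monthly = toy.groupby("Month", as_index=False)["Weekly_Sales"].mean()
print(monthly)
```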

It is recommended to use pandas for this project.

SQL cell (the query result is stored as a DataFrame in the store_df variable):
-- Write your SQL query here
SELECT *
FROM grocery_sales
# Import required packages
import pandas as pd
import numpy as np
import logging
import os

# Start coding here...

# Extract
# Create extract() function
# Read "extra_data.parquet" file
# Merge dataframe using "index" column
def extract(store_data, extra_data):
    extra_df = pd.read_parquet(extra_data)
    merged_df = store_data.merge(extra_df, on = "index")
    return merged_df

merged_df = extract(store_df, "extra_data.parquet")

# Transform
# Create the transform() function
# Fill missing values using mean
def transform(raw_data):
    # fillna() without inplace returns a new DataFrame,
    # so the DataFrame passed in by the caller is left unchanged
    raw_data = raw_data.fillna(
        {
            "CPI": raw_data["CPI"].mean(),
            "Weekly_Sales": raw_data["Weekly_Sales"].mean(),
            "Unemployment": raw_data["Unemployment"].mean(),
        }
    )
    # Add column "Month"
    raw_data["Date"] = pd.to_datetime(raw_data["Date"], format = "%Y-%m-%d")
    raw_data["Month"] = raw_data["Date"].dt.month
    # Keep the rows where the sales are over $10,000
    raw_data = raw_data.loc[raw_data["Weekly_Sales"] > 10000, :]
    # Drop the columns that are not needed in "clean_data.csv"
    raw_data = raw_data.drop(["index", "Temperature", "Fuel_Price", "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5", "Type", "Size", "Date"], axis = 1)
    return raw_data

# Call the transform() function on merged_df and store the result in the clean_data variable
clean_data = transform(merged_df)

# Create the avg_monthly_sales function that takes in clean_data
def avg_monthly_sales(clean_data):
    # Select the "Month" and "Weekly_Sales" columns
    holidays_sales = clean_data[["Month", "Weekly_Sales"]]
    # Create a chain operation with groupby(), agg(), reset_index(), and round() functions
    # Group by the "Month" column and calculate the average monthly sales
    # Call reset_index() to start a new index order
    # Round the results to two decimal places
    holidays_sales = (holidays_sales.groupby("Month").agg(Avg_Sales = ("Weekly_Sales", "mean")).reset_index().round(2))
    return holidays_sales

# Call the avg_monthly_sales() function on clean_data and store the result in the agg_data variable
agg_data = avg_monthly_sales(clean_data)

# Load
# Create the load() function
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    # Save both DataFrames as csv files. Set index = False to drop the index columns
    full_data.to_csv(full_data_file_path, index = False)
    agg_data.to_csv(agg_data_file_path, index = False)
    
# Call the load() function
load(clean_data, "clean_data.csv", agg_data, "agg_data.csv")

# Create the validation() function
def validation(file_path):
    # Use the path.exists() function from the os package to check whether the file path exists.
    file_exists = os.path.exists(file_path)
    # Create an if statement that raises an Exception if the file doesn't exist.
    if not file_exists:
        raise Exception(f"There is no file at the path {file_path}")

# Call the validation() function
validation("clean_data.csv")
validation("agg_data.csv")
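
The script above imports logging but does not use it. As one possible extension, not part of the original solution, the sketch below logs the outcome for each output file and raises a FileNotFoundError if any file is missing. The function name validate_outputs and the logger name are hypothetical, and the demonstration writes to a temporary directory rather than the project folder.

```python
import logging
import os
import tempfile

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")  # hypothetical logger name

def validate_outputs(paths):
    """Hypothetical variant of validation(): log every path, raise if any is missing."""
    missing = [p for p in paths if not os.path.exists(p)]
    for p in paths:
        if p in missing:
            logger.error("Missing output file: %s", p)
        else:
            logger.info("Found output file: %s", p)
    if missing:
        raise FileNotFoundError(f"Missing output files: {missing}")

# Demonstration in a temporary directory: one file exists, the other does not.
with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, "clean_data.csv")
    absent = os.path.join(tmp, "agg_data.csv")
    open(present, "w").close()

    validate_outputs([present])  # the only path exists, so nothing is raised
    try:
        validate_outputs([present, absent])
        raised = False
    except FileNotFoundError:
        raised = True

print("raised for missing file:", raised)
```

Checking all paths before raising means a single run reports every missing file, not just the first one.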