Walmart is the biggest retailer in the United States. Like many retailers, it has been expanding the e-commerce side of its business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, about 13% of Walmart's total sales. One of the main factors that affects their sales is major holidays and events, such as the Super Bowl, Labor Day, Thanksgiving, and Christmas.
In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, and with running a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in a PostgreSQL database and an extra_data.parquet file that contains the complementary data.
Here is information about all the available columns in the two data files:
"index"
- unique ID of the row"Store_ID"
- the store number"Date"
- the week of sales"Weekly_Sales"
- sales for the given store"IsHoliday"
- Whether the week contains a public holiday - 1 if yes, 0 if no."Temperature"
- Temperature on the day of sale"Fuel_Price"
- Cost of fuel in the region"CPI"
– Prevailing consumer price index"Unemployment"
- The prevailing unemployment rate"MarkDown1"
,"MarkDown2"
,"MarkDown3"
,"MarkDown4"
- number of promotional markdowns"Dept"
- Department Number in each store"Size"
- size of the store"Type"
- type of the store (depends onSize
column)
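If you want to confirm which of these columns live in the complementary file before merging, a quick peek at the parquet schema helps. This is a minimal sketch, assuming extra_data.parquet sits in the current working directory:

```python
# A sketch, assuming "extra_data.parquet" is in the working directory.
import pandas as pd

extra_df = pd.read_parquet("extra_data.parquet")
print(extra_df.columns.tolist())  # lists the complementary columns
print(extra_df.dtypes)            # shows each column's data type
```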
You will need to merge those files for further data manipulation and store the merged dataset in a clean_data.csv file that should contain the following columns:
"Store_ID"
"Month"
"Dept"
"IsHoliday"
"Weekly_Sales"
"CPI"
- "
"Unemployment"
"
After merging and cleaning the data, you will have to analyze Walmart's monthly sales and store the results of your analysis in an agg_data.csv file that should look like this:
| Month | Weekly_Sales |
|---|---|
| 1.0 | 33174.178494 |
| 2.0 | 34333.326579 |
| ... | ... |
It is recommended to use pandas for this project.
-- Write your SQL query here
SELECT *
FROM grocery_sales
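In the notebook environment this project runs in, the result of this query is handed to Python as a DataFrame (referred to as store_df below). If you are reproducing the project locally, a minimal sketch for loading the table yourself, assuming a reachable PostgreSQL instance and SQLAlchemy, could look like this; the connection string is a placeholder, not part of the project:

```python
# A sketch, not part of the project environment: load grocery_sales into pandas.
# The connection string below is a placeholder assumption.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:password@localhost:5432/walmart")
store_df = pd.read_sql("SELECT * FROM grocery_sales", engine)
```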
# Import required packages
import pandas as pd
import numpy as np
import logging
import os
# Start coding here...
# Extract
# Create the extract() function:
# read the "extra_data.parquet" file and merge it with the store data on the "index" column
def extract(store_data, extra_data):
    extra_df = pd.read_parquet(extra_data)
    merged_df = store_data.merge(extra_df, on="index")
    return merged_df

# store_df is the grocery_sales DataFrame returned by the SQL query above
merged_df = extract(store_df, "extra_data.parquet")
# Transform
# Create the transform() function
# Fill missing values using mean
def transform(raw_data):
    raw_data.fillna(
        {
            "CPI": raw_data["CPI"].mean(),
            "Weekly_Sales": raw_data["Weekly_Sales"].mean(),
            "Unemployment": raw_data["Unemployment"].mean(),
        },
        inplace=True,
    )
    # Add a "Month" column extracted from the "Date" column
    raw_data["Date"] = pd.to_datetime(raw_data["Date"], format="%Y-%m-%d")
    raw_data["Month"] = raw_data["Date"].dt.month
    # Keep only the rows where weekly sales exceed $10,000
    raw_data = raw_data.loc[raw_data["Weekly_Sales"] > 10000, :]
    # Drop the columns that are not part of the "clean_data.csv" column list above
    raw_data = raw_data.drop(
        ["index", "Temperature", "Fuel_Price", "MarkDown1", "MarkDown2",
         "MarkDown3", "MarkDown4", "MarkDown5", "Type", "Size", "Date"],
        axis=1,
    )
    return raw_data
# Call the transform() function, passing merged_df, and store the result as the clean_data variable
clean_data = transform(merged_df)
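A quick sanity check at this point can catch problems early. The sketch below is optional and not required by the project; it verifies that the imputed columns no longer contain missing values and that only the expected columns remain:

```python
# Optional sanity checks on clean_data (a sketch, not required by the project).
print(clean_data[["CPI", "Weekly_Sales", "Unemployment"]].isna().sum())  # should be all zeros
print(clean_data.columns.tolist())  # should match the clean_data.csv column list above
```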
# Create the avg_monthly_sales function that takes in clean_data
def avg_monthly_sales(clean_data):
    # Select the "Month" and "Weekly_Sales" columns
    holidays_sales = clean_data[["Month", "Weekly_Sales"]]
    # Chain groupby(), agg(), reset_index(), and round():
    # group by "Month", average the weekly sales, reset the index,
    # and round the results to two decimal places
    holidays_sales = (
        holidays_sales.groupby("Month")
        .agg(Avg_Sales=("Weekly_Sales", "mean"))
        .reset_index()
        .round(2)
    )
    return holidays_sales
# Call the avg_monthly_sales() function, passing clean_data, and store the result as the agg_data variable
agg_data = avg_monthly_sales(clean_data)
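Note that agg() above names the aggregated column Avg_Sales, while the expected output table shown earlier labels it Weekly_Sales. If your checker expects the latter name, a one-line rename (a sketch, assuming agg_data as produced above) aligns the two:

```python
# Optional: align the column name with the expected output table shown earlier.
agg_data = agg_data.rename(columns={"Avg_Sales": "Weekly_Sales"})
```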
# Load
# Create the load() function
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    # Save both DataFrames as CSV files; index=False drops the index column
    full_data.to_csv(full_data_file_path, index=False)
    agg_data.to_csv(agg_data_file_path, index=False)
# Call the load() function
load(clean_data, "clean_data.csv", agg_data, "agg_data.csv")
# Create the validation() function
def validation(file_path):
    # Use os.path.exists() to check whether the file path exists
    file_exists = os.path.exists(file_path)
    # Raise an exception if the file does not exist
    if not file_exists:
        raise Exception(f"There is no file at the path {file_path}")
# Call the validation() function
validation("clean_data.csv")
validation("agg_data.csv")