Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas.
In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in PostgreSQL database with the following features:
grocery_sales
grocery_sales"index"- unique ID of the row"Store_ID"- the store number"Date"- the week of sales"Weekly_Sales"- sales for the given store
Also, you have the extra_data.parquet file that contains complementary data:
extra_data.parquet
extra_data.parquet"IsHoliday"- Whether the week contains a public holiday - 1 if yes, 0 if no."Temperature"- Temperature on the day of sale"Fuel_Price"- Cost of fuel in the region"CPI"– Prevailing consumer price index"Unemployment"- The prevailing unemployment rate"MarkDown1","MarkDown2","MarkDown3","MarkDown4"- number of promotional markdowns"Dept"- Department Number in each store"Size"- size of the store"Type"- type of the store (depends onSizecolumn)
You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the clean_data variable containing the following columns:
"Store_ID""Month""Dept""IsHoliday""Weekly_Sales""CPI"- "
"Unemployment""
After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the agg_data variable that should look like:
| Month | Weekly_Sales |
|---|---|
| 1.0 | 33174.178494 |
| 2.0 | 34333.326579 |
| ... | ... |
Finally, you should save the clean_data and agg_data as the csv files.
It is recommended to use pandas for this project.
-- Write your SQL query here
SELECT * FROM grocery_salesimport pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
# Extract function is already implemented for you
def extract(store_data, extra_data):
extra_df = pd.read_parquet(extra_data)
merged_df = store_data.merge(extra_df, on = "index")
return merged_df
# Call the extract() function and store it as the "merged_df" variable
merged_df = extract(grocery_sales, "extra_data.parquet")# Create the transform() function with one parameter: "raw_data"
def transform(raw_data):
# Extract the Month from the Date
raw_data['Month'] = pd.to_datetime(raw_data['Date']).dt.month
# Select only the required columns
clean_data = raw_data[['Store_ID', 'Month', 'Dept', 'IsHoliday', 'Weekly_Sales', 'CPI', 'Unemployment']]
# Drop rows with missing values (if necessary)
clean_data = clean_data.dropna()
return clean_data# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)# Create the avg_weekly_sales_per_month function that takes in the cleaned data from the last step
def avg_weekly_sales_per_month(clean_data):
# Write your code here
pass# Call the avg_weekly_sales_per_month() function and pass the cleaned DataFrame
def avg_weekly_sales_per_month(clean_data):
agg_data = clean_data.groupby('Month', as_index=False)['Weekly_Sales'].mean()
agg_data.rename(columns={'Weekly_Sales': 'Avg_Weekly_Sales'}, inplace=True)
return agg_data# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going to be stored
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
full_data.to_csv(full_data_file_path, index=False)
agg_data.to_csv(agg_data_file_path, index=False)
pass# Assuming clean_data and agg_data are the cleaned and aggregated DataFrames respectively
# and clean_data_file_path and agg_data_file_path are their corresponding file paths
# Define the DataFrames and file paths
clean_data = ... # Add the code to define or load the clean_data DataFrame
agg_data = ... # Add the code to define or load the agg_data DataFrame
clean_data_file_path = 'path/to/clean_data_file.csv'
agg_data_file_path = 'path/to/agg_data_file.csv'
# Call the load() function and pass the cleaned and aggregated DataFrames with their # Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
# Check if the file exists
if os.path.exists(file_path):
print(f"File {file_path} exists and was saved correctly.")
else:
print(f"File {file_path} does not exist. Check the load function.")
pass# Visualization function
def create_graphs(clean_data, agg_data):
# Set the style
sns.set(style="whitegrid")
# Plot monthly average sales
plt.figure(figsize=(10, 6))
sns.barplot(x='Month', y='Avg_Weekly_Sales', data=agg_data, palette="viridis")
plt.title('Average Weekly Sales per Month')
plt.xlabel('Month')
plt.ylabel('Average Weekly Sales')
plt.xticks(range(12), ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
plt.savefig("monthly_sales.png")
plt.show()
# Plot sales distribution per store
plt.figure(figsize=(12, 6))
sns.boxplot(x='Store_ID', y='Weekly_Sales', data=clean_data, palette="coolwarm")
plt.title('Sales Distribution per Store')
plt.xlabel('Store ID')
plt.ylabel('Weekly Sales')
plt.xticks(rotation=45)
plt.savefig("sales_distribution.png")
plt.show()
# Implementation pipeline
# Assuming `grocery_sales` is a DataFrame loaded from the PostgreSQL database
# Replace 'extra_data.parquet' with the correct path to your file
# Extract
merged_df = extract(grocery_sales, "extra_data.parquet")
# Transform
clean_data = transform(merged_df)
# Aggregate
agg_data = avg_weekly_sales_per_month(clean_data)
# File paths for saving
clean_data_file_path = "clean_data.csv"
agg_data_file_path = "agg_data.csv"
# Load
load(clean_data, clean_data_file_path, agg_data, agg_data_file_path)
# Validate
validation(clean_data_file_path)
validation(agg_data_file_path)
# Create graphs
create_graphs(clean_data, agg_data)