Walmart is the largest retailer in the United States and, like many of its competitors, has been expanding the e-commerce side of its business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, about 13% of Walmart's total sales. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labor Day, Thanksgiving, and Christmas.
In this project, I was tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. I worked with two data sources: grocery sales and complementary data. The grocery_sales table was provided in a PostgreSQL database with the following features:
grocery_sales
- "index" - unique ID of the row
- "Store_ID" - the store number
- "Date" - the week of sales
- "Weekly_Sales" - sales for the given store
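In the project notebook the query result is exposed directly as the grocery_sales DataFrame; outside that environment, the table could be pulled with pandas and SQLAlchemy. A minimal sketch, where the connection string and credentials are placeholders, not the project's actual setup:

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical connection string - replace with the real host, database, and credentials
engine = create_engine("postgresql://user:password@localhost:5432/walmart")
grocery_sales = pd.read_sql("SELECT * FROM grocery_sales", engine)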
I also worked with the extra_data.parquet file, which contains complementary data:
extra_data.parquet
- "IsHoliday" - whether the week contains a public holiday (1 if yes, 0 if no)
- "Temperature" - temperature on the day of sale
- "Fuel_Price" - cost of fuel in the region
- "CPI" - prevailing consumer price index
- "Unemployment" - prevailing unemployment rate
- "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5" - number of promotional markdowns
- "Dept" - department number in each store
- "Size" - size of the store
- "Type" - type of the store (depends on the "Size" column)
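Before merging, it is worth a quick look at the complementary file to confirm it matches this description. A minimal sketch, assuming a parquet engine such as pyarrow is installed:

import pandas as pd

extra_df = pd.read_parquet("extra_data.parquet")
print(extra_df.dtypes)  # column types should match the feature list above
print(extra_df.head())  # first few rows for a visual sanity check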
I merged those two sources and performed some data manipulations. The transformed DataFrame is stored as the clean_data variable, containing the following columns:
"Store_ID""Month""Dept""IsHoliday""Weekly_Sales""CPI"- "
"Unemployment""
After merging and cleaning the data, I analyzed Walmart's monthly sales and stored the results as the agg_data variable, which should look like:
| Month | Weekly_Sales |
|---|---|
| 1.0 | 33174.178494 |
| 2.0 | 34333.326579 |
| ... | ... |
Finally, I saved clean_data and agg_data as CSV files.
-- Read the full grocery_sales table from the PostgreSQL database
SELECT * FROM grocery_sales

import pandas as pd
import os
# Create the extract() function with two parameters: one for the store data and the other one for the extra data
# Read the extra data from the parquet file and merge the DataFrames using "index" column
def extract(store_data, extra_data):
extra_df = pd.read_parquet(extra_data)
merged_df = store_data.merge(extra_df, on = "index")
return merged_df
# Call the extract() function and store it as the "merged_df" variable
merged_df = extract(grocery_sales, "extra_data.parquet")
# Create the transform() function with one parameter: "raw_data"
def transform(raw_data):
# Fill NaNs using mean since we are dealing with numeric columns
# Set inplace = True to do the replacing on the current DataFrame
raw_data.fillna(
{
'CPI': raw_data['CPI'].mean(),
'Weekly_Sales': raw_data['Weekly_Sales'].mean(),
'Unemployment': raw_data['Unemployment'].mean(),
}, inplace = True
)
# Define the type of the "Date" column and its format
raw_data["Date"] = pd.to_datetime(raw_data["Date"], format = "%Y-%m-%d")
# Extract the month value from the "Date" column to calculate monthly sales later on
raw_data["Month"] = raw_data["Date"].dt.month
# Filter the entire DataFrame using the "Weekly_Sales" column. Use .loc to access a group of rows
raw_data = raw_data.loc[raw_data["Weekly_Sales"] > 10000, :]
# Drop unnecessary columns. Set axis = 1 to specify that the columns should be removed
raw_data = raw_data.drop(["index", "Temperature", "Fuel_Price", "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5", "Type", "Size", "Date"], axis = 1)
return raw_data
# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)
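# (Added sketch, not part of the original pipeline) The three imputed columns
# should contain no missing values after transform(); this only covers the
# columns filled above, not the rest of the DataFrame
assert clean_data[["CPI", "Weekly_Sales", "Unemployment"]].isna().sum().sum() == 0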
# Create the avg_monthly_sales function that takes in the cleaned data from the last step
def avg_monthly_sales(clean_data):
# Select the "Month" and "Weekly_Sales" columns as they are the only ones needed for this analysis
holidays_sales = clean_data[["Month", "Weekly_Sales"]]
# Create a chain operation with groupby(), agg(), reset_index(), and round() functions
# Group by the "Month" column and calculate the average monthly sales
# Call reset_index() to start a new index order
# Round the results to two decimal places
    holidays_sales = (holidays_sales.groupby("Month")
                      .agg(Weekly_Sales = ("Weekly_Sales", "mean"))
                      .reset_index().round(2))
return holidays_sales
# Call the avg_monthly_sales() function and pass the cleaned DataFrame
agg_data = avg_monthly_sales(clean_data)
# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going to be stored
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
# Save both DataFrames as csv files. Set index = False to drop the index columns
full_data.to_csv(full_data_file_path, index = False)
agg_data.to_csv(agg_data_file_path, index = False)
# Call the load() function and pass the cleaned and aggregated DataFrames with their paths
load(clean_data, "clean_data.csv", agg_data, "agg_data.csv")
# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
# Use the "os" package to check whether a path exists
file_exists = os.path.exists(file_path)
# Raise an exception if the path doesn't exist, hence, if there is no file found on a given path
if not file_exists:
raise Exception(f"There is no file at the path {file_path}")
# Call the validation() function and pass first, the cleaned DataFrame path, and then the aggregated DataFrame path
validation("clean_data.csv")
validation("agg_data.csv")
print(clean_data)
print(agg_data)
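Beyond checking that the files exist, a quick round-trip read can confirm the CSVs were written as expected. A minimal sketch comparing the reloaded files against the in-memory DataFrames:

# Re-load the saved files; shapes should match since index = False dropped the index
reloaded_clean = pd.read_csv("clean_data.csv")
reloaded_agg = pd.read_csv("agg_data.csv")
assert reloaded_clean.shape == clean_data.shape
assert reloaded_agg.shape == agg_data.shape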