Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas.
In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales
table in PostgreSQL
database with the following features:
grocery_sales
grocery_sales
"index"
- unique ID of the row"Store_ID"
- the store number"Date"
- the week of sales"Weekly_Sales"
- sales for the given store
Also, you have the extra_data.parquet
file that contains complementary data:
extra_data.parquet
extra_data.parquet
"IsHoliday"
- Whether the week contains a public holiday - 1 if yes, 0 if no."Temperature"
- Temperature on the day of sale"Fuel_Price"
- Cost of fuel in the region"CPI"
– Prevailing consumer price index"Unemployment"
- The prevailing unemployment rate"MarkDown1"
,"MarkDown2"
,"MarkDown3"
,"MarkDown4"
- number of promotional markdowns"Dept"
- Department Number in each store"Size"
- size of the store"Type"
- type of the store (depends onSize
column)
You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the clean_data
variable containing the following columns:
"Store_ID"
"Month"
"Dept"
"IsHoliday"
"Weekly_Sales"
"CPI"
- "
"Unemployment"
"
After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the agg_data
variable that should look like:
Month | Weekly_Sales |
---|---|
1.0 | 33174.178494 |
2.0 | 34333.326579 |
... | ... |
Finally, you should save the clean_data
and agg_data
as the csv files.
It is recommended to use pandas
for this project.
-- Write your SQL query here
SELECT * FROM grocery_sales
Create Merged Dataframe
import pandas as pd
def extract (df1, pqt_file):
pqt = pd.read_parquet(pqt_file)
merged_df = pd.merge(df1, pqt, on='index', how='inner')
return merged_df
file = "extra_data.parquet"
extract(grocery_sales, file)
print (merged_df)
Transform
def transform(df):
# Fill null values
df.fillna(
{
'Weekly_Sales': df['Weekly_Sales'].mean(),
'MarkDown4': df['MarkDown4'].mean(),
'MarkDown5': df['MarkDown5'].mean(),
'CPI': df['CPI'].mean(),
'Unemployment': df['Unemployment'].mean(),
# Removed 'Type' as it's likely a categorical column
'Size': df['Size'].mean()
}, inplace=True
)
# Extract month from 'Date' column and create a new 'Month' column
df['Month'] = df['Date'].dt.month
# Filter sales over 10k
sales_over_10k = df[df['Weekly_Sales'] > 10000]
# List of columns to keep
columns_to_keep = ['Store_ID', 'Month', 'Dept', 'IsHoliday', 'Weekly_Sales', 'CPI', 'Unemployment']
clean_data = sales_over_10k[columns_to_keep]
return clean_data
# Assuming merged_df is defined
cleaned_df = transform(merged_df)
print(cleaned_df)
Aggregate Data
def avg_monthly_sales(clean_data):
# Group by 'Month' and calculate the average 'Weekly_Sales'
aggregated_data = clean_data.groupby('Month')['Weekly_Sales'].mean().reset_index()
# Rename the columns
aggregated_data.columns = ['Month', 'Avg_Sales']
# Round the 'Avg_Sales' to 2 decimals
aggregated_data['Avg_Sales'] = aggregated_data['Avg_Sales'].round(2)
return aggregated_data
# Example usage
# Assuming cleaned_df is defined from the transform function
agg_data = avg_monthly_sales(cleaned_df)
print(agg_data)
Load Data
def load(cleaned_df, aggregated_df, clean_data_path, agg_data_path):
# Save the cleaned DataFrame to a CSV file without the index
cleaned_df.to_csv(clean_data_path, index=False)
# Save the aggregated DataFrame to a CSV file without the index
aggregated_df.to_csv(agg_data_path, index=False)
# Example usage
# Assuming cleaned_df and aggregated_df are defined
load(cleaned_df, agg_data, 'clean_data.csv', 'agg_data.csv')
Validation
import os
def validation(file_path):
# Check if the file exists
file_exists = os.path.isfile(file_path)
# Return True if the file exists, else False
return file_exists
# Example usage
# Assuming 'clean_data.csv' and 'agg_data.csv' are the paths used in the load function
clean_data_exists = validation('clean_data.csv')
agg_data_exists = validation('agg_data.csv')
print(f"clean_data.csv exists: {clean_data_exists}")
print(f"agg_data.csv exists: {agg_data_exists}")