Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas.
In this project, you have been tasked with creating a data pipeline for the analysis of demand and supply around the holidays and running preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in PostgreSQL database and extra_data.parquet file that contains complementary data.
Here is information about all the available columns in the two data files:
"index"- unique ID of the row"Store_ID"- the store number"Date"- the week of sales"Weekly_Sales"- sales for the given store"IsHoliday"- Whether the week contains a public holiday - 1 if yes, 0 if no."Temperature"- Temperature on the day of sale"Fuel_Price"- Cost of fuel in the region"CPI"– Prevailing consumer price index"Unemployment"- The prevailing unemployment rate"MarkDown1","MarkDown2","MarkDown3","MarkDown4"- number of promotional markdowns"Dept"- Department Number in each store"Size"- size of the store"Type"- type of the store (depends onSizecolumn)
You will need to merge those files for further data manipulations and store the merged file in the clean_data.csv file that should contain the following columns:
"Store_ID""Month""Dept""IsHoliday""Weekly_Sales""CPI"- "
"Unemployment""
After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis in the agg_date.csv file that should look like:
| Month | Weekly_Sales |
|---|---|
| 1.0 | 33174.178494 |
| 2.0 | 34333.326579 |
| ... | ... |
It is recommended to use pandas for this project.
SELECT * FROM grocery_sales# Import required packages
import pandas as pd
import numpy as np
import logging
import os
# Start coding here...def extract(sql_table, parquet_file_path):
# SQL table is in DF format already
# Convert partquet to DF
parquet_df = pd.read_parquet(parquet_file_path)
# Merge DataFrames
extracted_df = sql_table.merge(parquet_df, on='index')
return extracted_dfdef transform(input_df):
# Fill missing values with mean of that column
input_df.fillna(
{
'CPI': input_df['CPI'].mean(),
'Weekly_Sales': input_df['Weekly_Sales'].mean(),
'Unemployment': input_df['Unemployment'].mean()
},
inplace=True
)
# Create a "Month" column
input_df["Date"] = pd.to_datetime(input_df["Date"], format = "%Y-%m-%d")
input_df['Month'] = input_df['Date'].dt.month
# Keep rows with sales over $10,000
input_df = input_df.loc[input_df['Weekly_Sales'] > 10000, :]
# Drop index and columns
transformed_df = input_df.drop(["index", "Temperature", "Fuel_Price", "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5", "Type", "Size", "Date"], axis=1)
return transformed_dfdef avg_monthly_sales(cleaned_df):
# Split of weekly sales and month
sales_df = cleaned_df[["Month","Weekly_Sales"]]
# Aggreate for weekly sales
monthly_sales_df = (sales_df.groupby("Month").agg(Avg_Sales = ("Weekly_Sales", "mean")).reset_index().round(2))
return monthly_sales_dfdef load(cleaned_df, cleaned_path, aggregated_df, aggregated_path):
# Saved cleaned df
cleaned_df.to_csv(cleaned_path, index=False)
# Saved aggregated df
aggregated_df.to_csv(aggregated_path, index=False)def validation(file_path):
# Use the "os" package to check whether a path exists
file_exists = os.path.exists(file_path)
# Raise an exception if the path doesn't exist, hence, if there is no file found on a given path
if not file_exists:
raise Exception(f"There is no file at the path {file_path}")merged_df = extract(store_df, "extra_data.parquet")
clean_data = transform(merged_df)
agg_data = avg_monthly_sales(clean_data)
load(clean_data, 'clean_data.csv', agg_data, 'agg_data.csv')
validation('clean_data.csv')
validation('agg_data.csv')