Walmart is the largest retailer in the United States. Like its competitors, it has been expanding the e-commerce side of its business. By the end of 2022, e-commerce accounted for $80 billion in sales, or 13% of Walmart's total sales. One of the main factors affecting its sales is public holidays, such as the Super Bowl, Labour Day, Thanksgiving, and Christmas.
In this project, you have been tasked with creating a data pipeline for analyzing supply and demand around the holidays, and with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in a PostgreSQL database with the following features:
grocery_sales
- "index" - unique ID of the row
- "Store_ID" - the store number
- "Date" - the week of sales
- "Weekly_Sales" - sales for the given store
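As a rough illustration of how a table with these four columns can be pulled into pandas, the sketch below uses an in-memory SQLite database purely as a stand-in for the PostgreSQL instance. The rows are made up, the name `grocery_sales_demo` is hypothetical, and a real run would use the project's own database connection instead.

```python
import sqlite3

import pandas as pd

# In-memory SQLite database, used only as a stand-in for PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE grocery_sales '
    '("index" INTEGER, "Store_ID" INTEGER, "Date" TEXT, "Weekly_Sales" REAL)'
)
# Made-up rows for illustration only.
conn.executemany(
    "INSERT INTO grocery_sales VALUES (?, ?, ?, ?)",
    [
        (0, 1, "2010-02-05", 12000.0),
        (1, 1, "2010-02-12", 15000.5),
        (2, 2, "2010-02-05", 9500.0),
    ],
)
conn.commit()

# Same query as in the project, loaded straight into a DataFrame.
grocery_sales_demo = pd.read_sql_query("SELECT * FROM grocery_sales", conn)
conn.close()

print(grocery_sales_demo.dtypes)
```

Note that "Date" arrives as text in this stand-in, so it would still need converting with `pd.to_datetime` before any month can be extracted.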
Also, you have the extra_data.parquet file that contains complementary data:
extra_data.parquet
- "IsHoliday" - whether the week contains a public holiday: 1 if yes, 0 if no
- "Temperature" - temperature on the day of sale
- "Fuel_Price" - cost of fuel in the region
- "CPI" - prevailing consumer price index
- "Unemployment" - the prevailing unemployment rate
- "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4" - number of promotional markdowns
- "Dept" - department number in each store
- "Size" - size of the store
- "Type" - type of the store (depends on the "Size" column)
You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the clean_data variable containing the following columns:
- "Store_ID"
- "Month"
- "Dept"
- "IsHoliday"
- "Weekly_Sales"
- "CPI"
- "Unemployment"
After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the agg_data variable that should look like:
| Month | Weekly_Sales |
|---|---|
| 1.0 | 33174.178494 |
| 2.0 | 34333.326579 |
| ... | ... |
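The shape of this result can be reproduced on a toy frame. The following is only a sketch with made-up numbers, and `clean_demo` and `agg_demo` are hypothetical names, but it shows one way a per-month mean can yield a two-column table like the one above.

```python
import pandas as pd

# Toy cleaned data; the values are invented for illustration.
clean_demo = pd.DataFrame({
    "Month": [1, 1, 2, 2],
    "Weekly_Sales": [10000.0, 20000.0, 30000.0, 50000.0],
})

# Average weekly sales per month, keeping "Month" as a regular column.
agg_demo = (
    clean_demo.groupby("Month", as_index=False)["Weekly_Sales"]
    .mean()
    .round(2)
)

print(agg_demo)
```

Passing `as_index=False` keeps "Month" as a column rather than the index, which avoids a separate `reset_index()` call.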
Finally, you should save clean_data and agg_data as CSV files.
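One way to check that a save step worked is to write the file and read it straight back. The sketch below does this in a temporary directory so nothing is left on disk. The frame and the names `agg_demo`, `file_exists`, and `restored` are illustrative assumptions, not part of the project files.

```python
import os
import tempfile

import pandas as pd

# Toy aggregate table; the values are invented for illustration.
agg_demo = pd.DataFrame({"Month": [1, 2], "Weekly_Sales": [15000.0, 40000.0]})

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "agg_data.csv")
    # index=False keeps the row index out of the CSV file.
    agg_demo.to_csv(path, index=False)
    file_exists = os.path.exists(path)
    restored = pd.read_csv(path)

# The round trip should preserve both the columns and the values.
print(file_exists, restored.equals(agg_demo))
```

Without `index=False`, the CSV would gain an extra unnamed column on reload, and the round-trip comparison would fail.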
It is recommended to use pandas for this project.
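The merge and cleaning steps described in the brief can be sketched with pandas on toy data. Everything here is an assumption made for illustration: the values are invented, `sales`, `extra`, and `clean_demo` are hypothetical names, and the 10,000 sales threshold simply mirrors the filter used in the project code further down.

```python
import pandas as pd

# Toy stand-ins for the two sources; all values are made up.
sales = pd.DataFrame({
    "index": [0, 1, 2],
    "Store_ID": [1, 1, 2],
    "Date": ["2010-02-05", "2010-03-12", "2010-03-19"],
    "Weekly_Sales": [12000.0, 8000.0, 15000.0],
})
extra = pd.DataFrame({
    "index": [0, 1, 2],
    "IsHoliday": [0, 1, 0],
    "Dept": [1, 1, 2],
    "CPI": [211.0, None, 212.0],
    "Unemployment": [8.1, 8.1, 7.9],
})

# Join the two sources on their shared row ID.
merged = sales.merge(extra, on="index", how="inner")

# Fill the missing CPI value with the column mean.
merged["CPI"] = merged["CPI"].fillna(merged["CPI"].mean())

# Derive the month number from the sale date.
merged["Month"] = pd.to_datetime(merged["Date"], format="%Y-%m-%d").dt.month

# Keep only the requested columns and the rows above the sales threshold.
columns = ["Store_ID", "Month", "Dept", "IsHoliday",
           "Weekly_Sales", "CPI", "Unemployment"]
clean_demo = merged.loc[merged["Weekly_Sales"] > 10000, columns]

print(clean_demo)
```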
-- Write your SQL query here
SELECT *
FROM grocery_sales
import pandas as pd
import os
# Start here...
#Part1
grocery_sales = pd.DataFrame(grocery_sales) # Define grocery_sales as a DataFrame
#print(grocery_sales.head())
temp = pd.read_parquet("extra_data.parquet")
#print(temp.head())
def extract(df1, df2):
    """Merge the sales table with the complementary data on the row ID."""
    merged_df = df1.merge(df2, on='index', how='outer')
    return merged_df
# Merge the two sources and preview the result
merged_df = extract(grocery_sales, temp)
print(merged_df.head())
#Part2
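def transform(df):
    """Drop unused columns, filter by sales, fill gaps, and derive Month."""
    # errors='ignore' skips any listed column that is absent from the input
    df = df.drop(columns=['index', 'Temperature', 'Fuel_Price', 'MarkDown1',
                          'MarkDown2', 'MarkDown3', 'MarkDown4', 'MarkDown5',
                          'Type', 'Size'], errors='ignore')
    # Keep weeks with sales above 10,000; copy so that the assignments
    # below do not write into a view of the input
    df = df.loc[df['Weekly_Sales'] > 10000, :].copy()
    # Parse the dates first so that every remaining column has a mean
    df['Date'] = pd.to_datetime(df['Date'], format="%Y-%m-%d")
    # Fill missing values in each column with that column's mean
    for col in df.columns:
        df[col] = df[col].fillna(df[col].mean())
    # Derive the month, then drop the date itself
    df['Month'] = df['Date'].dt.month
    clean_data = df.drop('Date', axis=1)
    return clean_data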
clean_data = transform(merged_df)
print(clean_data.head())
#print(clean_data[['Weekly_Sales']].head())
print(clean_data.columns)
#Part3
def avg_monthly_sales(df):
    """Return the average Weekly_Sales per Month, rounded to two decimals."""
    df_new = df[['Month', 'Weekly_Sales']]
    agg_data = (df_new.groupby('Month')['Weekly_Sales']
                .agg(Avg_Sales='mean')
                .reset_index()
                .round(2))
    return agg_data
agg_data = avg_monthly_sales(clean_data)
print(agg_data.head())
#part4
def load(df1, file_path_1, df2, file_path_2):
    """Write both DataFrames to CSV without the index column."""
    # Use the arguments passed in rather than the global variables
    df1.to_csv(file_path_1, index=False)
    df2.to_csv(file_path_2, index=False)
load(clean_data, "clean_data.csv", agg_data, "agg_data.csv")
#Part5
def validation(file_path):
    """Report whether a file exists at file_path."""
    if os.path.exists(file_path):
        print("File exists")
    else:
        print("File does not exist")
validation("clean_data.csv")