Walmart is the biggest retailer in the United States, and like many others it has been expanding the e-commerce side of its business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, around 13% of Walmart's total sales. One of the main factors that affect those sales is public holidays and major events, such as the Super Bowl, Labour Day, Thanksgiving, and Christmas.

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in a PostgreSQL database with the following features (a sketch of one way to read it into pandas appears after the column list):

grocery_sales

  • "index" - unique ID of the row
  • "Store_ID" - the store number
  • "Date" - the week of sales
  • "Weekly_Sales" - sales for the given store

You also have the extra_data.parquet file, which contains complementary data (a quick inspection sketch follows the column list):

extra_data.parquet

  • "IsHoliday" - Whether the week contains a public holiday - 1 if yes, 0 if no.
  • "Temperature" - Temperature on the day of sale
  • "Fuel_Price" - Cost of fuel in the region
  • "CPI" – Prevailing consumer price index
  • "Unemployment" - The prevailing unemployment rate
  • "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4" - number of promotional markdowns
  • "Dept" - Department Number in each store
  • "Size" - size of the store
  • "Type" - type of the store (depends on Size column)

You will need to merge these two sources and perform some data manipulations. The transformed DataFrame should then be stored as the clean_data variable, containing the following columns:

  • "Store_ID"
  • "Month"
  • "Dept"
  • "IsHoliday"
  • "Weekly_Sales"
  • "CPI"
  • ""Unemployment""

After merging and cleaning the data, you will have to analyze the monthly sales of Walmart and store the results of your analysis as the agg_data variable, which should look like:

Month    Weekly_Sales
1.0      33174.178494
2.0      34333.326579
...      ...

Finally, you should save clean_data and agg_data as CSV files.

It is recommended to use pandas for this project.

The SQL cells below first count the rows in the table and then store the full query result as the grocery_sales DataFrame:
SELECT COUNT(*) FROM grocery_sales;

SELECT * FROM grocery_sales;
import pandas as pd
import os

# Extract: read the complementary parquet file and merge it with the sales data on the shared "index" column
def extract(store_data, extra_data):
    extra_df = pd.read_parquet(extra_data)
    merged_df = store_data.merge(extra_df, on="index")
    return merged_df

merged_df = extract(grocery_sales, "extra_data.parquet")
merged_df
def transform(merged_df):
    # Fill missing values with column means
    merged_df.fillna({
        'CPI': merged_df['CPI'].mean(),
        'Weekly_Sales': merged_df['Weekly_Sales'].mean(),
        'Unemployment': merged_df['Unemployment'].mean()
    }, inplace=True)

    # Convert "Date" column to datetime format, coercing errors to NaT (Not a Time)
    merged_df['Date'] = pd.to_datetime(merged_df['Date'], errors='coerce')

    # Extract month from the "Date" column
    merged_df['Month'] = merged_df['Date'].dt.month

    # Keep only the rows where "Weekly_Sales" is greater than 10,000
    merged_df = merged_df[merged_df['Weekly_Sales'] > 10000]

    # Select relevant columns
    selected_columns = ['Store_ID', 'Month', 'Dept', 'IsHoliday', 'Weekly_Sales', 'CPI', 'Unemployment']
    merged_df = merged_df[selected_columns]

    return merged_df
clean_data = transform(merged_df)
clean_data
def avg_weekly_sales_per_month(clean_data):
    # Keep only the columns needed for the aggregation
    clean_data_up = clean_data[['Month', 'Weekly_Sales']]
    # Average the weekly sales within each month
    clean_data_up = clean_data_up.groupby('Month')['Weekly_Sales'].mean().reset_index()
    # Rename the aggregated column and round it to two decimals
    clean_data_up.rename(columns={'Weekly_Sales': 'Avg_Sales'}, inplace=True)
    clean_data_up['Avg_Sales'] = round(clean_data_up['Avg_Sales'], 2)
    return clean_data_up

agg_data = avg_weekly_sales_per_month(clean_data)
agg_data.head()
def load(clean_data, agg_data, clean_data_path='clean_data.csv', agg_data_path='agg_data.csv'):
    # Save both DataFrames as CSV files without the index column
    clean_data.to_csv(clean_data_path, index=False)
    agg_data.to_csv(agg_data_path, index=False)

load(clean_data, agg_data)
# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
    # Check if the file exists using os.path.exists()
    file_exists = os.path.exists(file_path)

    # Return True if the file exists, otherwise False
    if file_exists:
        print(f"{file_path} exists.")
        return True
    else:
        print(f"{file_path} does not exist.")
        return False

# Call the validation() function, passing first the cleaned DataFrame path and then the aggregated DataFrame path
validation('clean_data.csv')
validation('agg_data.csv')