Skip to content

Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas.

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in PostgreSQL database with the following features:

grocery_sales

  • "index" - unique ID of the row
  • "Store_ID" - the store number
  • "Date" - the week of sales
  • "Weekly_Sales" - sales for the given store

Also, you have the extra_data.parquet file that contains complementary data:

extra_data.parquet

  • "IsHoliday" - Whether the week contains a public holiday - 1 if yes, 0 if no.
  • "Temperature" - Temperature on the day of sale
  • "Fuel_Price" - Cost of fuel in the region
  • "CPI" – Prevailing consumer price index
  • "Unemployment" - The prevailing unemployment rate
  • "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4" - number of promotional markdowns
  • "Dept" - Department Number in each store
  • "Size" - size of the store
  • "Type" - type of the store (depends on Size column)

You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the clean_data variable containing the following columns:

  • "Store_ID"
  • "Month"
  • "Dept"
  • "IsHoliday"
  • "Weekly_Sales"
  • "CPI"
  • ""Unemployment""

After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the agg_data variable that should look like:

MonthWeekly_Sales
1.033174.178494
2.034333.326579
......

Finally, you should save the clean_data and agg_data as the csv files.

It is recommended to use pandas for this project.

Spinner
DataFrameas
grocery_sales
variable
-- Write your SQL query here
select *
from grocery_sales
import pandas as pd
import numpy as np
import logging
import os

def extract(
    grocery_sales_df: pd.DataFrame,
    extra_data_path: str
) -> pd.DataFrame:
    """
    Extracts and merges data from a grocery sales DataFrame with additional data
    from a parquet file.

    Parameters:
    - grocery_sales_df (pd.DataFrame): DataFrame containing grocery sales data.
    - extra_data_path (str): File path to the additional data in parquet format.

    Returns:
    pd.DataFrame: Merged DataFrame containing the combined information from
    grocery_sales_df and extra_data_df based on the 'index' column.
    """
    # load in file system data
    extra_data_df = pd.read_parquet(extra_data_path)

    # merge the two datasets
    merged_df = grocery_sales_df.merge(extra_data_df,on='index')
    
    return merged_df

def transform(
    merged_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Transforms the input DataFrame by performing the following operations:
    1. Converts the 'Date' column to datetime format with the specified format.
    2. Adds a new 'Month' column extracted from the 'Date' column.
    3. Selects specific columns to keep in the DataFrame.
    4. Fills missing values in 'Weekly_Sales', 'CPI', and 'Unemployment' columns
       with their respective means.
    5. Filters the DataFrame to include only rows where 'Weekly_Sales' is greater
       than 10000.

    Parameters:
    - merged_df (pd.DataFrame): DataFrame containing merged grocery sales data.

    Returns:
    pd.DataFrame: Transformed DataFrame with the specified operations applied.
    """
    merged_df.Date = pd.to_datetime(merged_df.Date, format = "%Y-%m-%d")
    
    merged_df["Month"] = merged_df.Date.dt.month
    
    cols_to_keep = [
        "Store_ID",
        "Month",
        "Dept",
        "IsHoliday",
        "Weekly_Sales",
        "CPI",
        "Unemployment",
    ]

    merged_df = merged_df[cols_to_keep]
    
    merged_df = merged_df.fillna(
        {
            "Weekly_Sales" : merged_df["Weekly_Sales"].mean(),
            "CPI" : merged_df["CPI"].mean(),
            "Unemployment" : merged_df["Unemployment"].mean(),
            
        }
    ).copy()
    
    merged_df = merged_df.loc[merged_df.Weekly_Sales > 10000]
    

    
    return merged_df


def avg_monthly_sales(
    clean_data: pd.DataFrame
) -> pd.DataFrame:
    """
    Calculates the average weekly sales for each month based on the input
    DataFrame containing cleaned grocery sales data.

    Parameters:
    - clean_data (pd.DataFrame): DataFrame containing cleaned grocery sales data.

    Returns:
    pd.DataFrame: DataFrame with the average weekly sales for each month,
    rounded to 2 decimals and labeled as 'Avg_Sales'. The result is aggregated
    and sorted by month.
    """
    agg_data = clean_data.groupby("Month")["Weekly_Sales"].mean().reset_index()
    # Round to 2 decimals and create a DataFrame
    agg_data.Weekly_Sales = agg_data.Weekly_Sales.round(2)
    
    agg_data = agg_data.rename(columns={"Weekly_Sales": "Avg_Sales"}).copy()
    
    return agg_data

def load(
    clean_data: pd.DataFrame,
    clean_data_path: str,
    agg_data: pd.DataFrame,
    agg_data_path: str,
) -> None:
    """
    Writes the cleaned grocery sales data and aggregated average monthly sales
    data to CSV files.

    Parameters:
    - clean_data (pd.DataFrame): DataFrame containing cleaned grocery sales data.
    - clean_data_path (str): File path to save the cleaned data in CSV format.
    - agg_data (pd.DataFrame): DataFrame containing aggregated average monthly
      sales data.
    - agg_data_path (str): File path to save the aggregated data in CSV format.

    Returns:
    None: The function writes the dataframes to the specified CSV files
    without returning any value.
    """
    clean_data.to_csv(clean_data_path,index=False)
    agg_data.to_csv(agg_data_path,index=False)
    
def validation(path: str):
    """
    Validates the existence of a file or directory at the specified path.

    Parameters:
    - path (str): The file path or directory path to be validated.

    Raises:
    Exception: Raises an exception if the specified path does not exist.

    Returns:
    None: The function does not return any value but raises an exception
    if the path does not exist.
    """
    if os.path.exists(path):
        pass
    else:
        raise Exception
    
merged_df = extract(
    grocery_sales_df=grocery_sales.copy(),
    extra_data_path="extra_data.parquet"
)
clean_data = transform(
    merged_df=merged_df.copy()
)
agg_data = avg_monthly_sales(
    clean_data=clean_data.copy()
)

load(
    clean_data=clean_data.copy(),
    clean_data_path="clean_data.csv",
    agg_data=agg_data.copy(),
    agg_data_path="agg_data.csv",
)

validation("clean_data.csv")
validation("agg_data.csv")