Walmart is the largest retailer in the United States, and like many other retailers it has been expanding the e-commerce side of its business. By the end of 2022, e-commerce represented $80 billion in sales, about 13% of Walmart's total sales. One of the main factors that affects sales is public holidays, such as the Super Bowl, Labor Day, Thanksgiving, and Christmas.

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in a PostgreSQL database, which has the following features:

grocery_sales

  • "index" - unique ID of the row
  • "Store_ID" - the store number
  • "Date" - the week of sales
  • "Weekly_Sales" - sales for the given store

Also, you have the extra_data.parquet file that contains complementary data:

extra_data.parquet

  • "IsHoliday" - Whether the week contains a public holiday - 1 if yes, 0 if no.
  • "Temperature" - Temperature on the day of sale
  • "Fuel_Price" - Cost of fuel in the region
  • "CPI" – Prevailing consumer price index
  • "Unemployment" - The prevailing unemployment rate
  • "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4" - number of promotional markdowns
  • "Dept" - Department Number in each store
  • "Size" - size of the store
  • "Type" - type of the store (depends on Size column)

You will need to merge these two sources and perform some data manipulations. The transformed DataFrame should then be stored as the clean_data variable, containing the following columns:

  • "Store_ID"
  • "Month"
  • "Dept"
  • "IsHoliday"
  • "Weekly_Sales"
  • "CPI"
  • ""Unemployment""

After merging and cleaning the data, you will have to analyze Walmart's monthly sales and store the results of your analysis as the agg_data variable, which should look like:

  Month    Weekly_Sales
  1.0      33174.178494
  2.0      34333.326579
  ...      ...

Finally, you should save clean_data and agg_data as CSV files.

It is recommended to use pandas for this project.

Write a SQL query that pulls everything from the grocery_sales table; the result is stored as a DataFrame in the grocery_sales variable:
-- Write your SQL query here
SELECT * FROM grocery_sales
import pandas as pd
import os

# Extract function is already implemented for you 
def extract(store_data, extra_data):
    """Read the complementary parquet file and merge it with the sales data on "index"."""
    extra_df = pd.read_parquet(extra_data)
    merged_df = store_data.merge(extra_df, on="index")
    return merged_df

# Call the extract() function and store the result as the "merged_df" variable
merged_df = extract(grocery_sales, "extra_data.parquet")
def transform(raw_data):
    """
    Transform the merged DataFrame by:
    - Filling missing numerical values with the median of each numerical column.
    - Keeping only rows where "Weekly_Sales" is over $10,000.
    - Dropping unnecessary columns, keeping only:
         "Store_ID", "Month", "Dept", "IsHoliday", "Weekly_Sales", "CPI", "Unemployment"
    
    :param raw_data: DataFrame resulting from the extract() step (merged_df)
    :return: A cleaned pandas DataFrame stored as clean_data
    """
    # Drop rows missing the "Date" column since it's needed to extract the month
    raw_data = raw_data.dropna(subset=["Date"])
    
    # Convert the 'Date' column to datetime
    raw_data["Date"] = pd.to_datetime(raw_data["Date"])
    
    # Fill missing numerical values using the median for each numeric column
    num_cols = raw_data.select_dtypes(include="number").columns
    raw_data[num_cols] = raw_data[num_cols].fillna(raw_data[num_cols].median())
    
    # Create a "Month" column by extracting the month from the "Date" column
    raw_data["Month"] = raw_data["Date"].dt.month.astype(float)
    
    # Filter rows where "Weekly_Sales" is greater than $10,000
    filtered_data = raw_data[raw_data["Weekly_Sales"] > 10000]
    
    # Keep only the columns needed for analysis
    clean_data = filtered_data[["Store_ID", "Month", "Dept", "IsHoliday", "Weekly_Sales", "CPI", "Unemployment"]]
    
    return clean_data
# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)
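
As an optional sanity check (not part of the required pipeline), you might confirm that the transformed frame keeps exactly the requested columns and has no missing values left; a minimal sketch:

# Optional sanity check on the transformed data
expected_cols = ["Store_ID", "Month", "Dept", "IsHoliday", "Weekly_Sales", "CPI", "Unemployment"]
assert list(clean_data.columns) == expected_cols
assert clean_data.isna().sum().sum() == 0  # the median fill should leave no gaps
print(clean_data.shape)
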
def avg_weekly_sales_per_month(clean_data):
    """
    Calculate the average weekly sales per month.
    
    This function selects the "Month" and "Weekly_Sales" columns from the cleaned data,
    groups the data by "Month" (keeping it as a regular column rather than the index),
    computes the average "Weekly_Sales" per month, and rounds the results to two decimal places.
    
    :param clean_data: A pandas DataFrame containing cleaned e-commerce data.
    :return: A DataFrame with "Month" and average "Weekly_Sales" per month.
    """
    # Rename "Weekly_Sales" to "Avg_Sales"
    clean_data = clean_data.rename(columns={"Weekly_Sales": "Avg_Sales"})
    
    # Aggregate Data: group by "Month" and calculate the mean of "Avg_Sales"
    agg_data = (
        clean_data[["Month", "Avg_Sales"]]
        .groupby("Month", as_index=False)
        .agg({"Avg_Sales": "mean"})
        .round(2)
    )
    return agg_data
# Call the avg_weekly_sales_per_month() function and pass the cleaned DataFrame
agg_data = avg_weekly_sales_per_month(clean_data)
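
As an optional follow-up to the analysis (not required by the task), you could pick out the month with the highest average weekly sales from agg_data; a small sketch:

# Exploratory: find the peak month by average weekly sales
best = agg_data.loc[agg_data["Avg_Sales"].idxmax()]
print(f"Peak month: {best['Month']:.0f}, average weekly sales: {best['Avg_Sales']:.2f}")
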
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    """
    Save the cleaned and aggregated DataFrames as CSV files without an index.
    
    :param full_data: DataFrame containing the cleaned e-commerce data.
    :param full_data_file_path: File path to save the cleaned DataFrame (e.g., "clean_data.csv").
    :param agg_data: DataFrame containing the aggregated monthly sales data.
    :param agg_data_file_path: File path to save the aggregated DataFrame (e.g., "agg_data.csv").
    :return: None
    """
    full_data.to_csv(full_data_file_path, index=False)
    agg_data.to_csv(agg_data_file_path, index=False)
# Call the load() function and pass the cleaned and aggregated DataFrames with their paths
load(clean_data, 'clean_data.csv', agg_data, 'agg_data.csv')
import os


def validation(file_path):
    """
    Validate whether the file specified by file_path exists in the current working directory.
    
    :param file_path: The file path to the CSV file to validate.
    :return: True if the file exists, False otherwise.
    """
    exists = os.path.exists(file_path)
    if exists:
        print(f"Validation passed: {file_path} exists.")
    else:
        print(f"Validation failed: {file_path} does not exist.")
    return exists
# Call the validation() function, passing the cleaned DataFrame path first and then the aggregated DataFrame path
print(validation('clean_data.csv'))
print(validation('agg_data.csv'))
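
An existence check alone doesn't prove the files are readable; as an optional extra, a sketch that reads the CSVs back and confirms the row counts match the in-memory DataFrames:

import pandas as pd

# Optional round-trip check on the saved files
reloaded_clean = pd.read_csv("clean_data.csv")
reloaded_agg = pd.read_csv("agg_data.csv")
assert len(reloaded_clean) == len(clean_data)
assert len(reloaded_agg) == len(agg_data)
print("Round-trip check passed.")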