Skip to content
Project
  • AI Chat
  • Code
  • Report
  • Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas.

    In this project, you have been tasked with creating a data pipeline for the analysis of demand and supply around the holidays and running preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in PostgreSQL database and extra_data.parquet file that contains complementary data.

    Here is information about all the available columns in the two data files:

    • "index" - unique ID of the row
    • "Store_ID" - the store number
    • "Date" - the week of sales
    • "Weekly_Sales" - sales for the given store
    • "IsHoliday" - Whether the week contains a public holiday - 1 if yes, 0 if no.
    • "Temperature" - Temperature on the day of sale
    • "Fuel_Price" - Cost of fuel in the region
    • "CPI" – Prevailing consumer price index
    • "Unemployment" - The prevailing unemployment rate
    • "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4" - number of promotional markdowns
    • "Dept" - Department Number in each store
    • "Size" - size of the store
    • "Type" - type of the store (depends on Size column)

    You will need to merge those files for further data manipulations and store the merged file in the clean_data.csv file that should contain the following columns:

    • "Store_ID"
    • "Month"
    • "Dept"
    • "IsHoliday"
    • "Weekly_Sales"
    • "CPI"
    • ""Unemployment""

    After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis in the agg_date.csv file that should look like:

    MonthWeekly_Sales
    1.033174.178494
    2.034333.326579
    ......

    It is recommended to use pandas for this project.

    Unknown integration
    DataFrameavailable as
    store_df
    variable
    -- Write your SQL query here
    SELECT * 
    FROM public.grocery_sales
    This query is taking long to finish...Consider adding a LIMIT clause or switching to Query mode to preview the result.
    # Import required packages
    import pandas as pd
    import numpy as np
    import logging
    import os
    
    # Start coding here...
    logging.basicConfig(filename='etl.log', encoding='utf-8', level=logging.DEBUG)
    
    def extract(store_df,extra_data_path):
        extra_df = pd.read_parquet(extra_data_path)
        logging.info('Reading extra_data.parquet and saving in extra_df completed')
        
        merged_df = store_df.merge(extra_df, on='index')
        logging.info('Merging store_df and extra_df completed')
        
        merged_df['Month'] = ""
        
        return merged_df
    
        
    def transform(df):   
        clean_data = df
        clean_data.fillna({
            'CPI' : clean_data['CPI'].mean(),
            'Weekly_Sales': clean_data['Weekly_Sales'].mean(),
            'Unemployment' : clean_data['Unemployment'].mean()
        },inplace=True)
        logging.info('Filling CPI and Unemployment columns with its respective average values completed')
        
        clean_data['Month'] = clean_data['Date'].dt.month
        logging.info('Extracting month from date column and create Month column completed')
        
        clean_data = clean_data[clean_data['Weekly_Sales'] > 10000]
        logging.info('Filtering Weekly_Sales Columns where are greater than 10000 completed')
        
        clean_data.drop(columns=['index'], inplace=True)
        logging.info('Drop index Column completed')
        
        clean_data = clean_data.loc[:,["Store_ID","Month","Dept","IsHoliday","Weekly_Sales","CPI","Unemployment"]]
        logging.info('Selecting columns to clean_data completed')
        
        return clean_data
        
    def avg_monthly_sales(df):
        agg_sales = df.groupby(by='Month', as_index=False).mean()[['Month','Weekly_Sales']].round(2)
        logging.info('Creating agg_data completed')
        agg_sales.rename(columns={'Weekly_Sales':'Avg_Sales'}, inplace=True)
        return agg_sales
        
    def load(c_df, c_path, agg_df, agg_path):
        c_df.to_csv(c_path, index=False)
        logging.info('Saving clean_data.csv compleated')
        agg_df.to_csv(agg_path, index=False)
        logging.info('Saving agg_data.csv completed')
        
    def validation(path):
        return os.path.exists(path)
        
    merged_df = extract(store_df, 'extra_data.parquet')
    #print(merged_df.info())
    clean_data = transform(merged_df)
    agg_data = avg_monthly_sales(clean_data)
    load(clean_data, 'clean_data.csv', agg_data, 'agg_data.csv')
    validation('clean_data.csv')
    validation('agg_data.csv')