Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas.
In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales
table in PostgreSQL
database with the following features:
grocery_sales
grocery_sales
"index"
- unique ID of the row"Store_ID"
- the store number"Date"
- the week of sales"Weekly_Sales"
- sales for the given store
Also, you have the extra_data.parquet
file that contains complementary data:
extra_data.parquet
extra_data.parquet
"IsHoliday"
- Whether the week contains a public holiday - 1 if yes, 0 if no."Temperature"
- Temperature on the day of sale"Fuel_Price"
- Cost of fuel in the region"CPI"
– Prevailing consumer price index"Unemployment"
- The prevailing unemployment rate"MarkDown1"
,"MarkDown2"
,"MarkDown3"
,"MarkDown4"
- number of promotional markdowns"Dept"
- Department Number in each store"Size"
- size of the store"Type"
- type of the store (depends onSize
column)
You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the clean_data
variable containing the following columns:
"Store_ID"
"Month"
"Dept"
"IsHoliday"
"Weekly_Sales"
"CPI"
- "
"Unemployment"
"
After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the agg_data
variable that should look like:
Month | Weekly_Sales |
---|---|
1.0 | 33174.178494 |
2.0 | 34333.326579 |
... | ... |
Finally, you should save the clean_data
and agg_data
as the csv files.
It is recommended to use pandas
for this project.
-- 1. Zapytanie zbierające wszystkie dane z tabeli grocery_sales
SELECT *
FROM grocery_sales;
import pandas as pd
import os
import matplotlib.pyplot as plt
# 2. Funkcja do wyodrębnienia danych -> extract() z dwoma parametrami: jednym dla danych sklepu i drugim dla danych dodatkowych
# Odczytanie dodatkowych danych z pliku .parquet i połączenie DataFrames za pomocą kolumny "index".
def extract(store_data, extra_data):
extra_df = pd.read_parquet(extra_data)
merged_df = store_data.merge(extra_df, on = "index")
return merged_df
# Wywołanie funkcji extract() i zapisanie jej jako zmiennej "merged_df".
merged_df = extract(grocery_sales, "extra_data.parquet")
# 3. Utworzenie funkcji transform(), która będzie pobierać surowe dane i przetwarzać (uzupełnienie brakujących danych, nadanie odpowiedniego formatu dla daty, wyodrębnienie tygodniowej sprzedaży powyżej 10 tys. USD) na dane oczyszczone
def transform(raw_data):
# Zamiana brakujących wartości (NaNs) i zastąpienie ich wartościami średnimi ze wszystkich rekordów danej kolumny
raw_data.fillna(
{
'CPI': raw_data['CPI'].mean(),
'Weekly_Sales': raw_data['Weekly_Sales'].mean(),
'Unemployment': raw_data['Unemployment'].mean(),
}, inplace = True
)
# Zdefiniowanie typu kolumny "Data" i jej formatu.
raw_data["Date"] = pd.to_datetime(raw_data["Date"], format = "%Y-%m-%d")
# Wyodrębnienie wartości miesiąca z kolumny "Data", aby później obliczyć miesięczną sprzedaż
raw_data["Month"] = raw_data["Date"].dt.month
# Filtrowanie całego zbioru danych przy użyciu kolumny "Weekly_Sales". Aby uzyskać dostęp do grupy wierszy użyłem funkcji .loc
raw_data = raw_data.loc[raw_data["Weekly_Sales"] > 10000, :]
# Usunięcie niepotrzebnych kolumn. Ustawiamy axis = 1, aby określić, że kolumny powinny zostać usunięte.
raw_data = raw_data.drop(["index", "Temperature", "Fuel_Price", "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5", "Type", "Size", "Date"], axis = 1)
return raw_data
# Wywołanie funkcji transform()
clean_data = transform(merged_df)
# 4. Utworzenie funkcji avg_monthly_sales, która pobiera oczyszczone dane z ostatniego kroku
def avg_monthly_sales(clean_data):
# Wybór kolumny "Month" i "Weekly_Sales", ponieważ tylko one są potrzebne do tej analizy.
holidays_sales = clean_data[["Month", "Weekly_Sales"]]
# Tworzenie operacji łańcuchowej z funkcjami groupby(), agg(), reset_index() i round().
# Grupowanie według kolumny "Month" i obliczanie średniej miesięcznej sprzedaży
# Wywołanie funkcji reset_index(), aby rozpocząć nowe porządkowanie indeksu.
# Zaokrąglenie wyników do liczb całkowitych
holidays_sales = (holidays_sales.groupby("Month")
.agg(Avg_Sales = ("Weekly_Sales", "mean"))
.reset_index().round(0))
return holidays_sales
# Wywołanie funkcji avg_monthly_sales() i przekazanie wyczyszczonego zbioru danych do dalszych czynności
agg_data = avg_monthly_sales(clean_data)
# Wizualizacja wyników średniej tygodniowej sprzedaży w poszczególnych miesiącach
# Reprezentacja osi x oraz y na wykresie
x_axis = agg_data["Month"]
y_axis = agg_data["Avg_Sales"]
# Tworzenie wykresu słupkowego (Wskazanie osi, wycentrowanie kolumn, zmiana koloru kolumn na niebieski)
plt.bar(x_axis, y_axis, align='center', color=['tab:blue'])
# Podpisanie osi
plt.xlabel("Miesiące\n")
plt.ylabel("Średnia tygodniowa sprzedaż (USD$)\n")
# Tytuł wykresu
plt.title("Średnia tygodniowa sprzedaż w sklepach Walmart w poszczególnych miesiącach\n")
# Wartości dla osi x
plt.xticks([1,2,3,4,5,6,7,8,9,10,11,12])
# Pokaż wykres
plt.show()
# 5. Utworzenierz funkcji load(), która pobiera oczyszczony zbiór danych i zagregowany zbiór danych ze ścieżkami, w których będą one przechowywane
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
# Zapisz oba zbiory danych jako pliki csv. Ustaw index = False, aby usunąć kolumnę indeksu
full_data.to_csv(full_data_file_path, index = False)
agg_data.to_csv(agg_data_file_path, index = False)
# Wywołanie funkcji load() i przekazanie oczyszczonych i zagregowanych zbiorów danych wraz z ich ścieżkami
load(clean_data, "clean_data.csv", agg_data, "agg_data.csv")
# 6. Utworzenie funkcji validation() w celu sprawdzenia czy poprzednia funkcja została wykonana poprawnie
def validation(file_path):
# Użyj pakietu "os", aby sprawdzić, czy ścieżka istnieje
file_exists = os.path.exists(file_path)
# Zgłoszenie wyjątku, jeśli ścieżka nie istnieje, a więc jeśli nie znaleziono pliku na danej ścieżce.
if not file_exists:
raise Exception(f"There is no file at the path {file_path}")
# Wywołanie funkcji validation() i przekazanie najpierw oczyszczonego, a potem zagregowanego zbioru danych
validation("clean_data.csv")
validation("agg_data.csv")