Getting Started with Data Pipelines for ETL
Data pipelines are everywhere! More than ever, data practitioners find themselves needing to extract, transform, and load data to power the work they do. During this code-along, I'll walk through the basics of building a data pipeline using Python, pandas, and sqlite.
Throughout the tutorial, I'll be using the "Google Play Store Apps" dataset, available in DataCamp Workspaces. The two datasets we'll be using are made available as .csv files, and will be transformed throughout the code-along before being loaded into a sqlite database.
Extracting Data
Extracting data is almost always the first step when building a data pipeline. Data can come from sources of all shapes and sizes. Here are just a few, with a quick sketch of one alternative after the list:
- APIs
- SFTP sites
- Relational databases
- NoSQL databases (columnar, document, key-value)
- Flat files
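For instance, here's a minimal sketch of what extraction from a JSON API might look like using the requests library (the endpoint URL and the extract_from_api helper are hypothetical, for illustration only):
import pandas as pd
import requests

def extract_from_api(url):
    # Request the data, and surface any HTTP errors early
    response = requests.get(url)
    response.raise_for_status()

    # Flatten the JSON records into a DataFrame
    return pd.json_normalize(response.json())

# Hypothetical endpoint returning a list of JSON records
# api_data = extract_from_api("https://example.com/api/apps")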
In this code-along, we'll focus on extracting data from flat files. A flat file might be something like a .csv or a .json file. The two files that we'll be extracting data from are the apps_data.csv and the review_data.csv files. To do this, we'll use pandas. Let's take a closer look!
- After importing pandas, read the apps_data.csv file into memory. Print the head of the DataFrame.
- Similar to before, read in the DataFrame stored in the review_data.csv file. Take a look at the first few rows of this DataFrame.
- Print the column names, shape, and data types of the apps DataFrame.
# Import pandas
import pandas as pd
# Read the datasets into memory (store as apps and reviews)
apps = pd.read_csv("apps_data.csv")
reviews = pd.read_csv("review_data.csv")
# Print out the head of each DataFrame
print(apps.head())
print(reviews.head())
# Perform some basic checks (column names, number of records, types, etc)
print(apps.columns)
print(apps.shape)
print(apps.dtypes)
The code above works perfectly well, but this time let's try using DRY principles to build a function to extract the data.
- Create a function called extract, with a single parameter named file_path.
- Print the number of rows and columns in the DataFrame, as well as the data type of each column. Provide instructions about how to use the value that will eventually be returned by this function.
- Return the variable data.
- Call the extract function twice, once passing in the apps_data.csv file path, and another time with the review_data.csv file path. Output the first few rows of the apps_data DataFrame.
# We can do this a little better. Let's try writing a function to extract the data, and print some important information
def extract(file_path):
    # Read the file into memory
    data = pd.read_csv(file_path)

    # Now, print the details about the file
    print(f"Here is a little bit of information about the data stored in {file_path}:")
    print(f"\nThere are {data.shape[0]} rows and {data.shape[1]} columns in this DataFrame.")
    print("\nThe columns in this DataFrame take the following types: ")

    # Print the type of each column
    print(data.dtypes)

    # Finally, print a message before returning the DataFrame
    print(f"\nTo view the DataFrame extracted from {file_path}, display the value returned by this function!\n\n")

    return data
# Call the function (create apps_data and reviews_data)
apps_data = extract("apps_data.csv")
reviews_data = extract("review_data.csv")
# Take a peek at the first few rows of the apps_data DataFrame
apps_data.head()
Transforming Data
We're interested in working with the apps and their corresponding reviews in the "FOOD_AND_DRINK" category. We'd like to do the following:
- Define a function with the name transform. This function will have five parameters: apps, reviews, category, min_rating, and min_reviews.
- Drop duplicates from both DataFrames.
- For each of the apps in the desired category, find the average sentiment polarity of its reviews, and filter the columns.
- Join this back to the apps dataset, keeping only the following columns: App, Rating, Reviews, Installs, and Sentiment_Polarity.
- Keep only the records with a rating above min_rating and more than min_reviews reviews.
- Order by the rating and number of reviews, both in descending order.
- Call the function for the "FOOD_AND_DRINK" category, with a minimum rating of 4 stars and a minimum of 1,000 reviews.
Alright, let's give it a shot!
# Define a function to transform data
def transform(apps, reviews, category, min_rating, min_reviews):
    # Print statement for observability
    print(f"Transforming data to curate a dataset with all {category} apps and their "
          f"corresponding reviews with a rating of at least {min_rating} and "
          f"{min_reviews} reviews\n")

    # Drop any duplicates from both DataFrames (also have the option to do this in-place)
    reviews = reviews.drop_duplicates()
    apps = apps.drop_duplicates(["App"])

    # Find all of the apps and reviews in the desired category
    subset_apps = apps.loc[apps["Category"] == category, :]
    subset_reviews = reviews.loc[reviews["App"].isin(subset_apps["App"]), ["App", "Sentiment_Polarity"]]

    # Aggregate the subset_reviews DataFrame (average sentiment polarity per app)
    aggregated_reviews = subset_reviews.groupby(by="App").mean()

    # Join it back to the subset_apps table
    joined_apps_reviews = subset_apps.join(aggregated_reviews, on="App", how="left")

    # Keep only the needed columns
    filtered_apps_reviews = joined_apps_reviews.loc[:, ["App", "Rating", "Reviews", "Installs", "Sentiment_Polarity"]]

    # Convert the Reviews column to int, then keep apps rated above min_rating with more than min_reviews reviews
    filtered_apps_reviews = filtered_apps_reviews.astype({"Reviews": "int32"})
    top_apps = filtered_apps_reviews.loc[(filtered_apps_reviews["Rating"] > min_rating) & (filtered_apps_reviews["Reviews"] > min_reviews), :]

    # Sort the top apps and reset the index, dropping the old one
    top_apps = top_apps.sort_values(by=["Rating", "Reviews"], ascending=False).reset_index(drop=True)

    # Persist this DataFrame as the top_apps.csv file
    top_apps.to_csv("top_apps.csv")

    print(f"The transformed DataFrame, which includes {top_apps.shape[0]} rows "
          f"and {top_apps.shape[1]} columns, has been persisted, and will now be "
          f"returned")

    # Return the transformed DataFrame
    return top_apps
# Call the function
top_apps_data = transform(
    apps=apps_data,
    reviews=reviews_data,
    category="FOOD_AND_DRINK",
    min_rating=4.0,
    min_reviews=1000
)
# Show
top_apps_data
Loading Data
Next, we'd like to load the transformed dataset into a SQL database. We'll be using pandas along with sqlite to do just that!
- After importing sqlite3, create a function with the name load. The function will have three parameters: dataframe, database_name, and table_name.
- Connect to the database using the connect() function.
- Write the DataFrame to the provided table name. Replace the table if it exists, and do not include the index.
- Now, we'll validate that the data was loaded correctly. Use the read_sql() function to return the DataFrame that was just loaded.
- Assert that the number of rows and columns match in the original and loaded DataFrames.
- Return the DataFrame read from the sqlite database.
- Call the function for the top_apps_data DataFrame, for the "market_research" database and the top_apps table.
import sqlite3
# Now, create a function to do this
def load(dataframe, database_name, table_name):
    # Create a connection object
    con = sqlite3.connect(database_name)

    # Write the data to the specified table (table_name)
    dataframe.to_sql(name=table_name, con=con, if_exists="replace", index=False)
    print("Original DataFrame has been loaded to sqlite\n")

    # Read the data back for validation
    loaded_dataframe = pd.read_sql(sql=f"SELECT * FROM {table_name}", con=con)
    print("The loaded DataFrame has been read from sqlite for validation\n")

    try:
        assert dataframe.shape == loaded_dataframe.shape
        print(f"Success! The data in the {table_name} table have successfully been "
              f"loaded and validated")
    except AssertionError:
        print("DataFrame shape is not consistent before and after loading. Take a closer look!")

    # Return the DataFrame that was read back from the database
    return loaded_dataframe
# Call the function
load(
    dataframe=top_apps_data,
    database_name="market_research",
    table_name="top_apps"
)
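If you'd like to double-check the load outside of pandas, you can also query the table directly with a sqlite3 connection. A quick sketch, reusing the database and table names from above:
# Connect and count the rows that landed in the top_apps table
con = sqlite3.connect("market_research")
row_count = con.execute("SELECT COUNT(*) FROM top_apps").fetchone()[0]
print(f"top_apps contains {row_count} rows")
con.close()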
Running the Pipeline
Now that our functions have been defined and tested, we'll run this pipeline end-to-end!
- For completeness, import pandas and sqlite3.
- Extract data from the apps_data.csv and review_data.csv files.
- Transform the data by passing in the following: category="FOOD_AND_DRINK", min_rating=4.0, min_reviews=1000.
- Load the transformed DataFrame to the top_apps table in the market_research database.
- Check out the output!
# Import modules
import pandas as pd
import sqlite3
# Extract the data
apps_data = extract("apps_data.csv")
reviews_data = extract("review_data.csv")
# Transform the data
top_apps_data = transform(
    apps=apps_data,
    reviews=reviews_data,
    category="FOOD_AND_DRINK",
    min_rating=4.0,
    min_reviews=1000
)

# Load the data
load(
    dataframe=top_apps_data,
    database_name="market_research",
    table_name="top_apps"
)
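As a final touch, you might wrap these three steps in a single function so the whole pipeline can be re-run with one call. A minimal sketch (the run_pipeline name and the try/except are my own additions, not part of the original code-along):
def run_pipeline():
    try:
        # Extract, transform, and load, in order
        apps_data = extract("apps_data.csv")
        reviews_data = extract("review_data.csv")
        top_apps_data = transform(
            apps=apps_data,
            reviews=reviews_data,
            category="FOOD_AND_DRINK",
            min_rating=4.0,
            min_reviews=1000
        )
        load(
            dataframe=top_apps_data,
            database_name="market_research",
            table_name="top_apps"
        )
    except Exception as e:
        # Fail loudly with a hint about where things went wrong
        print(f"Pipeline run failed: {e}")

run_pipeline()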