Getting Started with Data Pipelines for ETL

Data pipelines are everywhere! More than ever, data practitioners find themselves needing to extract, transform, and load data to power the work they do. During this code-along, I'll walk through the basics of building a data pipeline using Python, pandas, and sqlite.

Throughout the tutorial, I'll be using the "Google Play Store Apps" dataset, available in DataCamp Workspaces. The two datasets we'll be using are made available as .csv files and will be transformed throughout the code-along before being loaded into a sqlite database.

Extracting Data

Extracting data is almost always the first step when building a data pipeline. Data can be extracted from sources of many shapes and sizes. Here are just a few:

  • APIs
  • SFTP sites
  • Relational databases
  • NoSQL databases (columnar, document, key-value)
  • Flat-files

In this code-along, we'll focus on extracting data from flat files. A flat file might be something like a .csv or a .json file. The two files we'll be extracting data from are apps_data.csv and review_data.csv. To do this, we'll use pandas. Let's take a closer look!
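Although this code-along only uses CSVs, the same pandas workflow applies to other flat files. As a quick, minimal sketch (reviews.json is a hypothetical path, not part of this dataset), extracting a JSON file looks almost identical:

# Minimal sketch: extracting a JSON flat file (hypothetical reviews.json path)
import pandas as pd

reviews_json = pd.read_json("reviews.json")
print(reviews_json.head())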

  1. After importing pandas, read the apps_data.csv file into a DataFrame. Print the head of the DataFrame.
  2. Similar to before, read in the DataFrame stored in the review_data.csv file. Take a look at the first few rows of this DataFrame.
  3. Print the column names, shape, and data types of the apps DataFrame.
# Import pandas
import pandas as pd

# Read the dataset into memory, and take a look at the first few rows (store as apps)
apps = pd.read_csv("apps_data.csv")
reviews = pd.read_csv("review_data.csv")

# Print out the head of each DataFrame
print(apps.head())
print(reviews.head())

# Perform some basic checks (column names, number of records, types, etc)
print(apps.columns)
print(apps.shape)
print(apps.dtypes)

The code above works perfectly well, but this time let's apply DRY principles and build a function to extract the data.

  1. Create a function called extract, with a single parameter named file_path.
  2. Print the number of rows and columns in the DataFrame, as well as the data type of each column. Provide instructions about how to use the value that will eventually be returned by this function.
  3. Return the variable data.
  4. Call the extract function twice, once passing in the apps_data.csv file path, and another time with the review_data.csv file path. Output the first few rows of the reviews_data DataFrame.
# We can do this a little better. Let's try writing a function to extract the data, and print some important information
def extract(file_path):
    # Read the file into memory
    data = pd.read_csv(file_path)
    
    # Now, print the details about the file
    print(f"Here is a little bit of information about the data stored in {file_path}:")
    print(f"\nThere are {data.shape[0]} rows and {data.shape[1]} columns in this DataFrame.")
    print("\nThe columns in this DataFrame take the following types: ")
    
    # Print the type of each column
    print(data.dtypes)
    
    # Finally, print a message before returning the DataFrame
    print(f"\nTo view the DataFrame extracted from {file_path}, display the value returned by this function!\n\n")
    
    return data
    

# Call the function (create apps_data and reviews_data)
apps_data = extract("apps_data.csv")
reviews_data = extract("review_data.csv")

# Take a peek at one of the DataFrames
reviews_data
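Note that extract assumes the file exists at the given path. If you'd like a slightly more defensive variant (a sketch, not part of the original code-along), you could catch a missing file explicitly:

# Sketch: a defensive variant of extract that handles a missing file
def safe_extract(file_path):
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Could not find {file_path}. Check the path and try again.")
        return None
    print(f"Extracted {data.shape[0]} rows and {data.shape[1]} columns from {file_path}")
    return data

# Example usage
apps_data_safe = safe_extract("apps_data.csv")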

Transforming Data

We're interested in working with the apps and their corresponding reviews in the "FOOD_AND_DRINK" category. We'd like to do the following:

  1. Define a function with the name transform. This function will have five parameters: apps, reviews, category, min_rating, and min_reviews.
  2. Drop duplicates from both DataFrames.
  3. For each of the apps in the desired category, find the number of positive reviews, and filter the columns.
  4. Join this back to the apps dataset, only keeping the following columns:
    • App
    • Rating
    • Reviews
    • Installs
    • Sentiment_Polarity
  5. Filter out all records that don't have at least the min_rating and more than min_reviews reviews.
  6. Order by the rating and number of reviews, both in descending order.
  7. Call the function for the "FOOD_AND_DRINK" category, with a minimum average rating of 4 stars, and at least 1000 reviews.

Alright, let's give it a shot!

# Define a function to transform data
def transform(apps, reviews, category, min_rating, min_reviews):
    # Print statement for observability
    print(f"Transforming data to curate a dataset with all {category} apps and their "
          f"corresponding reviews with a rating of at least {min_rating} and "
          f"{min_reviews} reviews\n")
    
    # Drop any duplicates from both DataFrames (also have the option to do this in-place)
    reviews = reviews.drop_duplicates()
    apps = apps.drop_duplicates(["App"])
    
    # Find all of the apps and reviews in the food and drink category
    subset_apps = apps.loc[apps["Category"] == category, :]
    subset_reviews = reviews.loc[reviews["App"].isin(subset_apps["App"]), ["App", "Sentiment_Polarity"]]
    
    # Aggregate the subset_reviews DataFrame
    aggregated_reviews = subset_reviews.groupby(by="App").mean()
    
    # Join it back to the subset_apps table
    joined_apps_reviews = subset_apps.join(aggregated_reviews, on="App", how="left")
    
    # Keep only the needed columns
    filtered_apps_reviews = joined_apps_reviews.loc[:, ["App", "Rating", "Reviews", "Installs", "Sentiment_Polarity"]]
    
    # Convert the Reviews column to an integer type
    filtered_apps_reviews = filtered_apps_reviews.astype({"Reviews": "int32"})
    
    # Keep only apps with at least the minimum rating and more than the minimum number of reviews
    top_apps = filtered_apps_reviews.loc[(filtered_apps_reviews["Rating"] >= min_rating) & (filtered_apps_reviews["Reviews"] > min_reviews), :]
    
    # Sort the top apps by rating and review count, both descending, and reset the index
    top_apps = top_apps.sort_values(by=["Rating", "Reviews"], ascending=False).reset_index(drop=True)
     
    # Persist this DataFrame as top_apps.csv file
    top_apps.to_csv("top_apps.csv")
    
    print(f"The transformed DataFrame, which includes {top_apps.shape[0]} rows "
          f"and {top_apps.shape[1]} columns has been persisted, and will now be "
          f"returned")
    
    # Return the transformed DataFrame
    return top_apps


# Call the function
top_apps_data = transform(
    apps=apps_data,
    reviews=reviews_data,
    category="FOOD_AND_DRINK",
    min_rating=4.0,
    min_reviews=1000
)

# Show
top_apps_data
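As a quick sanity check (a sketch using the thresholds and column names from above), you can confirm that every row in the transformed DataFrame actually meets the rating and review minimums:

# Sketch: sanity-check the transformed DataFrame against the thresholds
assert (top_apps_data["Rating"] >= 4.0).all()
assert (top_apps_data["Reviews"] > 1000).all()
print("All rows meet the minimum rating and review thresholds")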

Loading Data

Next, we'd like to load the transformed dataset into a SQL database. We'll be using pandas along with sqlite to do just that!

  1. After importing sqlite3, create a function with the name load. The function will have three parameters: dataframe, database_name, and table_name.
  2. Connect to the database using the connect() function.
  3. Write the DataFrame to the provided table name. Replace the table if it exists, and do not include the index.
  4. Now, we'll validate that the data was loaded correctly. Use the read_sql() function to return the DataFrame that was just loaded.
  5. Assert that the number of rows and columns match in the original and loaded DataFrame.
  6. Return the DataFrame read from the sqlite database.
  7. Call the function for the top_apps_data DataFrame, for the "market_research" database and the top_apps table.
import sqlite3

# Now, create a function to do this
def load(dataframe, database_name, table_name):
    # Create a connection object
    con = sqlite3.connect(database_name)
    
    # Write the data to the specified table (table_name)
    dataframe.to_sql(name=table_name, con=con, if_exists="replace", index=False)
    print("Original DataFrame has been loaded to sqlite\n")
    
    # Read the data, and return the result (it is to be used)
    loaded_dataframe = pd.read_sql(sql=f"SELECT * FROM {table_name}", con=con)
    print("The loaded DataFrame has been read from sqlite for validation\n")
    
    try:
        assert dataframe.shape == loaded_dataframe.shape
        print(f"Success! The data in the {table_name} table have successfully been "
              f"loaded and validated")

    except AssertionError:
        print("DataFrame shape is not consistent before and after loading. Take a closer look!")

    # Return the DataFrame read back from sqlite
    return loaded_dataframe


# Call the function
load(
    dataframe=top_apps_data,
    database_name="market_research",
    table_name="top_apps"
)
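Once the data is loaded, you can also query it straight from sqlite. Here's a small sketch (assuming the database and table names used above):

# Sketch: query the loaded table directly from sqlite
con = sqlite3.connect("market_research")
top_five = pd.read_sql("SELECT App, Rating, Reviews FROM top_apps ORDER BY Reviews DESC LIMIT 5", con)
print(top_five)
con.close()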
    

Running the Pipeline

Now that our functions have been defined and tested, we'll run this pipeline end-to-end!

  1. For completeness, import pandas and sqlite3.
  2. Extract data from the apps_data.csv and review_data.csv files.
  3. Transform the data by passing in the following:
    • category="FOOD_AND_DRINK"
    • min_rating=4.0
    • min_reviews=1000
  4. Load the transformed DataFrame to the top_apps table in the market_research database.
  5. Check out the output!
# Import modules
import pandas as pd
import sqlite3

# Extract the data
apps_data = extract("apps_data.csv")
reviews_data = extract("review_data.csv")
# Transform the data
top_apps_data = transform(
    apps=apps_data,
    reviews=reviews_data,
    category="FOOD_AND_DRINK",
    min_rating=4.0,
    min_reviews=1000
)
# Load the data
load(
    dataframe=top_apps_data,
    database_name="market_research",
    table_name="top_apps"
)
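
If you plan to re-run this pipeline regularly, one option (a sketch, not covered above) is to wrap the three steps into a single function so the whole run is a single call:

# Sketch: wrap the full extract-transform-load flow into one function
def run_pipeline(apps_path, reviews_path, category, min_rating, min_reviews, database_name, table_name):
    apps = extract(apps_path)
    reviews = extract(reviews_path)
    top_apps = transform(apps, reviews, category, min_rating, min_reviews)
    load(top_apps, database_name, table_name)
    return top_apps

# Run the whole pipeline end-to-end
run_pipeline("apps_data.csv", "review_data.csv", "FOOD_AND_DRINK", 4.0, 1000, "market_research", "top_apps")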