
Moving Data with Python and dlt: A Guide for Data Engineers

Python dlt is a great choice for data engineers who want to move data fast and confidently. It integrates with almost any source and destination and lets you write your ELT/ETL pipelines quickly. This tutorial shows you how to control every step of the pipeline and work with local files, cloud, databases, and REST APIs.
Oct 1, 2024  · 26 min read

Data engineers spend most of their time moving data, an involved process often overlooked by data scientists and software engineers. 

On paper, you’re moving data from point A to point B. In reality, it’s more like solving a puzzle in which each piece has to fit perfectly to ensure smooth and error-free data flow. To make matters worse, the picture on the puzzle is a clear blue sky. Good luck putting 1,000 near-identical pieces together.

Making things better (or worse) is the sheer number of data warehousing tools you can choose from. You can go with mostly legacy low-code or no-code solutions (think Microsoft SSIS and Oracle Data Integrator) or with something that relies entirely on code. The latter is easier to share and version control in the long run.

If you’re looking for something lightweight and Python-based, look no further than dlt. In today’s article, I’ll guide you through everything you need to get started.

dlt, dlt+, dltHub: What’s it All About?

In short: dlt is an open-source Python library for moving data, dlt+ is a premium version with additional features, and dltHub is a broader umbrella term for the company behind it.

What you really care about today is dlt, so let me add some more details.

As a data engineer, you can use the dlt Python library to load data from different sources and organize it into datasets. Along the way, dlt automatically infers schemas and data types, normalizes the data, and handles nested structures.

dlt is designed for ELT, but you can write your pipelines as ETL if you get a bit creative. More on that later.

You can load data from dozens of sources into dozens of destinations. The command line interface you’ll use in the following section automatically generates the starting project folder for you based on the source and destination of choice. This feature saves you time both searching for code and writing it.

The library also lets you easily deploy your pipelines (e.g., through Airflow or serverless functions), but that’s a topic for another time.

Today, I just want to cover the core concepts. These are:

  • Environment setup: How to install dlt in a virtual environment and create a project.
  • Data extraction: Pull data from REST APIs, databases, local file systems, and cloud storage.
  • Data transformation: Apply transformations of choice through SQL or pandas.
  • Data loading: Save data to a local file system, cloud storage, and databases.

In the following section, I’ll show you how to create an environment for dlt and run your first pipeline. Let’s dig in!


Setting Up a dlt Environment From Scratch

A best practice when setting up a new Python project is to do so in a virtual environment. I’ll show you how to go about it with conda and venv.

Create a virtual environment

I’m using Anaconda to manage environments and dependencies. If you do as well, run the following snippet to create a new virtual environment named dlt_env based on Python 3.12:

conda create --name dlt_env python=3.12 -y
conda activate dlt_env

You should see an output similar to this one in your terminal:

Conda environment setup

On the other hand, if you’re not using Anaconda, run the following command to create and activate a new Python environment:

python -m venv ./env
source ./env/bin/activate

You’re now ready to install dlt.

Install dlt and create a project

Before proceeding, you should have a virtual environment created and activated.

To install dlt, run the following inside the environment:

pip install dlt
dlt version

Installed dlt version

As long as you don’t get an error message or a message showing you that dlt is not recognized, you’re good to go!

You can now use the dlt command line interface to create new projects from a template.

This is done through the dlt init <source type> <destination type> command. In the following snippet, I show you how to create a pipeline that uses a REST API as a data source and DuckDB as a destination (target):

dlt init rest_api_test duckdb

Initializing a new dlt project with DuckDB

There’s nothing stopping you from creating the files manually - just make sure your folder structure resembles the one the dlt CLI creates. For example, this is what the previous shell command created on my system:

.
├── requirements.txt
├── rest_api_test_pipeline.py
├── .dlt
│   ├── config.toml
│   ├── secrets.toml

The magic happens in the .dlt folder. It contains two files that cover everything regarding pipeline configuration and data sources and destinations, such as API keys and database credentials.

The rest_api_test_pipeline.py file is automatically generated and shows you a couple of ways to connect a REST API with DuckDB.

For starters, you’ll want to install everything listed in requirements.txt. Do so by running the following command:

pip install -r requirements.txt

Once installed, you’re ready to run your first dlt pipeline!

Test the demo pipeline

Here are the contents of the rest_api_test_pipeline.py file:

"""The Default Pipeline Template provides a simple starting point for your dlt pipeline"""
# mypy: disable-error-code="no-untyped-def,arg-type"

import dlt
from dlt.common import Decimal

@dlt.resource(name="customers", primary_key="id")
def rest_api_test_customers():
    """Load customer data from a simple python list."""
    yield [
        {"id": 1, "name": "simon", "city": "berlin"},
        {"id": 2, "name": "violet", "city": "london"},
        {"id": 3, "name": "tammo", "city": "new york"},
    ]

@dlt.resource(name="inventory", primary_key="id")
def rest_api_test_inventory():
    """Load inventory data from a simple python list."""
    yield [
        {"id": 1, "name": "apple", "price": Decimal("1.50")},
        {"id": 2, "name": "banana", "price": Decimal("1.70")},
        {"id": 3, "name": "pear", "price": Decimal("2.50")},
    ]

@dlt.source(name="my_fruitshop")
def rest_api_test_source():
    """A source function groups all resources into one schema."""
    return rest_api_test_customers(), rest_api_test_inventory()

def load_stuff() -> None:
    # specify the pipeline name, destination and dataset name when configuring pipeline,
    # otherwise the defaults will be used that are derived from the current script name
    p = dlt.pipeline(
        pipeline_name='rest_api_test',
        destination='duckdb',
        dataset_name='rest_api_test_data',
    )
    load_info = p.run(rest_api_test_source())
    # pretty print the information on data that was loaded
    print(load_info)  # noqa: T201

if __name__ == "__main__":
    load_stuff()

Let me explain the decorators first, in case you haven’t seen them before:

  • @dlt.resource: Used to create a generic resource, for example, a database table with a name and primary key column name. You’ll want to yield a dataset (e.g., a Python list, a pandas DataFrame) instead of returning it.
  • @dlt.source: Used to group multiple resources, for example, multiple tables into a single database schema. The function must return function calls to your resources.

Python functions in this file are pretty straightforward, but let me explain the logic:

  • rest_api_test_customers(): Yields a small hardcoded dataset of customer names and cities.
  • rest_api_test_inventory(): Yields a small hardcoded dataset of product names and prices.
  • rest_api_test_source(): Groups the previous two resources together into one schema.
  • load_stuff(): Creates and runs a pipeline that moves dummy Python data into a DuckDB database.

Note that the data isn’t actually fetched from a REST API, but its structure looks exactly like JSON that any REST API would return. Later in the article, I’ll show you how to work with actual APIs.

For now, run the pipeline with the following shell command:

python rest_api_test_pipeline.py

Running the pipeline

If you see a message similar to the one above, the pipeline execution was successful. 

You can connect to the local DuckDB file through any database tool (I’m using TablePlus). Once there, your data will be inside the schema specified by the dataset_name parameter in dlt.pipeline().
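If you prefer staying in Python, you can also peek at the loaded tables with the duckdb package. Here’s a minimal sketch, assuming the default database file name of <pipeline_name>.duckdb in the working directory:

import duckdb

# Connect to the DuckDB file dlt created (assumed name: rest_api_test.duckdb)
con = duckdb.connect("rest_api_test.duckdb")
# The dataset_name from dlt.pipeline() becomes the schema name
print(con.sql("SELECT * FROM rest_api_test_data.customers").df())  # .df() requires pandas
con.close()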

The customers table shows data generated with Python, with the addition of two data lineage columns:

Customers table contents

The same holds for the inventory table. Note that the price column contains NULLs. There’s likely a problem with the Decimal data type that dlt used by default in this example:

Inventory table content

Any time you run a pipeline, dlt will create (or append to) three metadata tables. 

The first one, _dlt_loads, tracks the history of data loads performed by the pipeline. It shows the name of the schema, load status, and time of the load, among other things:

History of data loads

The _dlt_pipeline_state table shows details on the pipeline’s configuration, checkpoints, and progress. It’s also helpful when your pipeline stops for whatever reason, as dlt can use the data from this table to resume from where it left off:

Pipeline state table

Finally, the _dlt_version table stores information about the library itself to ensure compatibility:

dlt version table

When combined, these three metadata tables provide all the details you want to track over time to manage your data pipelines effectively.
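If you want to inspect these metadata tables programmatically, dlt exposes a SQL client on the pipeline object (the same one I’ll use later for transformations). Here’s a quick, hedged sketch:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="rest_api_test",
    destination="duckdb",
    dataset_name="rest_api_test_data"
)

# Read the load history straight from the _dlt_loads metadata table
with pipeline.sql_client() as client:
    rows = client.execute_sql("SELECT * FROM _dlt_loads")
    for row in rows:
        print(row)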

You now have a high-level overview of how dlt works. Next, I’ll show you how to extract data from different sources.

Extracting Data with dlt and Python

I will cover four types of data sources in this section: REST APIs, databases, cloud storage, and local file system.

Fetch data from a REST API

REST APIs are among the most common sources for data pipelines, and the dlt library makes working with them feel effortless.

For demonstration, I’ll use the JSONPlaceholder API that has endpoints for fetching posts, comments, and users, among other things. As for the data destination, I’ll go with DuckDB since it doesn’t require any configuration.

To connect to a REST API, use the rest_api_source() function. You can pass in a dictionary providing the base API URL and the pagination rules. You can add other properties, e.g., for authentication, but my API of choice is publicly accessible, so authentication isn’t needed.

The interesting part comes from the resources property. Here, you’ll provide a list of endpoints from which dlt should fetch the data. For example, the posts resource will connect to https://jsonplaceholder.typicode.com/posts. You can tweak individual resources in much more depth, but I’ll leave that up to you.
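For reference, here’s a hedged sketch of what a more detailed resource definition can look like - a dictionary instead of a plain string. The query parameter below is purely illustrative (JSONPlaceholder happens to support _limit):

# A resource configured as a dictionary instead of a plain string
posts_resource = {
    "name": "posts",
    "endpoint": {
        "path": "posts",
        "params": {"_limit": 50},  # illustrative query parameter
    },
}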

The rest of this Python file remains more or less unchanged:

import dlt
from dlt.sources.rest_api import rest_api_source

def load_rest_api_data() -> None:
    data_source = rest_api_source({
        "client": {
            "base_url": "https://jsonplaceholder.typicode.com/",
            "paginator": {
                "type": "json_response",
                "next_url_path": "paging.next"
            }
        },
        "resources": [
            "posts",
            "comments",
            "users"
        ]
    })
    pipeline = dlt.pipeline(
        pipeline_name="rest_api_jsonplaceholder",
        destination="duckdb",
        dataset_name="jsonplaceholder"
    )
    load_info = pipeline.run(data_source)
    print(load_info)

if __name__ == "__main__":
    load_rest_api_data()

After running the pipeline, you’ll see three data tables, one for each specified resource. 

The first one shows dummy posts: 

Posts table

The second one shows post comments:

Comments table

And the third shows users. This one is interesting because it contains nested JSON data. The dlt library flattened it, so it can be stored without issues in a single table:

Users table

Neat, right? You no longer have to parse JSON manually.
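For the curious: nested objects are flattened into columns with double-underscore-separated names. Here’s a hedged sketch of checking that with the duckdb package (the file name and column names assume the pipeline above and JSONPlaceholder’s users schema):

import duckdb

con = duckdb.connect("rest_api_jsonplaceholder.duckdb")
# The nested address and company objects become address__* and company__* columns
print(con.sql("SELECT name, address__city, company__name FROM jsonplaceholder.users LIMIT 5").df())
con.close()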

Fetch data from a database

If there’s one data source more commonly used than REST APIs, it has to be a relational database.

For the sake of demonstration, I’ve provisioned a free-tier Postgres database on AWS:

PostgreSQL database on AWS

And loaded the well-known Iris dataset into it as a table:

Data in the Postgres database

I’ll now show you how to connect to a Postgres source from dlt. It’s a more involved process than you might initially think.

First, start by installing a submodule for working with SQL databases:

pip install "dlt[sql_database]"

Then, initialize a pipeline that uses a generic SQL database as a source and DuckDB as a destination:

dlt init sql_database duckdb

This is the first time you’ll play around with the files in the .dlt folder. Inside config.toml, specify the source table name from which you want to extract the data:

[sources.sql_database]
table = "table" # please set me up!
[runtime]
dlthub_telemetry = true

Replace “table” with “iris” and you’ll be good to go.

Inside secrets.toml, you’ll want to specify connection parameters for the source database. Note that you must add the [sources.sql_database.credentials] section header before specifying connection parameters. Otherwise, dlt won’t know what they refer to:

[sources.sql_database.credentials]
drivername = "drivername" # please set me up!
database = "database" # please set me up!
password = "password" # please set me up!
username = "username" # please set me up!
host = "host" # please set me up!
port = 0 # please set me up!

And that’s it regarding configuration. You can now switch to the Python pipeline file.

Long story short, you only need to call the sql_database() function to connect to the database and extract source data. Every configuration detail is specified in the TOML files, so you can call this function without any parameters.
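If you’d rather not keep credentials in secrets.toml, sql_database() can also take them directly. Here’s a hedged sketch - the POSTGRES_URL environment variable name is my own choice, not something dlt requires:

import os
from dlt.sources.sql_database import sql_database

# Pass an SQLAlchemy-style connection string instead of relying on secrets.toml
source = sql_database(
    credentials=os.environ["POSTGRES_URL"],  # e.g. postgresql://user:pass@host:5432/db
    table_names=["iris"],                    # only extract the iris table
)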

The rest of the script introduces nothing new:

import dlt
from dlt.sources.sql_database import sql_database

def load_database_data() -> None:
    source = sql_database()
    pipeline = dlt.pipeline(
        pipeline_name="postgres_to_duckdb_pipeline",
        destination="duckdb",
        dataset_name="iris_from_postgres"
    )
    
    load_info = pipeline.run(source)
    print(load_info)

if __name__ == "__main__":
    load_database_data()

Iris table

The target table in DuckDB has all the Iris data with the additional two data lineage columns.

Fetch data from cloud storage

More often than not, you’ll want your pipelines to connect to cloud storage, such as AWS S3 buckets.

I’ve created an S3 bucket for the sake of this article and uploaded a single Parquet file with datasets from the NYC taxi rides:

S3 bucket with a single Parquet file

To connect a local or remote file system to dlt, you’ll want to install an additional submodule:

pip install "dlt[filesystem]"

You can now initialize a new pipeline that uses a file system as a source and DuckDB as a destination:

dlt init filesystem duckdb

The first order of business is to fill in the configuration files. The config.toml file needs the full URL of your S3 bucket:

[sources.filesystem]
bucket_url = "bucket_url" # please set me up!
[runtime]
dlthub_telemetry = true

And the secrets.toml file needs info on your IAM role (access key and secret access key), along with the region in which your bucket is provisioned:

[sources.filesystem.credentials]
aws_access_key_id = "aws_access_key_id" # please set me up!
aws_secret_access_key = "aws_secret_access_key" # please set me up!
region_name = "region_name" # please set me up!

Onto the pipeline code.

Use the readers() function to read data from your configured source. The file_glob parameter controls which files should be read. I’ve set it up in a way to return all Parquet files. Then, simply chain the read_parquet() function to read the data:

import dlt
from dlt.sources.filesystem import readers, read_parquet

def load_s3_data() -> None:
    source = readers(file_glob="*.parquet").read_parquet()
    pipeline = dlt.pipeline(
        pipeline_name="s3_to_duckdb_pipeline",
        destination="duckdb",
        dataset_name="nyc_data_from_s3"
    )
    load_info = pipeline.run(source.with_name("nyc_taxi_data"))
    print(load_info)

if __name__ == "__main__":
    load_s3_data()

NYC taxi rides table

And there you have it - tens of thousands of records loaded within seconds!

Fetch data from a local file system

Reading data from a local file system is almost identical to reading data from S3.

This time, I’m using the mtcars dataset saved locally into a localbucket folder:

File on a local system

Now in your config.toml file, provide an absolute path to your local folder. It’s important to prefix the path with file:///. That’s three slashes:

[sources.filesystem]
bucket_url = "file:///absolute/path/to/local/folder"
[runtime]
dlthub_telemetry = true

Since you’re not connecting to S3 anymore, you can delete the contents of secrets.toml.

The actual pipeline code will look almost identical to the one from the previous section. The only change is that in file_glob you’re now providing a file name, and using a different function for reading the data (CSV format):

import dlt
from dlt.sources.filesystem import filesystem, read_csv_duckdb

def load_local_data() -> None:
    source = filesystem(file_glob="mtcars.csv") | read_csv_duckdb()
    pipeline = dlt.pipeline(
        pipeline_name="local_filesystem_to_duckdb_pipeline",
        destination="duckdb",
        dataset_name="mtcars_data_from_local_disk"
    )
    load_info = pipeline.run(source.with_name("mtcars"))
    print(load_info)

if __name__ == "__main__":
    load_local_data()

The mtcars table

Works like a charm!

That’s all I want to showcase regarding data sources. You can find many additional ones to suit your specific needs on the dlt documentation page for data sources.


Data Transformation: The Essence of ETL/ELT Tools

There’s no point in just moving data from point A to point B. You typically want to apply transformations before writing data to the destination (ETL) or after (ELT).

If the acronyms ETL and ELT don’t ring a bell, we recommend going through our data pipeline course with Python.

In this section, I’ll show you how to apply transformations to your data with SQL and Python.

Transform data with SQL

Data engineers live and breathe SQL. In this section, I’ll show you how to load a local file into a Postgres database, transform it, and write the result into another table (ELT).

Start by tweaking the secrets.toml file to include database connection details for the source and destination. The values are identical; you just need to specify them twice:

[sources.sql_database.credentials]
drivername = "drivername" # please set me up!
database = "database" # please set me up!
password = "password" # please set me up!
username = "username" # please set me up!
host = "host" # please set me up!
port = 0 # please set me up!

[destination.postgres.credentials]
drivername = "drivername" # please set me up!
database = "database" # please set me up!
password = "password" # please set me up!
username = "username" # please set me up!
host = "host" # please set me up!
port = 0 # please set me up!

Inside config.toml, write an absolute path to a folder located on your file system:

[sources.filesystem]
bucket_url = "file:///absolute/path/to/local/folder"
[runtime]
dlthub_telemetry = true

The Python pipeline file will have two functions:

  • load_source_data(): It loads a local CSV file and saves it into a table in a Postgres database.
  • transform_data(): It extracts data from a Postgres table and creates a new table based on the original one. I’ll only add a couple of columns to prove a point. The target table is first deleted if it exists.

You’ll want to call the functions one after the other:

import dlt
from dlt.sources.filesystem import filesystem, read_csv

def load_source_data() -> None:
    source = filesystem(file_glob="iris.csv") | read_csv()
    pipeline = dlt.pipeline(
        pipeline_name="load_source_data",
        destination="postgres",
        dataset_name="data"
    )
    load_info = pipeline.run(source.with_name("iris_src"), write_disposition="replace")
    print(load_info)

def transform_data() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="transform_data",
        destination="postgres",
        dataset_name="data"
    )
    try:
        with pipeline.sql_client() as client:
            client.execute_sql("""
                DROP TABLE iris_tgt;
            """)
            print("Table iris_tgt deleted!")
    except Exception:
        print("Table iris_tgt does not exist, proceeding!")
    try:
        with pipeline.sql_client() as client:
            client.execute_sql("""
                CREATE TABLE iris_tgt AS (
                    SELECT
                        sepal_length,
                        sepal_width,
                        petal_length,
                        petal_width,
                        sepal_length + sepal_width AS total_sepal,
                        petal_length + petal_width AS total_petal,
                        UPPER(species) AS species_upper
                    FROM iris_src
                );
            """)
            print("Table iris_tgt created and filled!")
    except Exception as e:
        print(f"Unable to transform data! Error: {str(e)}")

if __name__ == "__main__":
    load_source_data()
    transform_data()

The source table contains the Iris dataset with two additional data lineage columns:

Iris source table

And the target table contains the transformed version of the Iris dataset:

Iris target table

This transformation example is pretty basic but explains how to transform your data when it’s already loaded to the destination.

Transform data with pandas

What if you want to transform the data before writing it to the database? In other words, what if you don’t want to store both the raw and the transformed data? That’s essentially a description of an ETL pipeline. I’ll show you how to implement one with pandas.

Your TOML files remain unchanged.

In the pipeline code, you’ll now need to implement a couple of additional functions:

  • extract_data(): Uses pandas to read a local CSV file.
  • transform_data(): Applies data transformations using pandas rather than SQL.
  • data(): A function decorated with @dlt.resource that yields the transformed dataset.
  • load_data(): Loads the dlt.resource into the iris_tgt table.

The code of this pipeline will feel familiar if you’re experienced with pandas:

import dlt
import pandas as pd

def extract_data() -> pd.DataFrame:
    return pd.read_csv("iris.csv")

def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    df["sepal_sum"] = df["sepal_length"] + df["sepal_width"]
    df["petal_sum"] = df["petal_length"] + df["petal_width"]
    df = df.drop(["sepal_length", "sepal_width", "petal_length", "petal_width"], axis=1)
    return df

@dlt.resource
def data():
    yield transform_data(df=extract_data())

def load_data() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="pandas_etl_pipeline",
        destination="duckdb",
        dataset_name="data"
    )
    load_info = pipeline.run(data(), table_name="iris_tgt")
    print(load_info)

if __name__ == "__main__":
    load_data()

Iris target table

Once again, dlt was designed with ELT in mind, but ETL workflows don’t require too many code changes.

Other data transformation options to consider

Both SQL and pandas are easy ways to transform data before and after loading to the destination.

The dlt authors recommend using dbt instead. It’s a more powerful and versatile option, but too complex a tool to cover in a single article. If you or your team are already using dbt, plugging it into dlt pipelines will feel like a natural extension.
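For orientation, dlt ships a dbt runner that can execute a dbt project against the same destination your pipeline writes to. Here’s a hedged sketch based on the documentation - the project path is a placeholder:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="transform_data",
    destination="postgres",
    dataset_name="data"
)

# Create a virtual environment with dbt installed and run all models in the project
venv = dlt.dbt.get_venv(pipeline)
dbt = dlt.dbt.package(pipeline, "path/to/your/dbt_project", venv=venv)
models = dbt.run_all()
for model in models:
    print(model.model_name, model.status)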

If you want to learn more about dbt in Python, we have a full 4-module long course available.

On the other hand, if you’re a beginner with basic SQL and pandas knowledge, the two transformation approaches I’ve shown are enough to get you started.

Loading Data with dlt

So far, I’ve used DuckDB as the destination of choice because it’s simple and requires no setup. In this section, I’ll show you how to work with a couple more destinations, including the local file system, cloud storage, and databases.

Load data to disk

Sometimes, you want to save the result of a data pipeline to disk. Now, dlt isn’t the perfect candidate for the job as it also stores a lot of metadata, but you can use it if you absolutely need to.

Start by tweaking the config.toml file. Provide paths to source and destination folders on your local system:

[sources.filesystem]
bucket_url = "file:///path/to/source/folder"
[destination.filesystem]
bucket_url = "file:///path/to/target/folder"
[runtime]
dlthub_telemetry = true

Now in the Python file, simply load the source file from a file system and run it through the pipeline. I haven’t applied any data transformations for the sake of simplicity:

import dlt
from dlt.sources.filesystem import filesystem, read_csv

def load_local_data() -> None:
    source = filesystem(file_glob="iris.csv") | read_csv()
    pipeline = dlt.pipeline(
        pipeline_name="local_to_local",
        destination="filesystem",
        dataset_name="data"
    )
    load_info = pipeline.run(source.with_name("iris"), loader_file_format="csv")
    print(load_info)

if __name__ == "__main__":
    load_local_data()

This is the content of the target folder:

Local folder contents

It’s a mess, and that’s just the result of a single pipeline. Imagine you had dozens of them.

Databases are more natural places to store this information.

Load data into a database

I’m using a Postgres database provisioned on AWS, but you don’t have to. Feel free to use any other database or cloud vendor.

Inside config.toml, you’ll want to provide a path to the folder on your local file system:

[sources.filesystem]
bucket_url = "file:///your-local-bucket"
[runtime]
dlthub_telemetry = true

And in secrets.toml, write the database connection details:

[destination.postgres.credentials]
drivername = "drivername" # please set me up!
database = "database" # please set me up!
password = "password" # please set me up!
username = "username" # please set me up!
host = "host" # please set me up!
port = 0 # please set me up!

Loading source data into the database now boils down to changing the value for the destination parameter in dlt.pipeline(). The with_name() function called on the data source controls how the database table will be named:

import dlt
from dlt.sources.filesystem import filesystem, read_csv

def load_to_database() -> None:
    source = filesystem(file_glob="iris.csv") | read_csv()
    pipeline = dlt.pipeline(
        pipeline_name="local_to_db",
        destination="postgres",
        dataset_name="local_load"
    )
    
    load_info = pipeline.run(source.with_name("iris_from_local"))
    print(load_info)

if __name__ == "__main__":
    load_to_database()

Loaded iris data

Let’s explore one more destination option before discussing more advanced dlt stuff.

Load data to cloud storage

Just like with your local file system, cloud storage (e.g., S3) will also store metadata in separate files and folders.

If you want to try it, start by adjusting the config.toml file to include paths to a local folder and the S3 bucket:

[sources.filesystem]
bucket_url = "file:///path/to/local/folder"
[destination.filesystem]
bucket_url = "s3://bucket-name"
[runtime]
dlthub_telemetry = true

As for the secrets.toml, enter your IAM credentials and S3 bucket region name:

[destination.filesystem.credentials]
aws_access_key_id = "aws_access_key_id" # please set me up!
aws_secret_access_key = "aws_secret_access_key" # please set me up!
region_name = "region_name" # please set me up!

Running the pipeline will transfer the local CSV file to S3, and store it in JSONL (JSON lines) format:

import dlt
from dlt.sources.filesystem import filesystem, read_csv

def load_to_s3() -> None:
    source = filesystem(file_glob="iris.csv") | read_csv()
    pipeline = dlt.pipeline(
        pipeline_name="local_to_s3",
        destination="filesystem",
        dataset_name="iris_data"
    )
    
    load_info = pipeline.run(source.with_name("iris"))
    print(load_info)

if __name__ == "__main__":
    load_to_s3()

S3 bucket contents
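If you’d rather have Parquet files in the bucket than the default JSONL, it should be enough to request a different loader file format (assuming pyarrow is installed), just like the CSV example from the load-to-disk section:

load_info = pipeline.run(source.with_name("iris"), loader_file_format="parquet")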

You now know the basics of moving data around and transforming it along the way. But what happens when you run the pipeline multiple times? That’s what I’ll discuss next.

Advanced: Insert and Update of New Data

You’ll almost never run the pipeline just once. It’s more typical to schedule pipeline runs, for example, to run once a day or once every week.

If that’s the case, you’ll probably want to take more control of how new data is inserted, and how the existing data is updated.

New data is appended by default

Every time you run your pipeline, new data will be appended to your table of choice.

Let me demonstrate that by writing a simple pipeline that inserts a dummy pandas DataFrame into a DuckDB table:

import dlt
import pandas as pd

@dlt.resource(primary_key="index")
def data():
    data = pd.DataFrame({
        "index": [1, 2, 3, 4, 5],
        "a": [10, 15, 20, 25, 30],
        "b": [22.5, 30.5, 35.5, 50.5, 10.5],
        "c": ["row 1", "row 2", "row 3", "row 4", "row 5"]
    })
    yield data

def load_local_data() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="pipeline_incremental_load",
        destination="duckdb",
        dataset_name="data"
    )
    load_info = pipeline.run(data=data(), table_name="dataset")
    print(load_info)

if __name__ == "__main__":
    load_local_data()

Running the pipeline once will insert five rows of data:

First append run

And running it again inserts the same five rows:

Second append run

With this sort of data, it’s easy to spot the problem - the rows are duplicated. Let me show you what you can do about it.

How to replace existing rows

If you want to delete existing records and replace them with new ones, add write_disposition="replace" to pipeline.run():

load_info = pipeline.run(data=data(), table_name="dataset", write_disposition="replace")

After doing this, I’ve run the pipeline four times, as you can see from the _dlt_loads metadata table:

Running the pipeline multiple times

And the dataset table contains only 5 records:

Dataset table contents

In short, this approach will always replace the existing data.
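As a side note, the write disposition can also be declared on the resource itself instead of in pipeline.run(). A minimal sketch:

import dlt

# Every run of a pipeline with this resource will replace the target table
@dlt.resource(primary_key="index", write_disposition="replace")
def data():
    yield [
        {"index": 1, "a": 10, "b": 22.5, "c": "row 1"},
        {"index": 2, "a": 15, "b": 30.5, "c": "row 2"},
    ]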

How to implement an upsert operation

But what if you have a mix of new and updated data? That’s where the upsert operation comes in. It’s implemented through the merge write disposition in dlt.

For reference, I’ve run the pipeline once more with the data from the previous section to ensure there are 5 rows present.

Then, I updated the data to add one more row and to change the values for some records with the existing index values.

The dlt package will look at the primary_key column you’ve specified and compare it to the data already present on the destination. Records with an existing primary key value will be updated, and new ones will be inserted:

import dlt
import pandas as pd

@dlt.resource(primary_key="index")
def data():
    data = pd.DataFrame({
        "index": [1, 2, 3, 4, 5, 6],
        "a": [10, 15, 200000, 25, -3000, 50],
        "b": [22.5, 30.5, 3555555.5, 50.5, -100.5, 15.5],
        "c": ["row 1", "row 2", "row 3 updated", "row 4", "row 5 updated", "row 6 new"]
    })
    yield data

def load_local_data() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="pipeline_incremental_load",
        destination="duckdb",
        dataset_name="data"
    )
    load_info = pipeline.run(data=data(), table_name="dataset", write_disposition="merge")
    print(load_info)

if __name__ == "__main__":
    load_local_data()

Dataset after the upsert operation

In short, you end up with one new row and two updated ones. 

This is probably the behavior you want for most of your pipelines, especially when you want to avoid data duplication.

Wrapping Up

To conclude, there’s more to data engineering than meets the eye. Data movement is just one part. Nevertheless, it’s a vital part and most other aspects of data engineering revolve around it.

If you’re already familiar with Python, you probably aren’t looking to learn a GUI data movement tool from scratch or migrate to a new language entirely. You want to stick to a familiar environment. Python’s dlt library is all you and your team need to modernize legacy systems and reduce cloud costs. It’s tried and tested by many big players, such as Hugging Face, Taktile, Untitled Data Company, and Harness, so I believe it will suit your needs as well.

That being said, dlt likely isn’t the only tool you need to implement efficient data pipelines. You’ll want to learn additional data tools such as dbt to take data processing to the next level.

If you’re looking to start a career as a data engineer, our 3-module course is the perfect way to get started.


FAQs

What’s the difference between dlt and dltHub?

 dlt is an open-source Python library for moving data. dltHub is the company behind dlt.

Do I have to use dlt init every time I want to create a new pipeline?

No, but the dlt init command has the convenience of creating and prepopulating configuration files for you. You can spend an extra couple of minutes and add them manually.

Is dlt the only tool I need as a data engineer?

Probably not. dlt is best used to move data from point A to point B. You can apply some transformations with SQL or pandas, but for the most flexibility, you’ll want to use Data Build Tool (dbt).

Can I implement SCD2 when loading data with dlt?

Yes, it’s a special type of merge strategy. I have covered data merging in this article, but didn’t touch on SCD2 specifically. Refer to the documentation example to learn more about it.
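Here’s a hedged sketch of what that configuration looks like, based on the documented merge strategy option:

import dlt

@dlt.resource(write_disposition={"disposition": "merge", "strategy": "scd2"})
def customers():
    yield [{"id": 1, "name": "simon", "city": "berlin"}]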

How can I deploy my dlt pipelines?

There are numerous options to deploy your pipelines, such as GitHub Actions, serverless functions, Airflow, Kestra, Dagster, Prefect, and many others.
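For example, the dlt CLI has a deploy command that can scaffold a GitHub Actions workflow for an existing pipeline script. A hedged sketch (the schedule is illustrative):

dlt deploy rest_api_test_pipeline.py github-action --schedule "*/30 * * * *"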


Author: Dario Radečić - Senior Data Scientist based in Croatia. Top Tech Writer with over 700 articles published, generating more than 10M views. Author of the book Machine Learning Automation with TPOT.