ML Workflow Orchestration With Prefect

Learn everything about a powerful and open-source workflow orchestration tool. Build, deploy, and execute your first machine learning workflow on your local machine and the cloud with this simple guide.
Updated Apr 2024

In this tutorial, we will delve into Prefect by exploring its key features, benefits, and components. We'll examine how to create tasks and flows, manage deployments, and execute flow runs. Finally, we will guide you through deploying your workflow on Prefect Cloud and running it using cloud infrastructure.

For those interested in how to automate machine learning pipelines using GitHub Actions and Data Version Control, consider taking our CI/CD for Machine Learning course.

What is Prefect?

Prefect is a powerful and open-source workflow orchestration tool that lets users design, monitor, and respond to data and machine learning pipelines using Python code.

Prefect comes equipped with features such as automatic retries, scheduling, and caching, which help you construct resilient and dynamic workflows.

Prefect Cloud Dashboard.

With Prefect, you can easily transform any Python function into a unit of work that can be observed and orchestrated by simply adding a few decorators to your code.

Prefect is ideal for standardizing workflow development and deployment across organizations. It provides complete observability into workflows and allows users to manage their code without the need for strict DAGs or boilerplate code.

Prefect Key Features

Lightweight

With a single command, users can effortlessly set up their own orchestration server for development purposes. This streamlined process simplifies local development and ensures smooth deployment to production environments.

Intuitive UI and Dashboard

Prefect provides an intuitive user interface for running, monitoring, debugging, and managing workflows.

Prefect offers features like scheduling, retries, native secrets management, scaling capabilities, concurrency control, flexible infrastructure and storage configuration options, and robust logging functionality.

Open-source Flexibility

Users have the freedom to either self-host an open-source Prefect server or leverage advanced features, high availability, and enhanced security by utilizing Prefect Cloud.

Prefect Cloud

Prefect Cloud provides all the capabilities of the open-source Prefect server along with additional features like automations, event feeds, events, webhooks, workspaces, organizations, and more, tailored for a hosted environment. You can either use Prefect Cloud for free or pay for more robust features.

Pure Python

Prefect allows users to initiate workflow orchestration with a single decorator, streamlining the process without unnecessary abstractions for efficient workflow management.

Notification

Send information about Prefect workflows and pipelines to messaging services such as Slack, Discord, email, and Microsoft Teams. Notifications can also alert you when a workflow fails.

Core Components of Prefect

This section will cover the key components used to build tasks and flows, deploy and execute them, and manage them. It is crucial to review these components before delving into the Python tutorial.

Prefect Task

A Prefect Task is a Python function decorated with the @task decorator that represents a discrete unit of work within a Prefect workflow. We can also customize the task decorator with optional arguments like name, description, tags, cache settings, retries, and more.

In the code below, there are three machine learning tasks created with @task decorator that can be reused across flows.

@task
def train_model(X_train, X_test, y_train):
    # Selecting the best features
    ...

    # Train the model
    ...

    return model


@task
def get_prediction(X_test, model: LogisticRegression):
    ...
    return prediction

@task
def evaluate_model(y_test, prediction: pd.DataFrame):
    ...

Prefect Flow

A Prefect Flow is a Python function decorated with the @flow decorator that encapsulates workflow logic. This makes it easier for users to define, configure, and execute complex data pipelines with flexibility and ease.

In the code below, we have created a Flow using the @flow decorator. The flow runs the tasks sequentially, passing the output of one task as input to the next.

@flow
def ml_workflow():
    model = train_model(X_train, X_test, y_train)
    predictions = get_prediction(X_test, model)
    evaluate_model(y_test, predictions)

if __name__ == "__main__":
    ml_workflow()

Prefect Deployments

Deployments are flows stored on the local server or on the cloud and include important information for orchestrating your workflow remotely, such as scheduling and execution details.

Prefect Work Pools

Work pools serve as a mediator between orchestration and execution environments, enabling efficient flow run scheduling and execution. Users can set up their own agents (workers) to run workflows locally or use cloud infrastructure for workflow execution.

Similar to a dedicated compute instance, Work Pools efficiently manage work allocation and prioritize tasks, offering flexibility in choosing the execution environment for optimal workflow performance and scalability.

Orchestrating ML Workflow

In this section, we will create, test, deploy, and execute a simple machine learning workflow using Prefect. We will also set up notifications so that we are notified on our Discord server about the status of the flow runs.

Setting Up Prefect

Install the Python package using pip:

pip install -U prefect

If you want to use the latest version of Prefect, you can get the code directly from GitHub.

pip install -U git+https://github.com/PrefectHQ/prefect

Create a Prefect Flow

We will preprocess the Bank Churn dataset from Kaggle, then use scikit-learn to train and evaluate a logistic regression model.

The code we'll use is a modified version of the code from: Streamline Your Machine Learning Workflow with Scikit-learn Pipelines. Have a look at that tutorial if you’re interested in learning more about how the model was trained. Our machine learning Prefect Flow consists of seven tasks:

  1. load_data: for loading and processing the CSV file using pandas
  2. preprocessing: preparing the data for training, filling missing values, encoding, and scaling.
  3. data_split: splitting the data into testing and training sets
  4. train_model: selecting the top features and training the model
  5. get_prediction: generating the prediction using a testing set
  6. evaluate_model: calculating the accuracy and F1 score
  7. save_model: saving the model weights using skops
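
To make the encoding and scaling steps in the preprocessing task more concrete, here is a minimal pure-Python sketch of the two ideas. The helper names and sample values are hypothetical, and scikit-learn's OrdinalEncoder and MinMaxScaler do considerably more (fitting, array handling, float output); this only illustrates the underlying arithmetic.

```python
def ordinal_encode(values):
    """Map each category to the index of its position in sorted order."""
    categories = sorted(set(values))
    lookup = {category: index for index, category in enumerate(categories)}
    return [lookup[value] for value in values]


def min_max_scale(values):
    """Rescale numbers linearly so the smallest becomes 0.0 and the largest 1.0."""
    low, high = min(values), max(values)
    if high == low:
        # A constant column has no spread; return zeros rather than divide by zero.
        return [0.0 for _ in values]
    return [(value - low) / (high - low) for value in values]


if __name__ == "__main__":
    # Made-up sample values, not taken from the Bank Churn dataset.
    print(ordinal_encode(["Spain", "France", "Spain", "Germany"]))
    print(min_max_scale([20, 30, 40, 60]))
```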

We assemble all these tasks into a @flow function named ml_workflow. The log_prints=True argument in the decorator sends the output of print statements to the Prefect logs.

The ml_workflow function takes the data file location as input and then connects various tasks in a chain. We'll be able to better visualize this workflow when we deploy and execute it on the Prefect server.

You can also track your machine learning experiments using MLflow. Take the Introduction to MLflow course to learn about MLflow tracking, projects, models, and model registry.

import pandas as pd
import skops.io as sio
from prefect import flow, task
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, OrdinalEncoder


@task
def load_data(filename: str):
    bank_df = pd.read_csv(filename, index_col="id", nrows=1000)
    bank_df = bank_df.drop(["CustomerId", "Surname"], axis=1)
    bank_df = bank_df.sample(frac=1)
    return bank_df


@task
def preprocessing(bank_df: pd.DataFrame):
    cat_col = [1, 2]
    num_col = [0, 3, 4, 5, 6, 7, 8, 9]

    # Filling missing categorical values
    cat_impute = SimpleImputer(strategy="most_frequent")
    bank_df.iloc[:, cat_col] = cat_impute.fit_transform(bank_df.iloc[:, cat_col])

    # Filling missing numerical values
    num_impute = SimpleImputer(strategy="median")
    bank_df.iloc[:, num_col] = num_impute.fit_transform(bank_df.iloc[:, num_col])

    # Encode categorical features as an integer array.
    cat_encode = OrdinalEncoder()
    bank_df.iloc[:, cat_col] = cat_encode.fit_transform(bank_df.iloc[:, cat_col])

    # Scaling numerical values.
    scaler = MinMaxScaler()
    bank_df.iloc[:, num_col] = scaler.fit_transform(bank_df.iloc[:, num_col])
    return bank_df


@task
def data_split(bank_df: pd.DataFrame):
    # Splitting data into training and testing sets
    X = bank_df.drop(["Exited"], axis=1)
    y = bank_df.Exited

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=125
    )
    return X_train, X_test, y_train, y_test


@task
def train_model(X_train, X_test, y_train):
    # Selecting the best features
    KBest = SelectKBest(chi2, k="all")
    X_train = KBest.fit_transform(X_train, y_train)
    X_test = KBest.transform(X_test)

    # Train the model
    model = LogisticRegression(max_iter=1000, random_state=125)
    model.fit(X_train, y_train)

    return model


@task
def get_prediction(X_test, model: LogisticRegression):
    return model.predict(X_test)


@task
def evaluate_model(y_test, prediction: pd.DataFrame):
    accuracy = accuracy_score(y_test, prediction)
    f1 = f1_score(y_test, prediction, average="macro")

    # Format as a percentage directly to avoid float artifacts such as 56.99999999999999%
    print("Accuracy:", f"{accuracy:.0%}", "F1:", round(f1, 2))


@task
def save_model(model: LogisticRegression):
    sio.dump(model, "bank_model.skops")


@flow(log_prints=True)
def ml_workflow(filename: str = "train.csv"):
    data = load_data(filename)
    prep_data = preprocessing(data)
    X_train, X_test, y_train, y_test = data_split(prep_data)
    model = train_model(X_train, X_test, y_train)
    predictions = get_prediction(X_test, model)
    evaluate_model(y_test, predictions)
    save_model(model)


if __name__ == "__main__":
    ml_workflow()

To test our Prefect Flow, run the Python file in the terminal.

$ python main.py

The flow has been successfully completed. It has displayed the model metrics and saved the model weights.
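
For readers unsure what the two reported metrics measure, the following pure-Python sketch reproduces the idea behind accuracy and the macro-averaged F1 score. The function names and the sample labels are hypothetical; the flow itself relies on scikit-learn's accuracy_score and f1_score.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    matches = sum(t == p for t, p in zip(y_true, y_pred))
    return matches / len(y_true)


def macro_f1(y_true, y_pred):
    """Unweighted mean of the per-class F1 scores."""
    scores = []
    for label in sorted(set(y_true) | set(y_pred)):
        tp = sum(t == label and p == label for t, p in zip(y_true, y_pred))
        fp = sum(t != label and p == label for t, p in zip(y_true, y_pred))
        fn = sum(t == label and p != label for t, p in zip(y_true, y_pred))
        # F1 = 2TP / (2TP + FP + FN); treated as 0 when the class never appears.
        denominator = 2 * tp + fp + fn
        scores.append(2 * tp / denominator if denominator else 0.0)
    return sum(scores) / len(scores)


if __name__ == "__main__":
    # Made-up labels: 1 = customer exited, 0 = customer stayed.
    y_true = [1, 0, 1, 1, 0, 0]
    y_pred = [1, 0, 0, 1, 0, 1]
    print(f"Accuracy: {accuracy(y_true, y_pred):.0%}")
    print(f"Macro F1: {macro_f1(y_true, y_pred):.2f}")
```

Macro averaging weights each class equally, which matters for churn data where one class is usually much rarer than the other.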

Running ML workflow using Prefect

If you are new to machine learning and want to learn how to preprocess your data, train your model, optimize it, evaluate it, and save your model, then you should consider taking the Machine Learning Scientist with Python career track. This course will teach you everything you need to know from the very beginning.

Deploying the Flow

Building and running a machine learning pipeline manually is a simple task that can be accomplished even with scikit-learn pipelines.

However, there are multiple benefits to using Prefect. These include:

  • the ability to monitor the pipeline,
  • schedule it using CRON,
  • automatically retry in case of failure,
  • enable logging and observability,
  • receive notifications,
  • and create automated workflows that can run without human intervention.

Now, we will learn to automate our workflow by deploying it to the local Prefect server.

First, we must build the “Deployment” by providing the file and flow function names. We are also adding the tag “dev” to our Deployment.

Note: tags in Prefect can be used to categorize and organize flows, tasks, and deployments in the Prefect ecosystem.

$ prefect deployment build main.py:ml_workflow -n 'ml_workflow_bank_churn' -a --tag dev

The Deployment has been created successfully. The output also gives instructions on how to execute the flow using a Prefect agent.

Found flow 'ml-workflow'
Deployment YAML created at 
'C:\Repository\GitHub\ML-Workflow-Orchestration-With-Prefect\ml_workflow-deployment.yaml'.
Deployment storage None does not have upload capabilities; no files uploaded.  Pass --skip-upload to 
suppress this warning.
Deployment 'ml-workflow/ml_workflow_bank_churn' successfully created with id 
'ce30d1c7-f454-4064-870c-429a8039c194'.

To execute flow runs from this deployment, start an agent that pulls work from the
'default-agent-pool' work pool:
$ prefect agent start -p 'default-agent-pool'

Running the Deployment Locally

Prefect agents are responsible for monitoring work pools to identify available flow runs and deploying them to the execution environment.

A single work pool can have multiple agents, with each agent having a unique ID to prevent overlap and ensure seamless scaling.

We will now start a Prefect agent in a new terminal with the default work pool name.

$ prefect agent start -p 'default-agent-pool'

Running Prefect Agent

Go to a new terminal and run the deployment by providing its name in the form “<flow name>/<deployment name>”. You can also just use the deployment name to execute the flow.

$ prefect deployment run 'ml-workflow/ml_workflow_bank_churn'

It has created a flow run for the deployment that will check for workers and start running the workflow.

Creating flow run for deployment 'ml-workflow/ml_workflow_bank_churn'...
Created flow run 'cryptic-potoo'.
└── UUID: 77833ee3-0f7e-42f9-8002-0a562c7d3ed4
└── Parameters: {}
└── Scheduled start time: 2024-03-14 21:30:40 PKT (now)
└── URL: <no dashboard available>
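
Note that the flow is referred to as 'ml-workflow' even though the Python function is named ml_workflow. The small sketch below shows how the identifier used in the command above is put together. The helper is hypothetical, and the underscore-to-hyphen rule is an assumption inferred from the 'Found flow' line in the build output; it applies only when no explicit name is passed to @flow.

```python
def deployment_identifier(flow_function_name: str, deployment_name: str) -> str:
    """Build the '<flow name>/<deployment name>' string passed to `prefect deployment run`."""
    # Assumption based on the build output: the default flow name is the
    # function name with underscores replaced by hyphens.
    flow_name = flow_function_name.replace("_", "-")
    # The deployment name is used exactly as it was given with -n.
    return f"{flow_name}/{deployment_name}"


if __name__ == "__main__":
    print(deployment_identifier("ml_workflow", "ml_workflow_bank_churn"))
```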

Switch to the terminal where we initiated the agent to view logs of our flow run. The Prefect agent has successfully executed all tasks in the flow.

This is a simple example. We can also schedule or automate the workflow by providing additional arguments to our Deployment build.
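
Schedules are commonly written as cron expressions, so it helps to know what each of the five fields means before attaching one to a Deployment. The helper below is a hypothetical illustration that only labels the fields of an expression; it does not validate value ranges and does not interact with Prefect.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def label_cron_fields(expression: str) -> dict:
    """Pair each part of a five-field cron expression with its meaning."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


if __name__ == "__main__":
    # "0 9 * * 1" reads as: minute 0, hour 9, any day of month, any month, Monday.
    for field, value in label_cron_fields("0 9 * * 1").items():
        print(f"{field}: {value}")
```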

Prefect UI

Up until now, we have been observing our workflow in terminals. Now, let's launch a Prefect dashboard where we can see all the flow runs, deployments, work pool, and notifications.

Start the Prefect server in a new terminal and click on the local link provided to access the dashboard.

$ prefect server start

Initiating the Prefect server

To view all the previous flow runs, click on the "Flow Runs" option in the left panel. You may notice that some flow runs were not executed because no work pool was available, while others ran without the "dev" tag during the experimentation phase.

Prefect Dashboard: Flow Runs

Clicking on the Flows option displays all available flows. A single flow can have multiple deployments.

Prefect Dashboard: Flows

In the Deployment section, we can view our current Deployment along with its activity and tags. In this section, we can easily run the flow using either the quick run option or schedule the run using the user interface.

Prefect Dashboard: Deployments

In the Work Pools section, you will see the default work pool used by our Prefect agent. You can also set up a cloud work pool using the Prefect CLI.

Prefect Dashboard: Workpools

Adding Discord Notification

Let's begin with the interesting part.

Nobody wants to monitor their workflow around the clock. Instead, we want to receive notifications about the workflow status in our preferred messaging application. This way, we can resolve any issues quickly.

To achieve this, we will create a Discord webhook. Using the webhook ID and token, we will set up a Prefect notification. It will send all the logging information to the Discord server.

To create a webhook, go to your Discord server settings, click "Integrations," and select "View Webhooks." Then, click on "New Webhook."

Prefect Dashboard: Integrations

Change the display image, name, and channel. Then, copy the Webhook URL.

Creating Discord Webhook

The webhook URL comprises two parts, namely, the Webhook Id and the Webhook token: https://discord.com/api/webhooks/<webhook_id>/<webhook_token>. To create a Prefect notification, we need to manually separate these two parts from the link.
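
Rather than splitting the link by hand, the two parts can be extracted with a few lines of standard-library Python. This is a small sketch that assumes the URL has exactly the form shown above; the function name and the sample ID and token are made up.

```python
from urllib.parse import urlparse


def split_discord_webhook(url: str) -> tuple[str, str]:
    """Return (webhook_id, webhook_token) from a Discord webhook URL."""
    path_parts = urlparse(url).path.strip("/").split("/")
    # Expected path: api/webhooks/<webhook_id>/<webhook_token>
    if len(path_parts) != 4 or path_parts[:2] != ["api", "webhooks"]:
        raise ValueError("not a Discord webhook URL of the expected form")
    return path_parts[2], path_parts[3]


if __name__ == "__main__":
    # Placeholder values, not a real webhook.
    webhook_id, webhook_token = split_discord_webhook(
        "https://discord.com/api/webhooks/123456789/abcDEF-token"
    )
    print("Webhook ID:", webhook_id)
    print("Webhook token:", webhook_token)
```

Treat the token like a password: anyone who has it can post to the channel.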

To create a Discord notification on the Prefect UI, go to the "Notifications" option and click the "+" button on the top. Then, select the desired run states, tags, Discord webhook, and notification type.

Next, provide the Webhook ID and Webhook token and click on the blue "Create" button.

Creating Prefect Notification

We need to execute the deployment to see notifications on our Discord server's main channel.

$ prefect deployment run 'ml-workflow/ml_workflow_bank_churn'

Amazing. Using the same method, we can set up various messaging services such as Microsoft Teams, Slack, SMS, and email, and we can even create our own custom webhook.

Discord Prefect Bot showing all the logs

Take the Designing Machine Learning Workflows in Python course to learn how to build pipelines that stand the test of time.

Running the Deployment on the Cloud

After deploying our machine learning flow locally, it's time to deploy it on the Prefect Cloud. This is a much easier process than local deployments, as you don't have to start any servers or agents or execute the runs using the CLI tool. Everything is managed by Prefect.

Pushing the Code to GitHub Repository

Before deploying the current flow, it's important to note that it will throw an error as it attempts to access files and data from a local directory that isn't accessible in the cloud. To make these files available to Prefect Cloud, we must create a public GitHub repository and push our code to it.

This can be done easily through the GitHub website or by using the VSCode Git extension to create the remote repository and push the code using the user interface.

VSCode for the Prefect Workspace.

Here is the project repository that we are going to use for our cloud deployments: kingabzpro/ML-Workflow-Orchestration-With-Prefect

Setting Up Prefect Cloud

Create your Prefect Cloud account by going to https://app.prefect.cloud/auth/login and then log in using the Prefect CLI tool.

$ prefect cloud login

Simply choose "Log in with a web browser" and follow the steps to authenticate the local client with the Prefect Cloud.

Logging into Prefect Cloud

We will create a Prefect-managed work pool to fully utilize the infrastructure of Prefect Cloud instead of using our locally run agent.

$ prefect work-pool create DC-work-pool --type prefect:managed

For a free account, Prefect provides 10 free compute hours per month that you can use to experiment and play around.

Deployment of Python Code

We will now programmatically deploy our flow to the Prefect Cloud.

Create a new file called deployment.py.

Set up the arguments for the flow's from_source and deploy methods:

  • source: link to the GitHub repository that contains the flow file.
  • entrypoint: file name and flow function name.
  • work_pool_name: the name of the Prefect-managed work pool.
  • tags: for categorization.
  • job_variables: for installing necessary Python packages to run the code. If you are using a hybrid work pool, like the one served by our local Prefect agent, you won't need to provide job_variables.

from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/kingabzpro/ML-Workflow-Orchestration-With-Prefect.git",
        entrypoint="main.py:ml_workflow",
    ).deploy(
        name="first-prefect-deployment",
        work_pool_name="DC-work-pool",
        tags=["dev"],
        job_variables={"pip_packages": ["pandas", "skops", "scikit-learn"]},
    )

Run the Python code.

$ python deployment.py

The flow is deployed on Prefect Cloud. It can be executed via CLI or the Prefect Cloud dashboard.

Prefect Deployment to the Cloud.

Run the Flow on the Cloud

To run the flow, we have to go to the website: https://app.prefect.cloud/. After that, go to the “Deployments” section and click on the vertical three-dotted button on your deployment, and select the “Quick run” option.

Prefect Cloud Dashboard: Deployments

To view how our run is performing, go to the “Flow Runs” section and click on the most recent run to view the workflow graph.

The flow diagram presents an overview of the interconnection between the tasks and the time taken by each process.

Prefect Cloud Dashboard: Flow Runs diagram

For detailed logs, scroll down and view all the tasks performed with the time stamp.

Prefect Cloud Dashboard: Flow run Logs

If we go to the “Work Pools” section, we can clearly see that we have used 0.1 hours from the 10-hour limit for free users.

Prefect Cloud Dashboard: Work pool compute used

Take a look at the Machine Learning Workflow guide, which includes an infographic to help you understand the tasks involved in project setup, data preparation, modeling, and deployment.

Final Thoughts

Setting up a single machine learning workflow was easy. In real-world scenarios, however, you might need to run multiple machine learning and data workflows simultaneously. This is where Prefect stands out. It scales well and lets you choose whether to run Prefect servers and work pools locally or in the cloud. It is a capable tool for scheduling, automating, and managing workflows.

In this tutorial, we have covered the main features and functionalities of the Prefect ecosystem. We also learned how to deploy a machine learning flow locally and on the cloud. To continue your MLOps journey, the next step is to learn how to build CI/CD for machine learning workflows by following "A Beginner's Guide to CI/CD for Machine Learning."

Prefect is just one tool in the MLOps field that is used for workflow orchestration. To learn about other important tools for data versioning, feature stores, model testing, model deployment and serving, model monitoring, runtime engines, and end-to-end MLOps tools, read the article 25 Top MLOps Tools You Need to Know in 2024.


Author
Abid Ali Awan

I am a certified data scientist who enjoys building machine learning applications and writing blogs on data science. I am currently focusing on content creation, editing, and working with large language models.
