
Introduction to Airflow with Python

# Simple DAG definition
from airflow.models import DAG
from datetime import datetime

default_arguments = {
    'owner': 'jdoe',
    'email': '[email protected]',
    'start_date': datetime(2020, 1, 20)
}

etl_dag = DAG('etl_workflow', default_args=default_arguments)
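Here, 'etl_workflow' is the dag_id, which identifies the DAG in the Airflow UI and command-line tools, and the default_args dictionary supplies settings (such as the owner and start_date) that apply to the DAG's tasks.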

Here are some basic commands:

To run a specific task from a DAG

airflow run <dag_id> <task_id> <execution_date>
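For example, running a task named first_task from a DAG named example_dag for the January 20, 2020 execution date (both names here are hypothetical) would look like:

airflow run example_dag first_task 2020-01-20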

To list the available commands with descriptions

airflow -h

To get the description of a specific subcommand

airflow <subcommand> -h

To start an Airflow webserver instance on the default port 8080

airflow webserver

To start it on a different port (9090 in this case)

airflow webserver -p 9090

Operators

Airflow operators represent a single task in a workflow. This can be any type of task: running a command, sending an email, running a Python script, and so on. Typically, Airflow operators run independently, meaning that all resources needed to complete the task are contained within the operator. Generally, Airflow operators do not share information with each other. This simplifies workflows and allows Airflow to run tasks in the most efficient manner. It is possible to share information between operators, but the details of how are beyond this course. Airflow provides many different operators to perform different kinds of tasks.
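As a minimal sketch of the idea, the snippet below uses the DummyOperator, an Airflow 1.x operator that performs no work at all, to show that each operator instance stands for exactly one task in a workflow (the DAG name here is made up for illustration):

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('operator_example', default_args={'start_date': datetime(2020, 1, 20)})

# Each operator instance represents exactly one task in the workflow
start_task = DummyOperator(task_id='start', dag=dag)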

Bash operator

The BashOperator executes a given Bash command or script. This can be pretty much anything Bash is capable of that makes sense in a given workflow. The BashOperator requires three arguments: the task_id (the name that shows up in the UI), the bash_command (the raw command or script to run), and the dag it belongs to. Before using the BashOperator, it must be imported from airflow.operators.bash_operator.

from airflow.operators.bash_operator import BashOperator

BashOperator(task_id='bash_example',
             bash_command='echo "Example!"',
             dag=ml_dag)
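The bash_command can just as easily invoke a script as a raw command. A sketch (runcleanup.sh is a hypothetical script assumed to be accessible to the Airflow worker):

BashOperator(task_id='cleanup',
             bash_command='runcleanup.sh',
             dag=ml_dag)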

Tasks

Within Airflow, tasks are instantiated operators. A task is essentially a shortcut for referring to a given operator within a workflow, and tasks are usually assigned to a variable within Python code. Using the previous example, we assign the BashOperator to the variable example_task. Note that within the Airflow tools, a task is referred to by its task_id, not the variable name.
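Using the BashOperator from above (and the same ml_dag), the assignment looks like this:

# The variable name is for Python code; Airflow itself uses the task_id
example_task = BashOperator(task_id='bash_example',
                            bash_command='echo "Example!"',
                            dag=ml_dag)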

Task dependencies in Airflow define the order of task completion. While not required, task dependencies are usually present. If task dependencies are not defined, task execution is handled by Airflow itself with no guarantees of order. Task dependencies are referred to as upstream or downstream tasks: an upstream task must complete prior to any downstream tasks. Since Airflow 1.8, task dependencies are defined using the bitshift operators. The upstream operator is two greater-than symbols (>>); the downstream operator is two less-than symbols (<<).

It's easy to get confused about when to use the upstream or downstream operator. The simplest analogy is that upstream means before and downstream means after: any upstream tasks must complete before any downstream ones.

Let's look at a simple example involving two BashOperators. We define our first task and assign it to the variable task1, then create our second task and assign it to the variable task2. Once each operator is defined and assigned to a variable, we can define the task order using the bitshift operators. In this case, we want to run task1 before task2. The most readable way to express this is with the upstream operator: task1 >> task2. Note that you could also define this in reverse using the downstream operator to accomplish the same thing: task2 << task1.

# Define the tasks
task1 = BashOperator(task_id='first_task',
                     bash_command='echo 1',
                     dag=example_dag)
task2 = BashOperator(task_id='second_task',
                     bash_command='echo 2',
                     dag=example_dag)
# Set first_task to run before second_task
task1 >> task2   # or task2 << task1
Multiple dependencies

Dependencies can be as complex as required to define the workflow to your needs. We can chain dependencies, in this case setting task1 upstream of task2, which is upstream of task3, which is upstream of task4:

task1 >> task2 >> task3 >> task4

The Airflow graph view shows a dependency view indicating this order. You can also mix upstream and downstream bitshift operators in the same workflow. If we define task1 upstream of task2, then downstream of task3, we get a configuration different from what we might expect:

task1 >> task2 << task3

This creates a DAG where both task1 and task3 must finish before task2 can run. This means we could define the same dependency graph on two lines, in a possibly clearer form, as shown below. Note that because we don't require it, either task1 or task3 could run first, depending on Airflow's scheduling.
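Written out in code, the two-line form is:

# task1 and task3 must both finish before task2 can run
task1 >> task2
task3 >> task2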