
No-Code Airflow DAGs

By David Maxson
May 9, 2022

Apache Airflow is a phenomenal tool for building data flows. It’s reliable, well-maintained, well-documented, and has plugins for just about everything. It’s easy to extend, has pre-built deployment patterns, and is readily available in a variety of cloud environments.

However, it’s also an aging tool, and has a few quirks that hearken back to an earlier time in its development. Some issues include rigid DAG-centric dataflows, confusing differences between logical runtime and wall-clock execution time, and significant limitations in the size of individual DAGs.

Several tools have arisen to address these issues (among which Prefect is a personal favorite). However, there’s something to be said for building on the rock-solid foundation that is Airflow, and at Rearc Data, we also have a substantial investment in making Airflow an efficient platform for our data flows. So what can we do within that environment to streamline the process of building maintainable dataflows?

One pattern we’ve started using is data-driven DAG definitions built with operator factories. This is not necessarily an alternative to the traditional way of defining DAGs, nor is it exclusive of Airflow’s newer TaskFlow API. While this pattern doesn’t fundamentally alter the limitations of Airflow, it does dramatically reduce the burden of developing new dataflows. Let’s dig in, using an example to motivate the discussion.

Step 1: Functions to Wrap Operator Groups

As a motivating example, let’s say we want to print “hello world” in a Docker container (I’m going to use AWS managed containers with ECS). Fortunately, there’s a pre-built operator that we can use:

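import os

# Newer releases of the Amazon provider name this operator EcsRunTaskOperator
from airflow.providers.amazon.aws.operators.ecs import EcsOperator

hello_world = EcsOperator(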
    task_id="hello_world",
    cluster="existing_cluster_name",
    task_definition="existing_task_definition_name",
    launch_type="FARGATE",
    aws_conn_id="aws_ecs",
    overrides={
        "containerOverrides": [
            {
                "name": "hello-world-container",
                "command": ["echo", "hello", "world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "securityGroups": [os.environ.get("SECURITY_GROUP_ID", "sg-123abc")],
            "subnets": [os.environ.get("SUBNET_ID", "subnet-123456ab")],
        },
    },
    awslogs_group="/ecs/hello-world",
    awslogs_stream_prefix="prefix_b/hello-world-container",
)

That’s a lot of code to say “Run echo hello world in a container on ECS”. Write that a few times, and very quickly you’ll decide you’re fed up with all the boilerplate. What do we do when we see some repetitive code? Make it a function, of course:

def make_ecs_operator(task_id, task_definition, command):
    return EcsOperator(
        ...
    )

So now we can get our operator just by calling that function:

hello_world_operator = make_ecs_operator(
    task_id="hello_world",
    task_definition="existing_task_definition_name",
    command="echo hello world".split(),
)

Phew, that’s so much better. This might even tide you over for a while.
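As a rough sketch of what the elided body might contain: the function below only assembles the keyword arguments from the long example, so it runs without Airflow installed. The helper name `ecs_operator_kwargs`, the `container_name` parameter, and the module-level defaults are assumptions made for illustration; in a real DAG file the last step would be `EcsOperator(**kwargs)`.

```python
import os
from typing import Any, Dict, List

# Values shared by every task; copied from the example above, adjust to your setup.
DEFAULT_CLUSTER = "existing_cluster_name"
DEFAULT_AWS_CONN_ID = "aws_ecs"


def ecs_operator_kwargs(
    task_id: str,
    task_definition: str,
    command: List[str],
    container_name: str = "hello-world-container",  # hypothetical default
) -> Dict[str, Any]:
    """Build the keyword arguments that would be passed to EcsOperator."""
    return {
        "task_id": task_id,
        "cluster": DEFAULT_CLUSTER,
        "task_definition": task_definition,
        "launch_type": "FARGATE",
        "aws_conn_id": DEFAULT_AWS_CONN_ID,
        "overrides": {
            "containerOverrides": [
                {"name": container_name, "command": list(command)},
            ],
        },
        "network_configuration": {
            "awsvpcConfiguration": {
                "securityGroups": [os.environ.get("SECURITY_GROUP_ID", "sg-123abc")],
                "subnets": [os.environ.get("SUBNET_ID", "subnet-123456ab")],
            },
        },
        "awslogs_group": "/ecs/hello-world",
        "awslogs_stream_prefix": f"prefix_b/{container_name}",
    }


kwargs = ecs_operator_kwargs(
    task_id="hello_world",
    task_definition="existing_task_definition_name",
    command="echo hello world".split(),
)
# In a DAG file: hello_world_operator = EcsOperator(**kwargs)
```

Keeping the shared settings in one place means a change to the cluster or network configuration touches a single function rather than every DAG.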

A few months later, someone comes along and asks you to run a command in a different docker image. They want to use:

hello_world_operator = make_ecs_operator(
    task_id="hello_world",
    docker_image="alpine:latest",  # <-- Docker Image, not Task Definition
    command="echo hello world".split(),
)

To avoid a detour into discussing ECS, it’s sufficient to summarize this change as turning our problem into a two-step task:

  1. Create a task definition for the custom docker image
  2. Run a container in ECS based on that task definition (what we were doing before)

Ok, so if we want to update make_ecs_operator to be able to run arbitrary Docker Images, we have two options. As one option, we could write a custom Airflow operator that pre-builds the task definition before running the ECS job:

[Diagram: a single CustomEcsOperator node that internally (1) creates the task definition and (2) runs the ECS task]

Wrapping everything into a single operator, however, reduces visibility, fails to re-use standard operators (including future upgrades to those operators), and increases our technical debt. There are cases where custom operators are the best solution, but this isn’t one of them.

The alternative is to write a pattern of operators where the first one creates the task definition and the second is a standard ECS operator that runs that task definition, something like:

[Diagram: create task definition -> run ECS task]

Ok, that’s not so bad. We can whip up the first as a Python Operator, and use the regular ECS Operator for the second node. We get code that looks something like the following:

def create_task_definition(docker_image):
    # ...
    return task_definition_arn

def make_ecs_operator(task_id, docker_image, command):
    create_task_definition_operator = PythonOperator(
        # ...
        task_id=f"{task_id}_create",
        python_callable=create_task_definition,
        op_args=(docker_image,),
        # ...
    )
    run_ecs_task_operator = EcsOperator(
        # ...
        task_id=f"{task_id}_run",
        task_definition=XComArg(create_task_definition_operator, "return_value")
    )
    create_task_definition_operator >> run_ecs_task_operator
    return ... # which one?
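The body of `create_task_definition` is elided above. One possible shape, assuming boto3's ECS client and its `register_task_definition` call, is sketched below. The `ecs_client` parameter, the way the `family` name is derived, the container name, and the CPU/memory sizes are all illustrative assumptions; a real Fargate task that pulls private images or writes logs would typically also need an execution role, which is left out here. Passing the client in makes the function easy to exercise without AWS.

```python
import re
from typing import Any


def create_task_definition(docker_image: str, ecs_client: Any) -> str:
    """Register a minimal Fargate task definition and return its ARN.

    `ecs_client` is expected to behave like boto3.client("ecs").
    """
    # Family names may only contain letters, numbers, hyphens and underscores.
    family = re.sub(r"[^A-Za-z0-9_-]", "-", docker_image)
    response = ecs_client.register_task_definition(
        family=family,
        requiresCompatibilities=["FARGATE"],
        networkMode="awsvpc",  # required for Fargate
        cpu="256",  # illustrative sizes
        memory="512",
        containerDefinitions=[
            {"name": "main", "image": docker_image, "essential": True},
        ],
    )
    return response["taskDefinition"]["taskDefinitionArn"]


class FakeEcsClient:
    """Test double that mimics the shape of the boto3 response."""

    def register_task_definition(self, **kwargs: Any) -> dict:
        arn = f"arn:aws:ecs:us-east-1:123456789012:task-definition/{kwargs['family']}:1"
        return {"taskDefinition": {"taskDefinitionArn": arn}}


task_definition_arn = create_task_definition("alpine:latest", FakeEcsClient())
```

Inside the PythonOperator callable, the client could instead be created on the spot (for example with `boto3.client("ecs")`), so that only `docker_image` needs to be passed through `op_args`.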

The last improvement we’ll make here is to wrap these operators into a Task Group. Visually, this will make these two operators collapse into one node in the Airflow UI, since logically we’re doing just one thing here; functionally, it’ll also allow us to sequence this pair of operators as if they were a single operator, so our caller code doesn’t have to know about the guts of how we use ECS.

def make_ecs_operator(task_id, docker_image, command):
    with TaskGroup(group_id=task_id) as group:
        create_task_definition_operator = PythonOperator(
            # ...
            task_id="create",  # the task_id prefix is auto-added
        )
        run_ecs_task_operator = EcsOperator(
            # ...
            task_id="run",
        )
        create_task_definition_operator >> run_ecs_task_operator

    return group

[Diagram: task group <task_id> containing create -> run]

Step 2: Using Factory Classes

The functional pattern above is fine, but can start to get convoluted when producing large or complicated patterns. You might end up with lots of function factories being called within other function factories, and passing arguments all over the place. At this point, taking an object-oriented approach begins to simplify things.

Let’s shift the example. Let’s say the ECS job we ran generated some data file, and now we want to publish that data to a Redshift SQL cluster. We’ll need to manage some administrivia along the way, and then do a variety of things with each table. Let’s say we end up wanting a pattern like the following:

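[Diagram: nodes "Validate file 1", "Validate file 2", "Validate file 3"; a group "Handle a bunch of administrivia..." containing "Resume cluster" and "Ensure schema exists"; a group "Load the data" containing a "Recreate Table" and "Load Data" pair for each of the three tables]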

Let’s just assume we’re going to need to do all this more than once, and we’re going to want another abstraction for this pattern. We could try to define this factory as a function:

def make_db_upload_operators(
    task_id,
    cluster_name,
    schema,
    tables_and_files
):
    ...

but clearly this is going to become a complicated function, or will depend on a lot of helper functions. You know what we call a pile of related functions that depend on the same data? A class!

class Table:
    # Tracks the table name, its column structure, and the file it's sourced from 
    def make_validate_operator(self):
        ...
    
    def make_recreate_operator(self):
        ...
    
    def make_load_data_operator(self):
        ...

class DbUploadFactory:
    def __init__(self, task_id, cluster_name, schema, tables: List[Table]):
        ...

    def make_resume_cluster_operator(self):
        ...

    def make_ensure_schema_exists_operator(self):
        ...

    def make_operator(self):
        with TaskGroup(group_id=self.task_id) as group_that_contains_everything:
            # Call the various methods of this class and the Tables
            # Hook those operators up correctly
            ...

        return group_that_contains_everything

This is much more maintainable than a pile of loosely related functions. If we later want to restructure how we interact with our database, we need only find the object responsible for that part of our DAG and update it accordingly. All of our operators are neatly wrapped up in little groups, so nothing outside the factory (in general) needs to know how the internals are structured.
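To make the wiring inside `make_operator` a little more concrete without requiring Airflow, the sketch below uses a tiny stand-in `Step` class that records `>>` dependencies the way operators do. The `Step` class, the task names, and the particular ordering (administrivia first, then a validate, recreate, load chain per table) are assumptions for illustration, not the exact structure we use.

```python
from typing import List


class Step:
    """Stand-in for an Airflow operator: remembers what runs after it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.downstream: List["Step"] = []

    def __rshift__(self, other: "Step") -> "Step":
        self.downstream.append(other)
        return other  # lets a >> b >> c chain, as in Airflow


class Table:
    def __init__(self, name: str) -> None:
        self.name = name

    def make_validate_operator(self) -> Step:
        return Step(f"validate_{self.name}")

    def make_recreate_operator(self) -> Step:
        return Step(f"recreate_{self.name}")

    def make_load_data_operator(self) -> Step:
        return Step(f"load_{self.name}")


class DbUploadFactory:
    def __init__(self, task_id: str, tables: List[Table]) -> None:
        self.task_id = task_id
        self.tables = tables

    def make_operator(self) -> Step:
        resume = Step("resume_cluster")
        ensure_schema = Step("ensure_schema_exists")
        resume >> ensure_schema
        for table in self.tables:
            # Each table gets its own validate -> recreate -> load chain.
            validate = table.make_validate_operator()
            recreate = table.make_recreate_operator()
            load = table.make_load_data_operator()
            ensure_schema >> validate >> recreate >> load
        return resume  # entry point; with Airflow this would be the TaskGroup


def list_edges(step: Step) -> List[str]:
    """List dependencies as 'a -> b' strings, depth first."""
    edges = []
    for child in step.downstream:
        edges.append(f"{step.name} -> {child.name}")
        edges.extend(list_edges(child))
    return edges


edges = list_edges(DbUploadFactory("upload", [Table("a"), Table("b")]).make_operator())
```

The caller only ever sees the object returned by `make_operator`; how many steps sit behind it is the factory's business.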

This pattern decouples the complexity of defining the DAG from the complexity of the resulting dataflow that Airflow sees, and thus also decouples the top-level DAG definition from the challenges of maintaining complicated DAG logic. The following simple DAG definition might result in a data flow with advanced, precisely architected logic:

with DAG(...) as dag:
    generate_data_operator = ...

    upload_operator = DbUploadFactory(
        ...,
        tables=[
            Table(...),
            ...
        ],
    )

    generate_data_operator >> upload_operator.make_operator()

It is possible to eliminate the need to explicitly call .make_operator(), but I leave this as an exercise for the reader.

Step 3: Data-Defined Classes

This pattern, for the most part, results in classes that contain some data and then convert that data into an Airflow operator (or group of operators). These classes aren’t operators in their own right, and probably (in general) shouldn’t support mutation. They simply convert metadata into a DAG sub-structure. Seems like a perfect candidate for a dataclass:

@dataclass
class Table:
    name: str
    columns: List[str]
    source_file: str
    ...

@dataclass
class DbUploadFactory:
    task_id: str
    cluster_name: str
    schema: str
    tables: List[Table]
    ...

At first glance, this doesn’t seem much simpler than the original class. Why bother with dataclasses? Well, there are several possible advantages; the one that matters most here is that dataclasses are easy to serialize and deserialize.

This last point is something we have found particularly useful, so I’ll dig into that a bit further.

Step 4: No-Code DAGs

Dataclasses are so tight and well-defined that they’re pretty easy to directly serialize and deserialize into common markup formats, like JSON or YAML. Since each class is merely a mapping from metadata into a part of an Airflow DAG, it’s not hard to build objects that represent complete, functional DAGs.
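Here is a small standard-library sketch of that round trip, assuming the `Table` and `DbUploadFactory` fields shown earlier. `dataclasses.asdict` handles the nested conversion on the way out; on the way back in, the hand-written `from_dict` classmethod is a hypothetical stand-in for what a library such as dacite does generically. `frozen=True` reflects the earlier point that these objects probably shouldn't support mutation.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Table:
    name: str
    columns: List[str]
    source_file: str


@dataclass(frozen=True)
class DbUploadFactory:
    task_id: str
    cluster_name: str
    schema: str
    tables: List[Table]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "DbUploadFactory":
        # Rebuild the nested dataclasses by hand; dacite automates this step.
        tables = [Table(**table) for table in data["tables"]]
        return cls(**{**data, "tables": tables})


factory = DbUploadFactory(
    task_id="upload",
    cluster_name="my_cluster",
    schema="public",
    tables=[Table("users", ["id", "email"], "users.csv")],
)

# Dataclass -> plain dict -> JSON text, and back again.
serialized = json.dumps(asdict(factory))
restored = DbUploadFactory.from_dict(json.loads(serialized))
```

The same dictionaries could just as well come from a YAML file, which is where this is heading.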

I’ll note at this point that any additional classes shown below are just arbitrary abstractions; which particular abstractions are helpful for you will depend on the kinds of data flows you typically write. These particular abstractions are unlikely to be useful for you, but the pattern they follow probably will be.

At Rearc Data, we have a lot of data flows that look vaguely like:

[Diagram: run a lot of custom jobs that produce outputs -> validate those outputs -> publish them]

Great, let’s define a top-level DAG object that looks like that:

@dataclass
class TypicalDag:
    jobs: JobCollection
    validators: ValidationCollection
    publishers: PublishCollection

    def make_dag(self) -> DAG:
        with DAG(...) as dag:
            jobs_op = self.jobs.make_operator()
            validators_op = self.validators.make_operator()
            publishers_op = self.publishers.make_operator()

            jobs_op >> validators_op >> publishers_op

        return dag

For the sake of brevity, I’ll leave the definition of the various Collection classes to your imagination.

Note that the above is general-purpose and isn’t specific to any single data flow. We might have a bunch of data flows that all “Run some code, generate some files, validate them, then publish them”, just maybe from different data sources or with differences in how we process the data. We can then write an adapter DAG file that loads metadata from a whole pile of metadata files and generates a DAG for each one. The following is a simple example of such an adapter:

# dags/load_yml_files.py

from pathlib import Path
from airflow import DAG
import yaml
import dacite
from our_own_code import TypicalDag

dag_dir = Path(__file__).parent

# For each YAML file in a particular directory...
for yaml_file_path in dag_dir.glob('typical_dags/**/*.yml'):
    with open(yaml_file_path) as f:
        dag_metadata = yaml.safe_load(f)
        
    # ... generate a DAG from that metadata
    dag_metadata_obj = dacite.from_dict(TypicalDag, dag_metadata)
    dag = dag_metadata_obj.make_dag()
    
    # See https://www.astronomer.io/guides/dynamically-generating-dags/
    dag_name = yaml_file_path.with_suffix('').name
    globals()[dag_name] = dag

That’s it! Well, almost. Let’s make an example YAML file (I’m just going to make stuff up for what goes into each Collection object):

# dags/typical_dags/sample_dag.yml

jobs:
  job_definitions:
    - name: download_data
      docker_image: my_docker_image
      command: "python download_data.py"
    - name: clean_data
      depends_on: [download_data]
      docker_image: my_docker_image
      command: "python clean_data.py"
validators:
  files:
    - path: path/to/output/file.csv
      validation_suite: file_expectations.json
publishers:
  sftp:
    host: sftp://my.file.server
    connection_id: sftp_default
    files:
      - path/to/output/file.csv
  s3: 
    bucket: my_public_bucket
    prefix: some/prefix/
    files:
      - path/to/output/file.csv

[Diagram: JOBS (download_data -> clean_data) -> VALIDATE (file.csv) -> PUBLISH (sftp, s3)]
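To make the `jobs` section a bit more concrete, here is one hypothetical way a `JobCollection` could interpret it. The class and field names mirror the made-up YAML above; the `edges` method and its validation are assumptions about how `depends_on` might be resolved before being turned into `>>` relationships between operators.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass(frozen=True)
class JobDefinition:
    name: str
    docker_image: str
    command: str
    depends_on: List[str] = field(default_factory=list)


@dataclass(frozen=True)
class JobCollection:
    job_definitions: List[JobDefinition]

    def edges(self) -> List[Tuple[str, str]]:
        """Return (upstream, downstream) pairs, rejecting unknown job names."""
        known = {job.name for job in self.job_definitions}
        pairs = []
        for job in self.job_definitions:
            for upstream in job.depends_on:
                if upstream not in known:
                    raise ValueError(f"{job.name} depends on unknown job {upstream!r}")
                pairs.append((upstream, job.name))
        return pairs


jobs = JobCollection(
    job_definitions=[
        JobDefinition("download_data", "my_docker_image", "python download_data.py"),
        JobDefinition(
            "clean_data",
            "my_docker_image",
            "python clean_data.py",
            depends_on=["download_data"],
        ),
    ]
)
job_edges = jobs.edges()
```

Checking `depends_on` against the known job names at load time turns a typo in a YAML file into an immediate, readable error rather than a puzzling DAG.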

Cool, whatever all that means. But here’s the interesting part: that YAML file represents a no-code DAG. Instead of building each DAG as custom Python code, however simple that code might be, we now have an entire DAG without any DAG-specific code. We’ve entirely decoupled the process of writing new dataflows from the task of maintaining useful, functional abstractions. We’ve also accomplished that without writing any custom Airflow Operators or losing any visibility into our dataflow from the Airflow UI.

Critically, we’ve done all this without any loss of functionality: nothing about this approach prevents you from writing DAGs in Python, or even writing DAGs that bypass your abstractions altogether. We need very few additional dependencies, no plugins, and no alteration to our underlying data platform.

We can upgrade every single DAG by updating our code in one location. Our DAG definition files store all information that differentiates our DAGs in one location, while everything they share in common is elsewhere; that makes these data files extremely descriptive and compact, and makes our code easier to maintain.

And that’s not even to mention the utility of having your DAGs defined as data instead of as code. Lots of maintenance and self-analysis chores become simple when your dataflows are defined, themselves, as data rather than as arbitrary Python code.
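As one example of such a chore, assuming each YAML file has already been parsed into a dict shaped like the sample above, finding every DAG that uses a given Docker image becomes a few lines of ordinary data processing. The function name and the second sample DAG are made up for illustration.

```python
from typing import Any, Dict, List


def dags_using_image(dags: Dict[str, Dict[str, Any]], docker_image: str) -> List[str]:
    """Return the names of DAGs with at least one job running `docker_image`."""
    matches = []
    for dag_name, metadata in dags.items():
        job_definitions = metadata.get("jobs", {}).get("job_definitions", [])
        if any(job.get("docker_image") == docker_image for job in job_definitions):
            matches.append(dag_name)
    return sorted(matches)


# Parsed metadata, e.g. the result of yaml.safe_load on each file.
all_dags = {
    "sample_dag": {
        "jobs": {
            "job_definitions": [
                {"name": "download_data", "docker_image": "my_docker_image"},
                {"name": "clean_data", "docker_image": "my_docker_image"},
            ]
        }
    },
    "other_dag": {
        "jobs": {"job_definitions": [{"name": "fetch", "docker_image": "another_image"}]}
    },
}

affected = dags_using_image(all_dags, "my_docker_image")
```

Answering the same question for DAGs written as arbitrary Python would mean parsing or executing that code.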

Conclusion

In this article, we’ve discussed how to build maintainable DAGs, starting with simple factory functions, then with factory classes, then by simplifying those classes into Dataclasses, then finally by eliminating the need for custom DAG code altogether. The result is a pattern for supporting low-code or no-code DAGs in Airflow, making each individual DAG nothing more than its essential metadata.

At Rearc Data, we’ve leveraged this pattern to dramatically reduce development time and maintenance overhead for dozens of DAGs, allowing us to focus on the challenging parts of data engineering rather than re-writing boilerplate DAG code each time. We’ve been able to upgrade our DAGs in bulk, eliminating huge swaths of technical debt from the challenge of maintaining a large number of unrelated data flows. This has become a critical part of how we generate the 440+ data products available on AWS Data Exchange. If you are more interested in getting data than in engineering your own data flows, and none of the products match exactly what you need, reach out to us at data@rearc.io; we would love to work with you.