Built to Scale:  Running highly-concurrent ETL with Apache Airflow

November 7th, 2018

Apache Airflow has seemingly taken the data engineering world by storm. It was originally created and maintained by Airbnb, and has been part of the Apache Foundation for several years now. After heavily leveraging it for a couple years (over 2 million tasks) and seeing its full potential (but numerous drawbacks), I was tasked with streamlining the deployment and operation of the system.

The first step? Sit down and note everything I loved about Airflow.... and everything I hated.

The list went something like this.

Apache Airflow ❤️'s

Code-defined DAGs

It is incredibly easy to define, manage, and communicate complex ETL dependencies to stakeholders.

The UI and its operational functionality

The breakdown of dags, dag runs, tasks, task log output, etc, is very nice. The UI also includes features like Gantt charts, task duration visualizations, immediately-visible DAG definitions, and more.

System composition

As an engineer, the associated subsystems involved with running an Airflow instance are very clean. The system consists of a scheduler process, a worker process, a webserver process, a message broker, and a database backend. It's a real application.

Retry functionality

Since stuff happens and systems go down, ETL should almost always be wrapped in retry functionality. Airflow makes this simple, as its BaseOperator class (from which operators inherit) includes the ability to specify various retry strategies. Want to simply retry a few times on a specified delay? You can. Want to declare an exponential backoff? You can do that too.

ETL backoffs can be cleanly implemented with retrying and/or tenacity, but Airflow includes this functionality out of the box.

Airflow Contexts

template_context is a relatively-undocumented piece of Airflow magic, and provides rich metadata at the task instance level. It is highly useful for stamping load metadata into the contents of the data itself, or notifying when a particular job fails or goes into a retry loop.

Whatever you do, set provide_context: True when defining your DAGs.

"It's just Python"

Airflow is a (very) complex system, and the fact that "it's just Python" speeds up the process of coming up to speed with how the thing works. Digging around in the source code is often necessary, and made much easier by the fact that it's written in a readable language.

Apache Airflow ?'s

Limited deployment documentation

I say limited, but I mean very little. You will need to dig around to find systemd documentation and a few unit files, but these will need to be modified according to your host os.

Limited community information on good deployment strategies

After running Airflow for months, scheduler and worker downtime on each code deploy was something I came to consider unacceptable - especially as our data engineering team grew. Airflow was getting in the way of getting code out; every deployment meant that all dags had to be manually disabled, running task instances bled off, the system shut down, new code pushed, system turned back on, and then all dags re-activated... by hand (?).

Most of the "production" Airflow deployment articles I read included dropping the system into the background with nohup, docker-compose, or even running in a screen session. While these are interesting strategies, I wanted something I could run as a real service. I wanted Airflow to run as natively as possible, with real logging, real Restart=always, and completed managed by systemd.

The tightly-bound nature of "plugins" code

This one still puzzles me. In most cases, Airflow does a wonderful job loosely binding components of its architecture. The system's DAG-discovery capability is lovely. Copy a dag to the dag_folder of all nodes, and the system will pick it up according to the configured dag_dir_list_interval.

In other cases, it is very tightly bound. Just because the scheduling system is written in Python does not mean that your Python-based ETL code should be interwoven throughout global scope.

Does your JenkinsCI implementation (written in Java) depend on your Java application, in order to build and deploy said Java application? Do you need to redeploy Jenkins, whenever new code is pushed to your Java app?

I hope not.

Yet... this exact situation is all-too-easy to accomplish with Airflow.

Significant overhead when running multiple nodes

Many Airflow implementation posts I read only discussed running Airflow on a single node, with all components running in separate containers or separate nohup'd processes. This is great, but the system documentation indicates it is capable of much, much more. A LocalExecutor will work for a while, until you have hundreds of ETL tasks that must be run and stakeholders complaining that your system is down. Secondly, running all ETL on a single node is asking for trouble as it is a glaringly-obvious single point of failure.

Running jobs via a CeleryExecutor is the built-in way forward here, but several worker nodes must then be provisioned, identically-configured, and kept up to date on each code deploy. And this does not take into consideration the overhead of managing a (redundant, fault-tolerant) message broker.

It's a lot to keep track of!

ETL is bound to blocks of time

If you incorporate the various ts, execution_date, latest_date context values into your ETL code, I will guarantee that it bites you when processing data from different timezones or regions. Airflow timestamps are relative to the host, while your Salesforce instance could be configured to America/Los_Angeles and your mysql database with (tz-naive) datetime values could be configured to America/New_York. I repeat: this will bite you.

"It's just Python"

See "The tightly-bound nature of "plugins" code" above. The system being written in Python makes it very flexible. Often too flexible.

After thinking long and hard about how to make the system friction-free...

I came to the following requirements of a system refactor.

1. Infrastructure should be separated into functional components.

To minimize points of failure, Redis should not be running on the same instance as the scheduler process, the scheduler process should not be running on the same host as the worker process, etc. This infrastructure was to be deployed via AWS so I settled on ElastiCache Redis as message broker, RDS Mysql for the database backend, a (relatively small) EC2 instance for the webserver and scheduler, and separate (appropriately sized) EC2 instances for all worker pools

2. Infrastructure should be replicable across environments and each environment should be easy to scale.

In order to stamp out infrastructure and provision Airflow nodes, Terraform with remote S3 state was heavily leaned upon.

This makes it easy to terraform init and terraform apply for setting up alternative environments. It also makes it easy to change a variable corresponding to the number of airflow worker nodes from "2" to "3" and spin up/provision another worker node.

3. Airflow should run as a native service on the respective host machine.

Instead of nohup-ing individual processes or managing a docker-compose-based implementation, each Airflow process ("scheduler", "webserver", "flower", "worker") was set up with its own unit file, and enabled/started with systemd.

Task log output is configured via the base_log_folder configuration variable and handled accordingly.

4. Airflow jobs should be executed across a number of workers.

Apache Airflow ships with the ability to run a CeleryExecutor, even though it is not commonly discussed. It is the critical piece to distributing ETL tasks across a pool of workers. Provisioning and managing a broker adds overhead to the system, but is well worth the effort.

5. Airflow should not need to be restarted whenever new ETL code is deployed.

This is logically intuitive but relatively difficult to accomplish with Airflow. I chose to maintain clear separation between Airflow and task execution by running all tasks within docker containers. Airflow itself is set up as bare-bones as possible, and all task instances simply docker run img_name python3 some_entrypoint.py.

6. Fully-automated rolling deployments were necessary...

...without toggling a single switch or waiting for a single DAG to complete.

One of the biggest barriers to deploying new changes was the fact that we had to pause all DAGs, wait until all tasks were complete, and redeploy the system each time code was modified. This amounted to over 30 minutes for each deploy, for at least two engineers, and introduced large amounts of toil and human error.

The solution has worked very well and looks like:

  1. Any time code hits the master branch of our core ELT/ETL code, the repo is packaged into an analytics Docker container, which is then deployed to all Airflow worker nodes.
  2. A dags directory is scp'd to the respective dag_folder on all Airflow nodes. These changes are picked up by each node and represented via the UI.
  3. All dags use a BashOperator, which docker runs the most recent analytics container and calls the appropriate entrypoint.

Because all ETL runs via docker containers, preexisting processes do not need to be killed or monitored until completion. The changes are simply executed next time the process runs.

7. Insight into system state was necessary.

This primarily includes host-level metrics and system performance metrics, but is being continually built upon.

First and foremost, all machines are provisioned with node exporter by default and open up port 9100 as a scrape endpoint.

Secondly, (batch-based) ETL code leverages a push gateway for all metrics as these jobs are short-lived ephemeral processes.

Third, Flower runs as a service on the airflow scheduler node, so current Celery status is easily represented.

Performance thus far...

Like any initiative of reasonable size, this system refactor was measured by a number of critical metrics. I've used the following to represent my success in increasing reliability, increasing scalability scalability, and decreasing deployment overhead.

319,390 Airflow task executions have completed successfully after refactor, across a number of workers.

The Celery failure rate is 0.0005%.

1 line of code must be changed to add 1-100+ additional Airflow worker nodes.

0 deployments have required human interference, and 0 human errors have been introduced.

Deploying to 2x the infrastructure is 40x faster than it was previously (with fewer machines).

In conclusion

I've seen very good success when automating the deployment of multi-node Apache Airflow infrastructure, and the above guidelines were critical to the success of this effort. The next post will get into the setup details, so stay tuned.

Your cart