This tutorial is adapted from the Web Age course Workflow Management with Apache Airflow.
1.1 A Traditional ETL Approach
- Normally, an ETL job would involve the following steps:
- Create a script automating such activities as downloading a dataset from the Internet, transforming it, and inserting the resulting data into a database (see the sketch after this list)
- Schedule a job to run the script daily, every other hour, etc., using an existing scheduling system, such as cron
- In most cases, existing scheduling systems won’t allow you to do the following (while Airflow will):
- Automatically retry failed processing (missing data, data in the wrong format, the back-end database being down, etc.) at either the job level or the individual task level
- Scale on demand
- React to data arrival events (Airflow does this with sensors)
- … and more …
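- To make the traditional approach concrete, here is a minimal sketch of such a cron-driven ETL script; the dataset URL, database path, and target table are hypothetical, and error handling is omitted:

# etl_job.py -- hypothetical stand-alone ETL script invoked by a cron entry
import csv
import io
import sqlite3
import urllib.request

DATASET_URL = "https://example.com/daily_sales.csv"   # hypothetical source
DB_PATH = "/var/data/sales.db"                        # hypothetical target database

def extract():
    # Download the raw dataset
    with urllib.request.urlopen(DATASET_URL) as resp:
        return resp.read().decode("utf-8")

def transform(raw_csv):
    # Keep only completed orders and normalize the amount to a float
    rows = csv.DictReader(io.StringIO(raw_csv))
    return [(r["order_id"], float(r["amount"])) for r in rows if r["status"] == "completed"]

def load(records):
    # Insert the transformed rows (the sales table is assumed to exist)
    with sqlite3.connect(DB_PATH) as conn:
        conn.executemany("INSERT INTO sales(order_id, amount) VALUES (?, ?)", records)

if __name__ == "__main__":
    load(transform(extract()))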
1.2 Apache Airflow Defined
1.3 Airflow Core Components
- Scheduler
- Sends tasks defined in the scheduled DAG for execution
- Executor
- There are several kinds of Executors, each specific to a processing domain; the default one is SequentialExecutor
- Web server (Airflow’s Web UI)
- A Flask app with role-based access control (RBAC)
- Metadata database
- The default DB engine is SQLite; in production: MySQL, PostgreSQL, etc.
1.4 The Component Collaboration Diagram
1.5 Workflow Building Blocks and Concepts
- DAG
- Defines the workflow tasks and their order of execution/dependencies
- Specifies error/failure processing and retry procedures.
- Operator (Worker)
- Defines a task’s code to be executed.
- Maintains state in environment variables.
- Task
- Specific job to be done by an Operator (Worker).
- Connection
- External system access configuration details (access points, passwords, keys, and other credentials).
- Hook
- Abstracts external system interfaces.
- XCom (Cross-Communication facility)
- Pub/Sub-like messaging model for inter-task communication: one operator acts as the XCom sender (message publisher), and others are designated to receive the message by the sender’s task id and the message key (see the DAG sketch after this list).
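- To tie these building blocks together, here is a minimal sketch of a DAG whose two PythonOperator tasks communicate over XCom; the DAG id, task ids, and schedule are made up, and the import path follows the Airflow 1.10.x layout assumed by this tutorial:

# minimal_xcom_dag.py -- a sketch, not production code
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10.x import path

def push_value(**context):
    # Publish a value under the key 'greeting'; the sender is identified by its task id
    context["ti"].xcom_push(key="greeting", value="hello from task_a")

def pull_value(**context):
    # Receive the value by the sender's task id and the message key
    msg = context["ti"].xcom_pull(task_ids="task_a", key="greeting")
    print(msg)

with DAG(dag_id="xcom_demo",
         start_date=datetime(2020, 9, 14),
         schedule_interval="@daily") as dag:

    task_a = PythonOperator(task_id="task_a",
                            python_callable=push_value,
                            provide_context=True)

    task_b = PythonOperator(task_id="task_b",
                            python_callable=pull_value,
                            provide_context=True)

    task_a >> task_b  # task_b runs after task_a and can pull its XCom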
1.6 Airflow CLI
- The CLI is provided through the airflow command-line tool, which offers a wide range of commands for managing workflows.
- Syntax:
airflow <command> <arguments>, e.g. airflow clear dag_1 -s 2020-09-14 -e 2020-09-17
Notes:
An abbreviated list of airflow CLI commands:

checkdb          Check if the database can be reached
clear            Clear a set of task instances, as if they never ran
config           Show current application configuration
connections      List/Add/Delete connections
create_user      Create an account for the Web UI (FAB-based)
dag_state        Get the status of a dag run
delete_dag       Delete all DB records related to the specified DAG
delete_user      Delete an account for the Web UI
flower           Start a Celery Flower
info             Show information about current Airflow and environment
initdb           Initialize the metadata database
kerberos         Start a kerberos ticket renewer
list_dags        List all the DAGs
list_tasks       List the tasks within a DAG
list_users       List accounts for the Web UI
next_execution   Get the next execution datetime of a DAG
pause            Pause a DAG
pool             CRUD operations on pools
resetdb          Burn down and rebuild the metadata database
run              Run a single task instance
scheduler        Start a scheduler instance
serve_logs       Serve logs generated by a worker
shell            Run a shell to access the database
show_dag         Display a DAG's tasks with their dependencies
task_state       Get the status of a task instance
test             Test a task instance; runs a task without checking for dependencies or recording its state in the database
trigger_dag      Trigger a DAG run
unpause          Resume a paused DAG
variables        CRUD operations on variables
version          Show the version
webserver        Start an Airflow webserver instance
worker           Start a Celery worker node
1.7 Main Configuration File
- $AIRFLOW_HOME/airflow.cfg controls various aspects of the Airflow runtime and its integrations.
- A few file extracts:
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/wasadmin/airflow/dags

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/wasadmin/airflow/logs
...
# Log filename format
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
...
# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = utc

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = SequentialExecutor
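- As a side note (a sketch assuming a default Airflow 1.10.x installation with AIRFLOW_HOME set), DAG or plugin code can read these settings programmatically through Airflow's configuration API; the printed values are examples:

# inspect_config.py -- small sketch; assumes Airflow is installed
from airflow.configuration import conf

# Read a few of the [core] options shown above
print(conf.get("core", "dags_folder"))   # e.g. /home/wasadmin/airflow/dags
print(conf.get("core", "executor"))      # e.g. SequentialExecutor

# Any option can also be overridden via an environment variable,
# e.g. AIRFLOW__CORE__EXECUTOR=LocalExecutor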
1.8 Extending Airflow
- Airflow offers a wide range of additional packages ("extras") that extend its core functionality and system integration capabilities.
- The list of certified packages you can install alongside the core packages can be found in the Notes below:
Notes:
Extra packages that can be installed alongside the core Airflow packages:
aws azure celery cloudant crypto devel devel_hadoop druid gcp github_enterprise google_auth hashicorp hdfs hive jdbc kerberos kubernetes ldap mssql mysql oracle password postgres presto qds rabbitmq redis samba slack ssh vertica
See the details of the installation at https://airflow.apache.org/docs/stable/installation.html#extra-packages
1.9 Jinja Templates
- Airflow DAGs can use the Jinja templating engine (jinja2); for example, a DAG can declare a dictionary of variables (e.g. through its user_defined_macros argument):
dict(foo='bar')
- The variable can later be referenced through a Jinja template as follows:
{{ foo }} # gets evaluated to 'bar'
- Airflow's own configuration uses Jinja templates, e.g. the log filename format shown earlier:
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
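- Below is a minimal, self-contained sketch of the same idea using the jinja2 library directly; the variable name foo is just an example:

# jinja_demo.py -- standalone illustration of Jinja templating
from jinja2 import Template

context = dict(foo='bar')                       # the dictionary of variables
template = Template("value of foo: {{ foo }}")  # template with a placeholder
print(template.render(**context))               # -> value of foo: bar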
Notes:
According to https://github.com/pallets/jinja and https://jinja.palletsprojects.com/en/master/,
“Jinja is a fast, expressive, and extensible templating engine. Special placeholders in the template allow writing code similar to Python syntax. Then the template is passed data to render the final document.”
Jinja is the default template engine used in Flask, a micro (with no dependencies) web framework written in Python.
1.10 Variables and Macros
- Airflow automatically provisions a set of variables and macros [https://airflow.apache.org/docs/stable/macros-ref] that are injected into the DAG Run’s execution context and made available through Jinja templates using double curly braces: {{ }}
- The variable ds evaluates to the execution date as a YYYY-MM-DD string, which you can use as {{ ds }}
- The variable ts represents a timestamp in ISO format, e.g. 2020-08-30T00:00:00+00:00
- ts_nodash removes dashes and the time zone portion, e.g. 20200830T000000
- Note: You can also define system-wide variables using the Web UI
- Macros give you access to programmatic objects, e.g. macros.random (see the sketch below).
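- Here is a short sketch (hypothetical DAG and task names, Airflow 1.10.x import path) showing these variables and a macro used inside a templated operator field:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.10.x import path

with DAG(dag_id="template_vars_demo",
         start_date=datetime(2020, 8, 30),
         schedule_interval="@daily") as dag:

    # bash_command is a templated field, so {{ ds }}, {{ ts_nodash }} and macros
    # are rendered for each DAG Run before the command executes
    print_dates = BashOperator(
        task_id="print_dates",
        bash_command=(
            "echo run date: {{ ds }}, "
            "compact timestamp: {{ ts_nodash }}, "
            "one week later: {{ macros.ds_add(ds, 7) }}"
        ),
    )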
1.11 Summary
- In this tutorial, we discussed the following topics related to Apache Airflow:
- Concepts
- Main components
- CLI
- Extending Airflow with extra packages
- Jinja templates and variables