Introducing Azflow

Azflow is a simple tool, inspired by Airflow, that makes it easier to write workflows for Azkaban. Workflows are collections of sequenced tasks, commonly used by data engineers to extract, transform, and load data.

Azkaban offers many standard features of a workflow management tool: a GUI, scheduling, retries, alerting, logging, etc. However, its method of programming workflows is cumbersome. Each task is defined in its own .job file, which lists the other tasks it depends on. Creating these files manually works well for small workflows, but becomes unwieldy with hundreds or thousands of tasks.
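For example, a single task in Azkaban's native format is a small properties file like the one below (the task and file names here are hypothetical):

# transform.job
type=command
command=echo "transform"
dependencies=extract

A flow of N tasks means maintaining N such files, with the dependency structure scattered across all of them.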

Azflow is one of several tools that try to address this issue, including LinkedIn Gradle DSL for Apache Hadoop, AzkabanCLI, and azkaban-rb. While those tools create their own abstractions, Azflow is based on the model created by Airflow: workflows are defined in Python code as Task objects which form a DAG (directed acyclic graph) object. For simple workflows this is only slightly more compact than Azkaban's native format, but the model is much more powerful when applied to complex jobs.

For example, if you have many similar tasks with slightly different parameters, you can maintain them as a list and loop through it to generate as many tasks as needed. This minimizes the number of source files to maintain and follows the DRY (Don't Repeat Yourself) principle.

example_dag_loop.py demonstrates this concept:

from azflow.DAG import DAG
from azflow.BashOperator import BashOperator

# Every task registers itself with this DAG via the dag= argument.
loop_dag = DAG(dag_id='loop_dag')

# A single setup task that all looped tasks depend on.
task_1 = BashOperator(task_id='task_1', dag=loop_dag,
                      bash_command='echo "begin"')

# A single cleanup task that runs after every looped task finishes.
task_3 = BashOperator(task_id='task_3', dag=loop_dag,
                      bash_command='echo "clean up"')

tasks_to_loop = ['do', 'all', 'these', 'in', 'no', 'particular', 'order']
for t in tasks_to_loop:
    # Each list element generates a two-step chain:
    # task_1 -> <t>_part_1 -> <t>_part_2 -> task_3
    task_2a = BashOperator(task_id=t + '_part_1', dag=loop_dag,
                           bash_command='echo "start {}"'.format(t))
    task_2a.set_upstream(task_1)

    task_2b = BashOperator(task_id=t + '_part_2', dag=loop_dag,
                           bash_command='echo "finish {}"'.format(t))
    task_2b.set_upstream(task_2a)
    task_3.set_upstream(task_2b)

Running the above through Azflow with the command

python -m azflow.render --dag_file example/example_dag_loop.py --output_folder example/example_flow_loop/

generates the 17 necessary .job files to represent this flow in Azkaban:

[Screenshot: Azkaban's graph view of the generated loop flow]
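As a sketch of what the rendered output looks like, the generated file for do_part_2 would contain roughly the following, using Azkaban's standard command job type (the exact properties azflow writes may differ):

# do_part_2.job
type=command
command=echo "finish do"
dependencies=do_part_1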

For more information, see the project on GitHub. Internally, Azflow uses depth-first search to detect cycles in the task graph and breadth-first search to print out DAGs when rendering.
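As a rough illustration of the cycle check, here is a generic depth-first search with a three-color marking scheme (a sketch of the technique, not azflow's actual implementation):

def has_cycle(graph):
    # graph: dict mapping each node to a list of downstream nodes
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on the current DFS path / finished
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for neighbor in graph.get(node, []):
            if color.get(neighbor, WHITE) == GRAY:
                return True  # back edge to a node on the current path: cycle
            if color.get(neighbor, WHITE) == WHITE and visit(neighbor):
                return True
        color[node] = BLACK
        return False

    return any(visit(node) for node in graph if color[node] == WHITE)

# has_cycle({'a': ['b'], 'b': ['c'], 'c': ['a']})  -> True
# has_cycle({'a': ['b'], 'b': ['c'], 'c': []})     -> False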

Feedback and pull requests are encouraged!