Lineage


Hi All,

I have made a first implementation of lineage tracking in Airflow, including integration with Apache Atlas. It was inspired by Jeremiah’s earlier work on Data Flow pipelines, but I think I kept it a bit simpler.

Operators now have two new parameters called “inlets” and “outlets”. These can be filled with objects derived from “DataSet”, like “File” and “HadoopFile”. The parameters are Jinja2 templated, which means they are rendered with the context of the task when it runs. So you can get definitions like this:

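# Import paths are assumed from the linked PR; `dag` and FILE_CATEGORIES
# are expected to be defined elsewhere in the DAG file.
from airflow.lineage.datasets import File
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# Final task: "auto" inlets and one explicit outlet.
f_final = File(name="/tmp/final")
run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
    inlets={"auto": True},
    outlets={"datasets": [f_final]})

# One templated outlet per category; the doubled braces survive
# str.format() as "{{ execution_date }}" and are rendered by Jinja2 later.
f_in = File(name="/tmp/whole_directory/")
outlets = []
for category in FILE_CATEGORIES:
    f_out = File(name="/tmp/{}/{{{{ execution_date }}}}".format(category))
    outlets.append(f_out)
run_this = BashOperator(
    task_id='run_after_loop', bash_command='echo 1', dag=dag,
    inlets={"auto": False, "task_ids": [], "datasets": [f_in]},
    outlets={"datasets": outlets})
run_this.set_downstream(run_this_last)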
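
A side note on the quadrupled braces in the outlet names above: str.format() collapses each doubled brace into a single one, so what is left is an ordinary Jinja2 expression that gets rendered later with the task context. A quick check in plain Python (the category name "CAT1" is a made-up value, used only for illustration):

```python
# "CAT1" is a hypothetical category name used only for illustration.
template = "/tmp/{}/{{{{ execution_date }}}}".format("CAT1")

# str.format() turns "{{{{" into "{{" and "}}}}" into "}}",
# leaving a Jinja2 placeholder in the resulting string.
print(template)  # /tmp/CAT1/{{ execution_date }}
```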

So I am trying to keep the boilerplate work down for developers. Operators can also extend inlets and outlets automatically. This will probably be a bit harder for the BashOperator without some special magic, but an update to the DruidOperator can be relatively straightforward.
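
To make "extend automatically" a bit more concrete, here is a rough sketch that uses plain stand-in classes rather than the real Airflow ones. All names in it (SketchDataSet, SketchOperator, SketchDruidOperator, the datasource argument) are hypothetical and not taken from the PR. The only idea it shows is that an operator which already knows its target could append that target to its own outlets, so the DAG author does not have to list it.

```python
class SketchDataSet(object):
    """Stand-in for a DataSet-derived object; not the Airflow class."""
    def __init__(self, name):
        self.name = name


class SketchOperator(object):
    """Hypothetical base operator that only stores inlets and outlets."""
    def __init__(self, task_id, inlets=None, outlets=None):
        self.task_id = task_id
        # Copy the dicts so the caller's arguments are not modified.
        self.inlets = dict(inlets or {})
        self.outlets = dict(outlets or {})
        self.inlets.setdefault("datasets", [])
        self.outlets.setdefault("datasets", [])


class SketchDruidOperator(SketchOperator):
    """Hypothetical operator that knows which datasource it writes to."""
    def __init__(self, task_id, datasource, **kwargs):
        super(SketchDruidOperator, self).__init__(task_id, **kwargs)
        # Extend the outlets with the known target (assumption: this is
        # one possible way to do it, not how the PR implements it).
        extra = SketchDataSet(name=datasource)
        self.outlets["datasets"] = list(self.outlets["datasets"]) + [extra]


# No outlets passed in, yet the operator reports its datasource as one.
op = SketchDruidOperator(task_id="index", datasource="events")
print([d.name for d in op.outlets["datasets"]])
```

Outlets that are passed in explicitly are kept, and the automatic one is appended after them.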

In the future, Operators can take advantage of the inlet/outlet definitions, as they are also made available as part of the context for templating (as “inlets” and “outlets”).

I’m looking forward to your comments!

https://github.com/apache/incubator-airflow/pull/3321

Bolke.