OSDir

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Lineage


Ok, I’ve put the most interesting pieces into a small Gist:

https://gist.github.com/ms32035/f85cfbaff132f0d0ec8c309558330b7d

The solution is based on SQLAlchemy’s declarative layer, which I found to
be closest to what you can find in a metadata repository in commercial ETL
tools. Luckily, most of the parameters there are optional, and it works
equally well without a database connection, so it can be used to store
information about other objects, like files.

The starting point is MetaData class (
http://docs.sqlalchemy.org/en/latest/core/metadata.html), which has been
extended to store Airflow connection ID. I treat it as an equivalent of a
single database, file location or anything else that stores a collection of
input/output datasets.

Next comes an extension of Table. On top of standard SQLAlchemy objects
like columns, datatypes, constraints (which don’t need to exist in the
database, and are processed by a dedicated operators), it stores additional
attributes such as paths, encoding etc. The trick here was to plug that
into the base object, which has a bit complex initialization, hence the
__new__ method is overwritten. Since almost everything, including column
definitions, is optional, the class can also represent files or anything
else that has a structure. This is an equivalent of what you defined as a
DataSet. Because a table is linked to metadata, connection ids are not
needed any more in operators.

There are two other classes, which are not related directly to your PR, but
leverage the framework and are commonly used in ETL/DQ:

·         HighWaterMark – using SQLAlchemy columns, automatically stores
HWM values for incremental loading in Airflow variables

·         SQLCheck – allows defining complex SQL/Python data verification
rules in metadata, which in the end come down to comparing 2 expressions

The metadata repository is simply a set of python files, with some
directory structure, stored and versioned together with dags.

You are right that not all operators are sql based, so not all of them were
rewritten. For those rewritten, I used the following logic/pattern:

·         2 parameters (input, output)

·         Output was usually a table object

·         Input could be (tried in order):

o   extraction SQL query saved as table attribute, stored in metadata

o   Table object – sql was automatically generated from metadata

o   SQL query as a string

It does not cover all sorts of complex transformations, but with tables
defined in metadata it’s much easier to track what’s used where.
Furthermore, the metadata definitions were equally useful for flat files,
for example to store header columns.

I didn’t implement any metadata exchange between the operators, so I can’t
comment on serialization.

Just a few final thoughts on what works good and what not so good:

+ all metadata in one place

+ support for basic data quality

- defining metadata (columns) without a wizard like in commercial tools is
time consuming

- the table class has become a bit cluttered due to a lot of various SQL
query types implemented, and will probably need some restructuring (split)
at some time



Best

Marcin



On Sun, May 6, 2018 at 10:15 AM Bolke de Bruin <bdbruin@xxxxxxxxx> wrote:

> Forgot to answer your question for S3 it could look like:
>
> s3_file = File("s3a://bucket/key")
> Inlets = {"datasets:" [s3_file,]}
>
> Obviously if you do something with the s3 file outside of Airflow you need
> to track lineage yourself somehow.
>
> B.
>
> Sent from my iPhone
>
> > On 6 May 2018, at 11:05, Bolke de Bruin <bdbruin@xxxxxxxxx> wrote:
> >
> > Hi Gerardo,
> >
> > Any lineage tracking system is dependent on how much data you can give
> it. So if you do transfers outside of the 'view' such a system has then
> lineage information is gone. Airflow can help in this area by tracking its
> internal lineage and providing that to those lineage systems.
> >
> > Apache Atlas is agnostic and can receive lineage info by rest API (used
> in my implementation) and Kafk topic. It does also come with a lot of
> connectors out of the box that tie into the hadoop ecosystem and make your
> live easier there. The Airflow Atlas connector supplies Atlas with
> information that it doesn't know about yet closing the loop further.
> >
> > Also you can write your own connector and put it on the Airflow class
> path and use that one.
> >
> > Bolke
> >
> > Sent from my iPhone
> >
> >> On 6 May 2018, at 09:13, Gerardo Curiel <gerardo@xxxxxxxx> wrote:
> >>
> >> Hi Bolke,
> >>
> >> Data lineage support sounds very interesting.
> >>
> >> I'm not very familiar with Atlas but first sight seems like a tool
> specific
> >> to the Hadoop ecosystem. How would this look like if the files (inlets
> or
> >> outlets) were stored on s3?.
> >>
> >> An example of a service that manages a similar use case is AWS Glue[1],
> >> which creates a hive metastore based on the schema and other metadata it
> >> can get from different sources (amongst them, s3 files).
> >>
> >>
> >>> On Sun, May 6, 2018 at 7:49 AM, Bolke de Bruin <bdbruin@xxxxxxxxx>
> wrote:
> >>>
> >>> Hi All,
> >>>
> >>> I have made a first implementation that allows tracking of lineage in
> >>> Airflow and integration with Apache Atlas. It was inspired by
> Jeremiah’s
> >>> work in the past on Data Flow pipelines, but I think I kept it a
> little bit
> >>> simpler.
> >>>
> >>> Operators now have two new parameters called “inlets” and “outlets”.
> These
> >>> can be filled with objects derived from “DataSet”, like “File” and
> >>> “HadoopFile”. Parameters are jinja2 templated, which
> >>> means they receive the context of the task when it is running and get
> >>> rendered. So you can get definitions like this:
> >>>
> >>> f_final = File(name="/tmp/final")
> >>> run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
> >>>   inlets={"auto": True},
> >>>   outlets={"datasets": [f_final,]})
> >>>
> >>> f_in = File(name="/tmp/whole_directory/")
> >>> outlets = []
> >>> for file in FILE_CATEGORIES:
> >>>   f_out = File(name="/tmp/{}/{{{{ execution_date }}}}".format(file))
> >>>   outlets.append(f_out)
> >>> run_this = BashOperator(
> >>>   task_id='run_after_loop', bash_command='echo 1', dag=dag,
> >>>   inlets={"auto": False, "task_ids": [], "datasets": [f_in,]},
> >>>   outlets={"datasets": outlets}
> >>>   )
> >>> run_this.set_downstream(run_this_last)
> >>>
> >>> So I am trying to keep to boilerplate work down for developers.
> Operators
> >>> can also extend inlets and outlets automatically. This will probably
> be a
> >>> bit harder for the BashOperator without some special magic, but an
> update
> >>> to the DruidOperator can be relatively quite straightforward.
> >>>
> >>> In the future Operators can take advantage of the inlet/outlet
> definitions
> >>> as they are also made available as part of the context for templating
> (as
> >>> “inlets” and “outlets”).
> >>>
> >>> I’m looking forward to your comments!
> >>>
> >>> https://github.com/apache/incubator-airflow/pull/3321
> >>>
> >>> Bolke.
> >>
> >>
> >>
> >> [1] https://aws.amazon.com/glue/
> >>
> >> Cheers,
> >>
> >> --
> >> Gerardo Curiel // https://gerar.do
>