Re: hooks & operators improvement proposal
It seems that I was a bit unclear.
The DAG-level ETL spans multiple tasks and usually looks like `kickoff >>
source_to_staging >> staging_to_warehouse >> warehouse_post_process`.
I'm not proposing changes to operators; they are great. What I am proposing
is to borrow the same concept (composability) for the smaller building blocks.
I'd argue that, in ETL flows, a task is usually composed of 'mini' flows
that look like read source > serialize > dump (example, example 2).
You can see that this logic is sometimes written in the operator and
sometimes in the hook; the code is not shared, and it handles the same
cases each time.
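To make the 'mini flow' idea concrete, here is a minimal Python sketch, assuming rows arrive as dicts from any iterable source. The `export_rows` helper and the two fake sources are hypothetical illustrations, not an existing Airflow API: the serialize (newline-delimited JSON) and dump (gzip) steps are written once and reused by two different sources.

```python
import gzip
import io
import json
from typing import Any, BinaryIO, Dict, Iterable


def export_rows(rows: Iterable[Dict[str, Any]], fileobj: BinaryIO) -> int:
    """Hypothetical shared mini flow: read source > serialize > dump.

    Returns the number of rows written to `fileobj`.
    """
    count = 0
    # GzipFile leaves a caller-supplied fileobj open when it is closed.
    with gzip.GzipFile(fileobj=fileobj, mode="wb") as gz:
        for row in rows:  # read source
            line = json.dumps(row, sort_keys=True)  # serialize
            gz.write(line.encode("utf-8") + b"\n")  # dump
            count += 1
    return count


# Two hypothetical sources reuse the same flow instead of re-implementing it.
def fake_db_rows():
    """Stands in for iterating a database cursor."""
    yield {"id": 1, "name": "a"}
    yield {"id": 2, "name": "b"}


def fake_api_rows():
    """Stands in for paging through an HTTP API."""
    return [{"user": "x", "active": True}]


db_buf = io.BytesIO()
api_buf = io.BytesIO()
db_count = export_rows(fake_db_rows(), db_buf)
api_count = export_rows(fake_api_rows(), api_buf)
print(db_count, gzip.decompress(db_buf.getvalue()).decode())
```

The point of the sketch is only that the source varies while the serialize and dump code stays in one place.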
On Wed, Sep 26, 2018 at 10:43 PM Jeff Payne <jpayne@xxxxxxxxxxx> wrote:
> So, in your scenario, the ETL pipeline happens inside a single task?
> If so, would it not make sense for the pipeline to span multiple tasks and
> provide a standard set of functions/decorators/etc for defining the
> input/output to/from each task? That way you would leverage the ability to
> rerun the DAG from a particular step of the ETL pipeline in case of a
> recoverable failure. Or am I misunderstanding...
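For comparison, the multi-task approach suggested above (a standard input/output for each task, so the DAG can be rerun from a given step) could look roughly like this. A minimal sketch, assuming a local temp directory stands in for shared state such as S3 and that each task's output is stored as `<task name>.json`; the `task_io` decorator is hypothetical.

```python
import json
import os
import tempfile
from typing import Any, Callable, Optional


def task_io(store_dir: str, name: str, upstream: Optional[str] = None):
    """Hypothetical decorator giving each task a standard input/output.

    The wrapped function receives the upstream task's stored output (or
    None) and its return value is persisted as `<store_dir>/<name>.json`,
    so any step can be re-run on its own from what upstream left behind.
    """
    def decorator(fn: Callable[[Any], Any]) -> Callable[[], Any]:
        def run() -> Any:
            data = None
            if upstream is not None:
                # Naming convention: upstream output lives at <upstream>.json
                with open(os.path.join(store_dir, upstream + ".json")) as f:
                    data = json.load(f)
            result = fn(data)
            with open(os.path.join(store_dir, name + ".json"), "w") as f:
                json.dump(result, f)
            return result
        return run
    return decorator


store = tempfile.mkdtemp()  # stands in for shared state such as S3


@task_io(store, "source_to_staging")
def source_to_staging(_: None) -> list:
    return [{"id": 1}, {"id": 2}]


@task_io(store, "staging_to_warehouse", upstream="source_to_staging")
def staging_to_warehouse(rows: list) -> list:
    return [dict(row, loaded=True) for row in rows]


source_to_staging()
staging_to_warehouse()
# After a recoverable failure, only the failed step needs to run again.
rerun = staging_to_warehouse()
print(rerun)
```

The trade-off is extra I/O between steps in exchange for being able to resume from any step.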
> From: Daniel Cohen <daniel.cohen@xxxxxxxxxxxxxx>
> Sent: Wednesday, September 26, 2018 12:27:29 PM
> To: dev@xxxxxxxxxxxxxxxxxx
> Subject: hooks & operators improvement proposal
> Some thoughts about operators / hooks:
> Operators are composable: a typical ETL flow looks like `kickoff >>
> source_to_staging >> staging_to_warehouse >> warehouse_post_process`, where
> tasks use shared state (like S3) or naming conventions to continue work
> where the upstream task left off.
> Hooks, on the other hand, are not composable, and a lot of ETL logic is
> written ad hoc in the operator each time.
> I propose a lightweight, in-process ETL framework that allows:
> - hook composition
> - shared general utilities (compression / file management / serialization)
> - simpler operator building
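The shared utilities could be plain generator functions that any hook or operator can chain. A minimal sketch, assuming rows are dicts; `clear_keys`, `batch` and `write_tempfiles` are hypothetical names chosen to mirror the kinds of utilities listed above (scrubbing, batching, file management), not an existing package.

```python
import itertools
import os
import tempfile
from typing import Any, Dict, Iterable, Iterator, List, Sequence


def clear_keys(rows: Iterable[Dict[str, Any]], keys: Sequence[str]) -> Iterator[Dict[str, Any]]:
    """Yield copies of each row without the given (e.g. sensitive) keys."""
    drop = set(keys)
    for row in rows:
        yield {k: v for k, v in row.items() if k not in drop}


def batch(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Group a stream into lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            return
        yield chunk


def write_tempfiles(chunks: Iterable[bytes]) -> Iterator[str]:
    """Write each chunk to its own temp file and yield the path.

    Callers own the files and should delete them (e.g. after upload).
    """
    for chunk in chunks:
        fd, path = tempfile.mkstemp(suffix=".part")
        with os.fdopen(fd, "wb") as f:
            f.write(chunk)
        yield path


rows = [{"id": i, "email": f"user{i}@example.com"} for i in range(5)]
groups = list(batch(clear_keys(rows, keys=["email"]), size=2))
paths = list(write_tempfiles(repr(g).encode("utf-8") for g in groups))
for path in paths:
    os.remove(path)
print(groups)
```

Because each utility takes and returns an iterable, they compose without knowing which hook produced the rows.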
> How it looks from the operator's side:
> def execute(self, context):
>     # initialize hooks
>     self.s3 = S3Hook(...)
>     self.mysql = MySqlHook(...)
>     # set up operator state
>     query = 'select * from somewhere'
>     # declare your ETL process
>     self.mysql.yield_query(query) >> \
>         pipes.clear_keys(keys=self.scrubbed_columns) >> \
>         pipes.ndjson_dumps() >> \
>         pipes.batch(size=1024) >> \
>         pipes.gzip() >> \
>         pipes.tempfile() >> \
>         ...  # remaining steps not shown
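One way the `>>` chaining in the operator could work is a small wrapper whose `__rshift__` applies the next stage lazily to a generator. This is a sketch under assumptions: `Pipe` and `step` are hypothetical names, and nothing runs until the final stream is consumed (here by `list`), so in a real operator the last stage would have to be a sink that drains the stream.

```python
import json
from typing import Any, Callable, Iterable, Iterator

Stage = Callable[[Iterable[Any]], Iterable[Any]]


class Pipe:
    """Wraps a stream; `pipe >> stage` returns a new Pipe over stage(stream)."""

    def __init__(self, stream: Iterable[Any]):
        self.stream = stream

    def __rshift__(self, stage: Stage) -> "Pipe":
        # Stages are generator functions, so this only builds the chain;
        # no rows are read until the resulting Pipe is iterated.
        return Pipe(stage(self.stream))

    def __iter__(self) -> Iterator[Any]:
        return iter(self.stream)


def step(fn: Callable[..., Iterable[Any]]) -> Callable[..., Stage]:
    """Turn fn(stream, **options) into a factory: fn(**options)(stream)."""
    def factory(**options: Any) -> Stage:
        return lambda stream: fn(stream, **options)
    return factory


@step
def clear_keys(stream, keys):
    drop = set(keys)
    for row in stream:
        yield {k: v for k, v in row.items() if k not in drop}


@step
def ndjson_dumps(stream):
    for row in stream:
        yield json.dumps(row, sort_keys=True)


rows = [{"id": 1, "secret": "x"}, {"id": 2, "secret": "y"}]
lines = list(Pipe(rows) >> clear_keys(keys=["secret"]) >> ndjson_dumps())
print(lines)
```

Making the stages configurable through keyword arguments keeps call sites close to the `pipes.batch(size=1024)` style in the proposal.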
> How it looks from the hook's side:
> @pipes.producer  # decorate
> def yield_query(self, query):
>     cursor = self.get_conn().cursor()
>     cursor.execute(query)
>     for row in cursor:
>         yield row
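On the hook side, a `producer` decorator only needs to wrap the generator's result so that it supports `>>`. A minimal sketch, assuming a tiny `Pipe` wrapper of the kind the operator-side chain implies; `producer`, `Pipe` and `FakeHook` are hypothetical, and `FakeHook` iterates an in-memory list instead of a real database cursor.

```python
import functools
from typing import Any, Callable, Iterable, Iterator, List


class Pipe:
    """Minimal wrapper: `pipe >> stage` lazily applies a generator stage."""

    def __init__(self, stream: Iterable[Any]):
        self.stream = stream

    def __rshift__(self, stage: Callable[[Iterable[Any]], Iterable[Any]]) -> "Pipe":
        return Pipe(stage(self.stream))

    def __iter__(self) -> Iterator[Any]:
        return iter(self.stream)


def producer(gen_fn: Callable[..., Iterator[Any]]) -> Callable[..., Pipe]:
    """Wrap a generator function (or method) so its result supports `>>`."""
    @functools.wraps(gen_fn)
    def wrapper(*args: Any, **kwargs: Any) -> Pipe:
        return Pipe(gen_fn(*args, **kwargs))
    return wrapper


class FakeHook:
    """Stand-in for a DB hook; `rows` replaces a real cursor's results."""

    def __init__(self, rows: List[dict]):
        self.rows = rows

    @producer
    def yield_query(self, query: str) -> Iterator[dict]:
        # A real hook would execute `query` and iterate the cursor here.
        for row in self.rows:
            yield row


def upper_names(stream: Iterable[dict]) -> Iterator[dict]:
    for row in stream:
        yield dict(row, name=row["name"].upper())


hook = FakeHook([{"name": "ada"}, {"name": "bob"}])
result = list(hook.yield_query("select * from somewhere") >> upper_names)
print(result)
```

Since the decorator only changes the return type, existing generator methods on hooks could opt in without changing their bodies.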
> * pipes is a module with a set of generic operations that can be
> reused across hooks and operators.
> The idea is inspired by 'bonobo' and 'python-pipes' (lightweight ETL
> packages), and the implementation is based on generators and decorators.
> We (cloudinary.com) are planning to open-source it. Is it something that
> would be interesting to integrate into Airflow, or as a third-party
> package? Or not at all? Any thoughts or suggestions?
> Thanks,
> Daniel Cohen