Re: hooks & operators improvement proposal
I see what you're looking for, and I think this is the purpose of XCom. We've
used XCom in some of our custom operators to get this type of behavior.
That said, we tend to avoid putting a lot of data into XCom; I believe
somewhere in the docs it describes that as an anti-pattern. The recommended
pattern is to lean on external systems for exchanging data.
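A minimal sketch of that pattern: the upstream task writes the payload to an external store and passes only a small pointer (the key) through XCom. It uses plain dicts as hypothetical stand-ins for XCom and for an external store such as S3. In Airflow itself the task would call `ti.xcom_push` / `ti.xcom_pull` and write through a hook instead.

```python
import json

# Hypothetical in-memory stand-ins: XCOM for Airflow's XCom table,
# OBJECT_STORE for an external system such as an S3 bucket.
XCOM = {}
OBJECT_STORE = {}


def source_to_staging(run_id):
    rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
    key = f"staging/{run_id}/rows.ndjson"
    # The (potentially large) payload goes to the external store...
    OBJECT_STORE[key] = "\n".join(json.dumps(r) for r in rows)
    # ...and only the small pointer goes into XCom.
    XCOM[("source_to_staging", "staging_key")] = key


def staging_to_warehouse():
    # Pull the pointer from XCom, then read the payload from the store.
    key = XCOM[("source_to_staging", "staging_key")]
    return [json.loads(line) for line in OBJECT_STORE[key].splitlines()]


source_to_staging("2018-09-26")
print(staging_to_warehouse())  # [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
```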
On Wed, Sep 26, 2018 at 4:26 PM Jeff Payne <jpayne@xxxxxxxxxxx> wrote:
> Ah, OK. Thanks for the clarification.
> From: Daniel Cohen <daniel.cohen@xxxxxxxxxxxxxx>
> Sent: Wednesday, September 26, 2018 1:15:16 PM
> To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
> Subject: Re: hooks & operators improvement proposal
> Hi Jeff,
> It seems I was a bit unclear.
> The ETL DAG spans multiple tasks and usually looks like `kickoff >>
> source_to_staging >> staging_to_warehouse >> warehouse_post_process`.
> I'm not proposing changes to operators; they are great. What I am proposing
> is to apply the same concept to the smaller building blocks.
> I argue that, in ETL flows, a task is usually composed of 'mini' flows
> that look like read source > serialize > dump (see example 2).
> As you can see, this logic is sometimes written in the operator and sometimes
> in the hook; the code is not shared, and each implementation handles the
> same cases again.
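The read source > serialize > dump shape maps naturally onto chained generators, which is what makes it shareable. Below is a minimal stdlib sketch. It assumes an in-memory list as a hypothetical stand-in for the source cursor and a `BytesIO` buffer in place of the destination file. The function names are illustrative, not an existing Airflow API.

```python
import gzip
import io
import json


def read_source(rows):
    """Stand-in for a hook's query cursor: yields one record at a time."""
    for row in rows:
        yield row


def serialize(records):
    """Serialize each record as one NDJSON line, encoded to bytes."""
    for record in records:
        yield (json.dumps(record) + "\n").encode("utf-8")


def dump(chunks, fileobj):
    """Stream the serialized chunks into a gzip-compressed file object."""
    # Closing the GzipFile finishes the gzip stream but leaves fileobj open.
    with gzip.GzipFile(fileobj=fileobj, mode="wb") as gz:
        for chunk in chunks:
            gz.write(chunk)


buffer = io.BytesIO()
dump(serialize(read_source([{"id": 1}, {"id": 2}])), buffer)
print(gzip.decompress(buffer.getvalue()).decode("utf-8"))
```

Each step only sees an iterable, so the same `serialize` and `dump` could be reused behind any hook that yields records.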
> On Wed, Sep 26, 2018 at 10:43 PM Jeff Payne <jpayne@xxxxxxxxxxx> wrote:
> > So, in your scenario, the ETL pipeline happens inside a single
> > operator/task?
> > If so, would it not make sense for the pipeline to span multiple tasks
> > and to provide a standard set of functions/decorators/etc. for defining the
> > input/output to/from each task? That way you could leverage the ability to
> > rerun the DAG from a particular step of the ETL pipeline in case of a
> > recoverable failure. Or am I misunderstanding...
> > From: Daniel Cohen <daniel.cohen@xxxxxxxxxxxxxx>
> > Sent: Wednesday, September 26, 2018 12:27:29 PM
> > To: dev@xxxxxxxxxxxxxxxxxx
> > Subject: hooks & operators improvement proposal
> > Some thoughts about operators / hooks:
> > Operators are composable: a typical ETL flow looks like `kickoff >>
> > source_to_staging >> staging_to_warehouse >> warehouse_post_process`,
> > and tasks use shared state (like S3) or naming conventions to continue work
> > where the upstream task left off.
> > Hooks, on the other hand, are not composable, so a lot of ETL logic is
> > written ad hoc in the operator each time.
> > I propose a lightweight, in-process ETL framework that allows:
> > - hook composition
> > - shared general utilities (compression / file management / etc.)
> > - simpler operator building
> > How it looks from the operator's side:
> > def execute(self, context):
> >     # initialize hooks
> >     self.s3 = S3Hook...
> >     self.mysql = MySqlHook...
> >     # setup operator state
> >     query = 'select * from somewhere'
> >     # declare your ETL process
> >     self.mysql.yield_query(query) >> \
> >         pipes.clear_keys(keys=self.scrubbed_columns) >> \
> >         pipes.ndjson_dumps() >> \
> >         pipes.batch(size=1024) >> \
> >         pipes.gzip() >> \
> >         pipes.tempfile() >> \
> >         self.s3.file_writer(s3_key=self.s3_key,
> >                             bucket_name=self.s3_bucket,
> >                             replace=True)
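One way the `>>` chaining in the operator above could work is operator overloading on a small wrapper class. A minimal sketch, assuming hypothetical `Pipe`/`Sink` classes and `clear_keys`/`list_writer` factories (the actual cloudinary implementation isn't shown in this thread). Here `list_writer` stands in for `self.s3.file_writer`.

```python
class Sink:
    """Hypothetical marker: a callable that drains the stream and ends the pipeline."""

    def __init__(self, fn):
        self.fn = fn


class Pipe:
    """Hypothetical wrapper around a lazy iterable, chainable with >>."""

    def __init__(self, iterable):
        self.iterable = iterable

    def __rshift__(self, stage):
        if isinstance(stage, Sink):
            # A sink pulls every item through the generators and returns a result.
            return stage.fn(self.iterable)
        # Any other stage maps an iterable to a new (lazy) iterable.
        return Pipe(stage(self.iterable))


def clear_keys(keys):
    """Stage factory: drops the given keys from each record (assumed semantics)."""
    def stage(records):
        for record in records:
            yield {k: v for k, v in record.items() if k not in keys}
    return stage


def list_writer(target):
    """Sink factory: appends records to `target` and returns how many were written."""
    def write(records):
        count = 0
        for record in records:
            target.append(record)
            count += 1
        return count
    return Sink(write)


rows = [{"id": 1, "email": "a@example.com"}, {"id": 2, "email": "b@example.com"}]
written = []
count = Pipe(rows) >> clear_keys(keys={"email"}) >> list_writer(written)
print(count, written)  # 2 [{'id': 1}, {'id': 2}]
```

Because `>>` is left-associative, each intermediate step only wraps another generator; no record moves until the sink at the end of the chain starts pulling.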
> > How it looks from the hook's side:
> > @pipes.producer  # decorate
> > def yield_query(self, query):
> >     cursor = self.get_conn().cursor()
> >     cursor.execute(query)
> >     for row in cursor:
> >         yield row
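On the hook side, a `producer` decorator can simply wrap the generator a method returns, so the caller gets something chainable. Below is a minimal sketch using stdlib `sqlite3` as a stand-in for a DB-API hook such as MySqlHook. `Pipe`, `producer` and `SqliteDemoHook` are hypothetical names. Unlike the snippet above, this version yields dicts rather than raw tuples (an assumption), so that key-based stages like `clear_keys` can address columns by name.

```python
import functools
import sqlite3


class Pipe:
    """Hypothetical minimal wrapper: iterable, and chainable with >>."""

    def __init__(self, iterable):
        self.iterable = iterable

    def __iter__(self):
        return iter(self.iterable)

    def __rshift__(self, stage):
        return Pipe(stage(self.iterable))


def producer(fn):
    """Hypothetical decorator: wraps a generator method's output in a Pipe."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Calling a generator function runs nothing yet; the query executes
        # lazily when the first item is pulled downstream.
        return Pipe(fn(*args, **kwargs))
    return wrapper


class SqliteDemoHook:
    """Stand-in for a DB-API hook such as MySqlHook, backed by stdlib sqlite3."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE somewhere (id INTEGER, name TEXT)")
        self.conn.executemany(
            "INSERT INTO somewhere VALUES (?, ?)", [(1, "a"), (2, "b")]
        )

    def get_conn(self):
        return self.conn

    @producer
    def yield_query(self, query):
        cursor = self.get_conn().cursor()
        cursor.execute(query)
        # Map each row tuple to a dict keyed by column name.
        columns = [col[0] for col in cursor.description]
        for row in cursor:
            yield dict(zip(columns, row))


hook = SqliteDemoHook()
rows = hook.yield_query("SELECT id, name FROM somewhere ORDER BY id")
print(list(rows))  # [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
```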
> > * `pipes` is a module with a set of generic operations that can be
> > reused across hooks and operators.
> > The idea is inspired by 'bonobo' and 'python-pipes' (lightweight ETL
> > libraries), and the implementation is based on generators and decorators.
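A minimal sketch of how generic, reusable `pipes` operations might be built from generators and decorators. A decorator lifts a per-record function into a stage, and `batch` / `gzip_stream` are plain generator stages. All names here are hypothetical. Assumed semantics: the `gzip()` step emits one streaming gzip payload (via `zlib` with a gzip header) rather than compressing each batch separately. A simple `run` helper replaces `>>` to keep the block self-contained.

```python
import gzip
import itertools
import json
import zlib


def record_stage(fn):
    """Hypothetical decorator: lifts a per-record function into a stage factory."""
    def factory(*args, **kwargs):
        def stage(records):
            for record in records:
                yield fn(record, *args, **kwargs)
        return stage
    return factory


@record_stage
def clear_keys(record, keys=()):
    # Assumed semantics: drop the scrubbed columns from each record.
    return {k: v for k, v in record.items() if k not in keys}


@record_stage
def ndjson_dumps(record):
    return json.dumps(record) + "\n"


def batch(size):
    """Stage factory: joins consecutive lines into chunks of at most `size` lines."""
    def stage(lines):
        it = iter(lines)
        while True:
            chunk = list(itertools.islice(it, size))
            if not chunk:
                return
            yield "".join(chunk)
    return stage


def gzip_stream():
    """Stage factory: compresses the chunks into one streaming gzip payload."""
    def stage(chunks):
        # wbits = MAX_WBITS | 16 makes zlib emit a gzip header and trailer.
        compressor = zlib.compressobj(wbits=zlib.MAX_WBITS | 16)
        for chunk in chunks:
            data = compressor.compress(chunk.encode("utf-8"))
            if data:
                yield data
        yield compressor.flush()
    return stage


def run(source, *stages):
    """Feeds each stage's output into the next; nothing runs until consumed."""
    stream = source
    for stage in stages:
        stream = stage(stream)
    return stream


rows = [{"id": i, "password": "secret"} for i in (1, 2, 3)]
payload = b"".join(
    run(rows, clear_keys(keys={"password"}), ndjson_dumps(), batch(2), gzip_stream())
)
print(gzip.decompress(payload).decode("utf-8"))
```

Keeping every stage as "iterable in, iterable out" is what lets the same compression or batching code sit behind any hook, instead of being rewritten inside each operator.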
> > We (cloudinary.com) are planning to open-source it. Is it something that
> > would be interesting to integrate into Airflow, or to offer as a third-party
> > package? Or at all? Any thoughts or suggestions?
> > Thanks,
> > d
> > --
> > daniel cohen
> > +972-(0)54-4799-147