hooks & operators improvement proposal
Some thoughts about operators / hooks:
Operators are composable, typical ETL flow looks like `kickoff >>
source_to_staging >> staging_to_warehouse >> warehouse_post_process` where
tasks use shared state (like s3) or naming conventions to continue work
where upstream task left off.
hooks on the other hand are not composable and a lot of ETL logic is
written ad hoc in the operator each time.
i propose a lightweight, in process, ETL framework that allows
- hook composition
- shared general utilities (compression / file management / serialization)
- simplifies operator building
how it looks from the operator's side
def execute(self, context):
# initialize hooks
self.s3 = S3Hook...
self.mysql = MySqlHook...
# setup operator state
query = 'select * from somewhere'
# declare your ETL process
self.mysql.yield_query(query) >> \
pipes.clear_keys(keys=self.scrubbed_columns) >> \
pipes.ndjson_dumps() >> \
pipes.batch(size=1024) >> \
pipes.gzip() >> \
pipes.tempfile() >> \
how it looks from the hook's side
@pipes.producer # decorate
def yield_query(self, query):
for row in cursor:
*pipes is a module with a set of operations that are generic and
potentially reused between hooks / operators
the idea inspired by 'bonobo' and 'python-pipes' (lightwait etl packsges)
and implementation based on on generators and decorators.
we (cloudinary.com) are planning to open source it , is it something that
would be interesting to integrate into airflow ,or as a 3rd party ? or not
at all ? any thoughts suggestions ?