
RE: Lazily load input for Airflow operators

Sorry for the late response.

Using a derived class is the solution we actually ended up implementing. Still, it would certainly be nice to have a more generic solution that does not put the burden on end users.

Using a mixin class seems like a neat idea, and from what I have seen it is a widely used idiom in Airflow. It would let users set or override some of the parameters just before the operator executes. However, I suppose it would require changing every individual operator. On the positive side, this lets the maintainers/contributors of each operator decide whether they want to adopt the lazy load mechanism. On the negative side, it might mean that the mechanism will not be available for many operators.
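
To make the mixin idea concrete, here is a rough sketch of how it might look. It is only an illustration under my own assumptions: `LazyLoadMixin`, `lazy_kwargs`, and the stand-in `FooOperator` are hypothetical names and not part of Airflow, and a real operator would derive from `BaseOperator`.

```python
class FooOperator(object):
    """Stand-in for a real operator (hypothetical; not an Airflow class)."""

    def __init__(self, json=None, **kwargs):
        self.json = json

    def execute(self, context):
        # A real operator would submit self.json to an external service.
        return self.json


class LazyLoadMixin(object):
    """Hypothetical mixin that resolves callables into attributes at execution time."""

    def __init__(self, lazy_kwargs=None, **kwargs):
        super(LazyLoadMixin, self).__init__(**kwargs)
        # Mapping of attribute name -> callable that takes the task context.
        self._lazy_kwargs = lazy_kwargs or {}

    def execute(self, context):
        # Evaluate each callable only now, not when the DAG file is parsed.
        for name, func in self._lazy_kwargs.items():
            setattr(self, name, func(context))
        return super(LazyLoadMixin, self).execute(context)


class LazyFooOperator(LazyLoadMixin, FooOperator):
    """An operator opts in by listing the mixin before its base class."""


task = LazyFooOperator(lazy_kwargs={"json": lambda context: {"run": context["ds"]}})
result = task.execute({"ds": "2018-08-28"})
```

The mixin has to come first in the list of base classes so that its `execute` runs before the operator's own. This is also why each operator would need to be touched individually.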

Actually, it turns out that `BaseOperator` already has a `pre_execute` method. Here is an example of how to use it (the example assumes the operator has a `json` attribute holding all the parameters, but the same approach also works for operators that keep their parameters as individual attributes):

class LazyLoadFooOperator(FooOperator):
    def __init__(self, json_generator, **kwargs):
        super(LazyLoadFooOperator, self).__init__(**kwargs)
        self._json_generator = json_generator

    def pre_execute(self, context):
        self.json = self._json_generator(context)

def json_generator(params):
    def generate(context):
        job_description = { ... }  # create job description based on the result of a SQL query, HTTP request, etc.
        return job_description

    return generate

task = LazyLoadFooOperator (

Using kwargs to override the arguments is also a good idea. It looks like it could be implemented in `BaseOperator`, and the different individual operators would not need to be modified. Also, end users would not need to create a derived operator. So, it might be the best approach. The following code shows a (very) simplified implementation:

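class BaseOperator:
    def __init__(self, *args, **kwargs):
        # Optional callable returning a dict of attribute overrides.
        self.kwargs_overrides_callable = kwargs.get('kwargs_overrides_callable')

    def run(self):
        # Apply the overrides just before executing, not at construction time.
        if self.kwargs_overrides_callable is not None:
            for k, v in self.kwargs_overrides_callable().items():
                if hasattr(self, k):
                    setattr(self, k, v)
        self.execute()

    def execute(self):
        raise NotImplementedError()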
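
To show how an end user might use this, here is a small self-contained sketch. It restates a simplified base class so the snippet runs on its own. `FooOperator` and `expensive_job_description` are hypothetical names, and none of this is Airflow's actual API. The point is that the callable is not invoked when the operator is constructed (that is, when the DAG file is processed), only when the task runs.

```python
class BaseOperator(object):
    """Simplified stand-in, not Airflow's real BaseOperator."""

    def __init__(self, kwargs_overrides_callable=None, **kwargs):
        self.kwargs_overrides_callable = kwargs_overrides_callable

    def run(self):
        # Apply overrides right before execution; unknown keys are skipped.
        if self.kwargs_overrides_callable is not None:
            for k, v in self.kwargs_overrides_callable().items():
                if hasattr(self, k):
                    setattr(self, k, v)
        return self.execute()

    def execute(self):
        raise NotImplementedError()


class FooOperator(BaseOperator):
    """Hypothetical operator with a single `json` parameter."""

    def __init__(self, json=None, **kwargs):
        super(FooOperator, self).__init__(**kwargs)
        self.json = json

    def execute(self):
        return self.json


calls = []


def expensive_job_description():
    # Stands in for a SQL query or an HTTP request.
    calls.append(1)
    return {"json": {"notebook": "/example"}, "unknown": "ignored"}


# A placeholder value is passed for `json`; it is replaced at run time.
task = FooOperator(json={}, kwargs_overrides_callable=expensive_job_description)
calls_after_init = len(calls)  # nothing has been evaluated yet
result = task.run()            # the callable is invoked here
```

One design question this raises is whether keys that do not match an existing attribute (like `unknown` above) should be silently skipped, as in this sketch, or reported as an error.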

What do you think? Is there any approach that seems to be superior from all perspectives?


From: Maxime Beauchemin <maximebeauchemin@xxxxxxxxx>
Sent: Tuesday, August 28, 2018 3:26 AM
To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
Cc: andrewchen@xxxxxxxxxxxxxx
Subject: Re: Lazily load input for Airflow operators

This is reasonable; it could be nice to have a generic way to replace
operator kwargs with callables. In the meantime you can try this hack:
derive an operator inline with your DAG definition. In this hack, the
callable receives the operator's context object, which is nice because it
provides a handle on a lot of things (the same as what's in the jinja
template context).

import datetime

class DerivedFooOperator(FooOperator):

    def _bar(self, context):
        return datetime.datetime.now()  # only gets evaluated at run time

    def execute(self, context):
        self.bar = self._bar(context)
        super(DerivedFooOperator, self).execute(context)

If `bar` is a required arg, you'll have to pass a dummy static value on
initialization, but it will get overwritten at runtime.

You can imagine having a more generic class mixin to do this. Or maybe
BaseOperator could have a `kwarg_overrides_callables` that would be a dict
of string: callable that would execute somewhere in between `__init__` and
`execute` and do the magic. Or how about a `pre_execute(context): pass`
BaseOperator method as a nice hook to allow for this kind of stuff without
having to call `super`.


On Mon, Aug 27, 2018 at 2:29 PM Victor Jimenez <vjimenez@xxxxxxxxxxxxxx> wrote:

> TL;DR Is there any recommended way to lazily load input for Airflow
> operators?
> I could not find a way to do this. Although I ran into this limitation while
> using the Databricks operator, it seems other operators may lack this
> functionality too. Please keep reading for more details.
> ---
> When instantiating a DatabricksSubmitRunOperator (
> https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/databricks_operator.py)
> users need to pass the description of the job that will later be executed
> on Databricks.
> The job description is only needed at execution time (when the hook is
> called). However, the json parameter must already have the full job
> description when constructing the operator. This may present a problem if
> computing the job description needs to execute expensive operations (e.g.,
> querying a database). The expensive operation will be invoked every single
> time the DAG is reprocessed (which may happen quite frequently).
> It would be good to have a mechanism equivalent to the python_callable
> parameter in the PythonOperator. In this way, users could pass a function
> that would generate the job description only when the operator is actually
> executed. I discussed this with Andrew Chen (from Databricks), and he
> agrees it would be an interesting feature to add.
> Does this sound reasonable? Is this use case supported in some way that I
> am unaware of?
> You can find the issue I created here:
> https://issues.apache.org/jira/projects/AIRFLOW/issues/AIRFLOW-2964