
Re: Basic modeling question


Alexis, do you mean you would have done this using an ExternalTaskSensor?
Or is there some other way to depend on a range of tasks?
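
To make sure I understand, the ExternalTaskSensor version I can picture looks
roughly like this (untested; the DAG/task ids are made up, and it assumes a
dag object and a downstream weekly_rollup task already exist):

from datetime import timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# One sensor per day in the 7-day window; sensor i waits on the daily
# ingestion task from i days before this run's execution date.
wait_for_history = [
    ExternalTaskSensor(
        task_id="wait_for_ingestion_minus_{}".format(i),
        external_dag_id="daily_log_ingestion",  # hypothetical DAG id
        external_task_id="ingest_logs",         # hypothetical task id
        execution_delta=timedelta(days=i),
        dag=dag,
    )
    for i in range(7)
]
wait_for_history >> weekly_rollup

Is that the idiomatic way, or is there something lighter-weight?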

On Wed, Aug 8, 2018 at 3:35 PM, Alexis Rolland <alexis.rolland@xxxxxxxxxxx>
wrote:

> Not sure if it’s optimal compared to what James proposes, but I would have
> simply made the weekly and monthly rollup tasks downstream of the daily
> log ingestion tasks they depend on. Then I would have used the trigger
> rule ‘all_done’ to ensure those rollup tasks start once their parent tasks
> have completed.
>
> https://airflow.incubator.apache.org/concepts.html#trigger-rules
>
> (daily log ingestion) > (daily rollup)
> (daily log ingestion) > (weekly rollup + TriggerRule.all_done)
> (daily log ingestion) > (monthly rollup + TriggerRule.all_done)
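>
> In code, roughly (a sketch only; the operator choice and the names are
> placeholders):
>
> from airflow.operators.hive_operator import HiveOperator
> from airflow.utils.trigger_rule import TriggerRule
>
> weekly_rollup = HiveOperator(
>     task_id="weekly_rollup",
>     hql=weekly_rollup_hql,               # placeholder query
>     trigger_rule=TriggerRule.ALL_DONE,   # start once every parent has finished
>     dag=dag,
> )
>
> # daily_ingestion_tasks: the list of upstream daily ingestion tasks
> daily_ingestion_tasks >> weekly_rollup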
>
> Cheers
>
> Alexis
>
> On 9 Aug 2018, at 02:57, James Meickle <jmeickle@xxxxxxxxxxxxxx.INVALID> wrote:
>
> It sounds like you want something like this?
>
> root_operator = DummyOperator(task_id="root")
>
> def offset_operator(i):
>     # Templated query over the partition i days before the execution date
>     my_sql_query = "SELECT * FROM {{{{ macros.ds_add(ds, -{offset}) }}}};".format(offset=i)
>     return SQLOperator(task_id="offset_by_{}".format(i), query=my_sql_query)
>
> offset_operators = [offset_operator(i) for i in range(7)]
> root_operator >> offset_operators
>
> # Daily just waits on today, no offset
> do_daily_work = DummyOperator(task_id="do_daily_work")
> offset_operators[0] >> do_daily_work
>
> # Weekly waits on today AND the six prior offsets
> do_weekly_work = DummyOperator(task_id="do_weekly_work")
> offset_operators >> do_weekly_work
>
> IOW, every day you wait for that day's data to be available, and then run
> the daily job; you also wait for the previous six days' data to be
> available, and when it is, run the weekly job.
>
> n.b. - if you do it this way you will have up to 7 tasks polling the "same"
> data point, which is slightly wasteful. But it's also not much code or
> mental effort to write it this way.
>
> On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gsilk@xxxxxxxxxxx.invalid> wrote:
>
> My main concern is how to express the fact that the weekly rollup depends
> on the previous 7 days' worth of data, and to ensure that it does not run
> until the tasks that generate those 7 days of data have run, assuming that
> tasks can run non-sequentially.
>
> It's easy enough when you have the following situation:
>
> (daily log ingestion) <-- (daily rollup)
>
> In any given DAG run, you are guaranteed to have the data needed for (daily
> rollup), because the dependency that generated its data just ran.
>
> But I'm not sure how best to model it when you have all of the following:
>
> (daily log ingestion) <-- (daily rollup)
> (daily log ingestion) <-- (weekly rollup)
> (daily log ingestion) <-- (monthly rollup)
>
>
>
> On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmiston@xxxxxxxxx> wrote:
>
> Gabriel -
>
> Ah, I missed your earlier comment about weekly/monthly rollups also being
> on a daily cadence.  So is your concern more about, e.g., reducing the
> redundant processing by the weekly rollup tasks of the days in that range
> that were already processed in the previous DAG run(s)?  Or mainly about
> the dependency of not executing the first weekly rollup at all until the
> first 7 daily rollups' worth of data have built up?
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV
> <https://stackoverflow.com/cv/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston>
>
>
> On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeickle@xxxxxxxxxxxxxx.invalid> wrote:
>
> If you want to run (daily, rolling weekly, rolling monthly) rollups on a
> daily basis, and they're mostly the same but have some additional
> dependencies, you can write a DAG factory method which you call three
> times. Certain nodes only get added to the longer-than-daily DAGs.
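>
> Something like this, as a rough sketch (the make_* helpers and
> default_args are invented for illustration):
>
> from airflow import DAG
>
> def build_rollup_dag(dag_id, lookback_days):
>     dag = DAG(dag_id, schedule_interval="@daily", default_args=default_args)
>     ingest = make_ingestion_task(dag)              # shared by all three
>     rollup = make_rollup_task(dag, lookback_days)
>     if lookback_days > 1:
>         # only the longer-than-daily DAGs get the extra wait-for-history nodes
>         make_history_sensors(dag, lookback_days) >> rollup
>     ingest >> rollup
>     return dag
>
> daily_dag = build_rollup_dag("metric_x_daily", 1)
> weekly_dag = build_rollup_dag("metric_x_weekly", 7)
> monthly_dag = build_rollup_dag("metric_x_monthly", 28)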
>
> On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gsilk@xxxxxxxxxxx.invalid> wrote:
>
> Thanks Andy and Taylor for the suggestions --
>
> I see how that would work for the case where you want a weekly rollup that
> runs on a weekly cadence.
>
> But what about a rolling weekly or monthly rollup that runs each day?
>
> On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <andy.cooper@xxxxxxxxxxxxx> wrote:
>
> To expand on Taylor's idea:
>
> I recently wrote a ScheduleBlackoutSensor that would allow you to prevent
> a task from running if it meets the criteria provided. It accepts an array
> of args for any number of the criteria, so you could leverage this sensor
> to provide "blackout" runs for a range of days of the week.
>
> https://github.com/apache/incubator-airflow/pull/3702/files
>
> For example:
>
> task = ScheduleBlackoutSensor(task_id="blackout_mon_sat", day_of_week=[0, 1, 2, 3, 4, 5], dag=dag)
>
> would prevent a task from running Monday through Saturday, allowing it to
> run on Sunday.
>
> You could leverage this sensor as you would any other sensor, or you could
> invert the logic so that you would only need to specify
>
> task = ScheduleBlackoutSensor(task_id="sunday_only", day_of_week=6, dag=dag)
>
> to "whitelist" a task to run on Sundays.
>
>
> Let me know if you have any questions.
>
> On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <tedmiston@xxxxxxxxx> wrote:
>
> Gabriel -
>
> One approach I've seen for a similar use case is to have multiple related
> rollups in one DAG that runs daily, then have the non-daily tasks skip most
> of the time (e.g., the weekly rollup only actually executes on Sundays and
> is parameterized to look at the last 7 days).
>
> You could implement that not-running part a few ways, but one idea is a
> sensor in front of the weekly rollup task.  Imagine a SundaySensor whose
> check is simply: return execution_date.weekday() == 6.  One thing to keep
> in mind here is the dependence on the DAG's cron schedule being more
> granular than the tasks'.
>
> I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor
> that would be nice to have; a rough sketch is below.
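>
> A minimal sketch of what I mean (untested; import paths are from Airflow
> 1.10, and with soft_fail=True the sensor would mark the task skipped
> rather than failed on non-matching days):
>
> from airflow.sensors.base_sensor_operator import BaseSensorOperator
> from airflow.utils.decorators import apply_defaults
>
> class DayOfWeekSensor(BaseSensorOperator):
>     """Succeeds only when the execution date falls on the given
>     weekday (Monday=0 ... Sunday=6)."""
>
>     @apply_defaults
>     def __init__(self, day_of_week, *args, **kwargs):
>         super(DayOfWeekSensor, self).__init__(*args, **kwargs)
>         self.day_of_week = day_of_week
>
>     def poke(self, context):
>         # True on the matching weekday; otherwise keeps poking until timeout
>         return context["execution_date"].weekday() == self.day_of_week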
>
> Of course this does mean some scheduler inefficiency on the skip days, but
> as long as those skips are fast and the overall number of tasks is small, I
> can accept that.
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV
> <https://stackoverflow.com/cv/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston>
>
>
> On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gsilk@xxxxxxxxxxx.invalid> wrote:
>
> Hello Airflow community,
>
> I have a basic question about how best to model a common data pipeline
> pattern here at Dropbox.
>
> At Dropbox, all of our logs are ingested and written into Hive in hourly
> and/or daily rollups. On top of this data we build many weekly and monthly
> rollups, which typically run on a daily cadence and compute results over a
> rolling window.
>
> If we have a metric X, it seems natural to put the daily, weekly, and
> monthly rollups for metric X all in the same DAG.
>
> However, the different rollups have different dependency structures. The
> daily job only depends on a single day partition, whereas the weekly job
> depends on 7, the monthly on 28.
>
> In Airflow, it seems the two paradigms for modeling dependencies are:
> 1) Depend on a *single run of a task* within the same DAG
> 2) Depend on *multiple runs of a task* by using an ExternalTaskSensor
>
> I'm not sure how I could possibly model this scenario using approach #1,
> and I'm not sure approach #2 is the most elegant or performant way to
> model it.
>
> Any thoughts or suggestions?
>