Re: Wait.on() - "Do this, then that" transform


Nice!

Will user@ mostly enjoy the benefits through enhanced IOs? If you think it has broader applicability in end-user pipelines, I guess the time to announce this to user@ is as part of the release notes? Do we have a suitable format/venue for email-length blurbs like this one (too long for a one-liner, too short and in-the-weeds for a blog post)? I didn't just forward it because I wanted to let you & others consider it first.

Kenn

On Mon, May 14, 2018 at 4:58 PM Tyler Akidau <takidau@xxxxxxxxxx> wrote:
Nice! I like the clean integration with streaming.

-Tyler

On Mon, May 14, 2018 at 2:48 PM Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote:
Hi folks,

Wanted to give a heads up about the existence of a commonly requested feature and its first successful production usage.

The feature is the Wait.on() transform [1], and the first successful production usage is in Spanner [2].

The Wait.on() transform allows you to "do this, then that" - in the sense that a.apply(Wait.on(signal)) re-emits PCollection "a", but only after the PCollection "signal" is "done" in the same window (i.e. when no more elements can arrive into the same window of "signal"). The PCollection "signal" is typically a collection of results of some operation - so Wait.on(signal) allows you to wait until that operation is done. It transparently works correctly in streaming pipelines too.

This may sound a little convoluted, so the example from the documentation should help.

PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
data.apply(Wait.on(firstWriteResults))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of firstWriteResults closes.
     .apply(ParDo.of(...write to second database...));

This is indeed what Spanner folks have done, and AFAIK they intend this for importing multiple dependent database tables - e.g. first import a parent table; when it's done, import the child table - all within one pipeline. You can see example code in the tests [3].
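
For anyone who wants the "same window" part of the definition spelled out, here is a toy model in plain Java with no Beam dependency. It is only a sketch of the observable behavior described above, not how Beam implements Wait.on(). The class WindowGate, its method names, and the use of plain strings as window ids are all made up for illustration, and it assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy, single-threaded model of per-window gating. Hypothetical; not Beam's implementation. */
final class WindowGate<T> {
    // Elements of "a" held back, keyed by window id, until that window of "signal" is done.
    private final Map<String, List<T>> pending = new HashMap<>();
    // Windows of "signal" into which no more elements can arrive.
    private final Set<String> closed = new HashSet<>();
    // Everything released downstream so far, in release order.
    private final List<T> emitted = new ArrayList<>();

    /** An element of "a" arrives in the given window. */
    void onElement(String window, T element) {
        if (closed.contains(window)) {
            // The signal for this window is already done, so pass the element through.
            emitted.add(element);
        } else {
            pending.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
        }
    }

    /** The given window of "signal" is done; release whatever was held for it. */
    void onSignalWindowClosed(String window) {
        closed.add(window);
        List<T> held = pending.remove(window);
        if (held != null) {
            emitted.addAll(held);
        }
    }

    /** Returns a copy of everything released so far. */
    List<T> emitted() {
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        WindowGate<String> gate = new WindowGate<>();
        gate.onElement("w1", "a1");
        gate.onElement("w2", "a2");
        System.out.println(gate.emitted()); // nothing released yet: []
        gate.onSignalWindowClosed("w1");
        System.out.println(gate.emitted()); // only window w1 is released: [a1]
    }
}
```

The point of the model is that the gating is per window: closing window "w1" of the signal releases only the elements of "a" in "w1", while "w2" keeps waiting. That is why the same pipeline shape works in streaming, where some windows finish long before others.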

Please note that this kind of stuff requires support from the IO connector - IO.write() has to return a result that can be waited on. The code of SpannerIO is a great example; another example is FileIO.write().

People have expressed wishes for similar support in the Bigtable and BigQuery connectors, but it's not there yet. It would be really cool if somebody added it to these connectors or others (I think there was a recent thread discussing how to add it to BigQueryIO).