Re: Wait.on() - "Do this, then that" transform


Cool !!!

I guess we can leverage this in IOs with SDF.

Thanks
Regards
JB

On 14/05/2018 23:48, Eugene Kirpichov wrote:
Hi folks,

Wanted to give a heads up about the existence of a commonly requested feature and its first successful production usage.

The feature is the Wait.on() transform [1], and the first successful production usage is in Spanner [2].

The Wait.on() transform allows you to "do this, then that" - in the sense that a.apply(Wait.on(signal)) re-emits PCollection "a", but only after the PCollection "signal" is "done" in the same window (i.e. when no more elements can arrive into the same window of "signal"). The PCollection "signal" is typically a collection of results of some operation - so Wait.on(signal) allows you to wait until that operation is done. It transparently works correctly in streaming pipelines too.

This may sound a little convoluted, so the example from the documentation should help.

PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
data.apply(Wait.on(firstWriteResults))
    // Windows of this intermediate PCollection will be processed no earlier than when
    // the respective window of firstWriteResults closes.
    .apply(ParDo.of(...write to second database...));
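
To make the per-window guarantee concrete, here is a toy, single-threaded Java model of the ordering the documentation example relies on. It is only an illustration of the observable behaviour, not how Wait.on() is implemented in Beam. Assumptions: fixed windows of a chosen size, a signal "watermark" that the caller advances by hand, and hypothetical names (WaitOnModel, addMain, advanceSignalWatermark). Writes destined for the second database are held per window and released once the signal watermark has passed the end of that window, i.e. once no more first-database results can arrive for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy model of Wait.on() semantics for fixed windows.
// NOT Beam's implementation: it only mimics the per-window ordering.
class WaitOnModel<T> {
    private final long windowSize;
    // Main-input elements buffered by window start, waiting for the signal.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();
    // Assumed signal watermark: no signal element earlier than this can still arrive.
    private long signalWatermark = Long.MIN_VALUE;

    WaitOnModel(long windowSize) { this.windowSize = windowSize; }

    private long windowStart(long timestamp) {
        return Math.floorDiv(timestamp, windowSize) * windowSize;
    }

    // A window [start, start + size) of the signal is "done" once the watermark reaches its end.
    private boolean signalWindowDone(long start) {
        return signalWatermark >= start + windowSize;
    }

    // Returns the elements that may be emitted immediately.
    List<T> addMain(long timestamp, T element) {
        long start = windowStart(timestamp);
        List<T> out = new ArrayList<>();
        if (signalWindowDone(start)) {
            out.add(element); // signal window already done: pass straight through
        } else {
            pending.computeIfAbsent(start, k -> new ArrayList<>()).add(element);
        }
        return out;
    }

    // Advances the signal watermark and releases every buffered window it completes.
    List<T> advanceSignalWatermark(long watermark) {
        signalWatermark = Math.max(signalWatermark, watermark);
        List<T> out = new ArrayList<>();
        while (!pending.isEmpty() && signalWindowDone(pending.firstKey())) {
            out.addAll(pending.pollFirstEntry().getValue());
        }
        return out;
    }
}

class WaitOnDemo {
    public static void main(String[] args) {
        // Windows of 10 time units: [0,10), [10,20), ...
        WaitOnModel<String> secondDbWrites = new WaitOnModel<>(10);
        System.out.println(secondDbWrites.addMain(3, "row-a"));          // []
        System.out.println(secondDbWrites.addMain(12, "row-b"));         // []
        // First-database writes for window [0,10) are complete.
        System.out.println(secondDbWrites.advanceSignalWatermark(10));   // [row-a]
        System.out.println(secondDbWrites.addMain(7, "row-c"));          // [row-c]
        System.out.println(secondDbWrites.advanceSignalWatermark(20));   // [row-b]
    }
}
```

Each window is released independently of the others, which mirrors the "same window" wording in the description above: window [10,20) keeps waiting even after [0,10) has gone through.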

This is exactly what the Spanner folks have done, and AFAIK they intend it for importing multiple dependent database tables: e.g. first import a parent table and, when that is done, import the child table, all within one pipeline. You can see example code in the tests [3].
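
Why the order matters for dependent tables can be shown with an in-memory stand-in for a database that rejects child rows whose parent row does not exist yet (similar in spirit to a foreign-key constraint). Everything here (ToyDatabase, ImportOrderDemo, the row IDs) is hypothetical and runs sequentially; inside a Beam pipeline the two writes would have no guaranteed relative order, and Wait.on() is what supplies the "parent table first" step.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory "database" enforcing a parent/child constraint.
class ToyDatabase {
    private final Set<String> parents = new HashSet<>();
    private final Map<String, String> children = new HashMap<>(); // childId -> parentId

    void insertParent(String parentId) { parents.add(parentId); }

    // Rejects a child row whose parent has not been imported yet.
    void insertChild(String childId, String parentId) {
        if (!parents.contains(parentId)) {
            throw new IllegalStateException("missing parent " + parentId + " for " + childId);
        }
        children.put(childId, parentId);
    }

    int childCount() { return children.size(); }
}

class ImportOrderDemo {
    // Imports the whole parent table, then the child table: the same
    // "do this, then that" ordering that Wait.on() provides inside one pipeline.
    static ToyDatabase importInOrder(List<String> parentIds, Map<String, String> childToParent) {
        ToyDatabase db = new ToyDatabase();
        parentIds.forEach(db::insertParent);     // step 1: parent table
        childToParent.forEach(db::insertChild);  // step 2: child table, only after step 1
        return db;
    }

    public static void main(String[] args) {
        ToyDatabase db = importInOrder(
            List.of("p1", "p2"),
            Map.of("c1", "p1", "c2", "p1", "c3", "p2"));
        System.out.println(db.childCount()); // 3

        try {
            new ToyDatabase().insertChild("c1", "p1"); // child before parent
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: missing parent p1 for c1
        }
    }
}
```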

Please note that this pattern requires support from the IO connector: IO.write() has to return a result from which a PCollection can be obtained and passed to Wait.on(). The code of SpannerIO is a great example; another example is FileIO.write().
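
As a plain-Java analogy (not Beam code), the requirement on the connector resembles the difference between a method that returns a CompletableFuture and one that returns void: only the former gives the caller something to chain the next step on. In Beam the corresponding handle is a PCollection derived from the write's result, which is what Wait.on() accepts; a sink that produces no such output (roughly, one that only returns PDone) leaves nothing to wait on. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture stands in for the "result that can be waited on".
class WaitableWriteDemo {
    // Thread-safe log of completed writes, so the ordering can be inspected.
    static final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical connector whose write returns a waitable handle.
    static CompletableFuture<Void> writeFirst(List<String> rows) {
        return CompletableFuture.runAsync(() -> rows.forEach(r -> log.add("first:" + r)));
    }

    // Hypothetical connector whose write returns nothing, so callers cannot chain on it.
    static void writeSecond(List<String> rows) {
        rows.forEach(r -> log.add("second:" + r));
    }

    public static void main(String[] args) {
        List<String> rows = List.of("a", "b");
        writeFirst(rows)
            .thenRun(() -> writeSecond(rows)) // runs only after writeFirst has completed
            .join();
        System.out.println(log); // [first:a, first:b, second:a, second:b]
    }
}
```

The ordering is possible only because writeFirst hands back a handle; writeSecond could be the second step of such a chain but never the first, which is the situation of a connector without a waitable result.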

People have asked for similar support in the Bigtable and BigQuery connectors, but it's not there yet. It would be really cool if somebody added it to these connectors or others (I think there was a recent thread discussing how to add it to BigQueryIO).

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
[2] https://github.com/apache/beam/pull/4264
[3] https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158