
Re: PR/6343: Adding support for MustFollow

Thanks!  I (PR author) agree with all that.

On the unbounded triggering issue, I can see 2 reasonable desired behaviors:
  1) The collection to follow is bounded and the intent is to wait for the entire collection to be processed.
  2) The collection to follow has windows that, in some flexible sense, align with the windows of the collection that is supposed to be waiting, and the intent is for the waiting to apply within each window.

If either or both of these behaviors can be supported by something like the proposed mechanism, then I think that's a reasonable thing to have provided it's well documented.

The other high-level issue I see is whether this should be A) a free PTransform on collections, or B) something like a call on ParDo.  In the PR it's (A), but I can imagine that for a future, more native implementation on some runners, (B) could be more natural.

On Fri, Sep 7, 2018 at 11:58 AM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
A contributor opened a PR[1] to add support for a PTransform that forces one PTransform to be executed before another by using side input readiness as a way to defer execution.

They have provided this example usage:
# Ensure that output dir is created before attempting to write output files.
output_dir_ready = pipeline | beam.Create([output_path]) | CreateOutputDir()
output_pcoll | MustFollow(output_dir_ready) | WriteOutput()

The example the PR author provided works since they are working in the global window with no triggers that have early firings and they are only processing a single element so `CreateOutputDir()` is invoked only once. The unused side input access forces all runners to wait till `CreateOutputDir()` is invoked before `WriteOutput()`.

Now imagine a user wanted to use `MustFollow` but incorrectly set up the trigger for the side input so that it has an early firing (such as an after-count-1 trigger), and they have multiple directories they want to create. The side input would become ready before **all** of the `CreateOutputDir()` invocations had completed. `MustFollow` would then be able to access the side input and hence allow `WriteOutput()` to start. The example above is still a bounded pipeline, and a runner could choose to execute it as I described.

The contract for side inputs is that the side input PCollection must have at least one firing based upon the upstream windowing strategy for the "requested" window, or the runner must be sure that no such window could ever be produced, before it can be accessed by a consumer.
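
To make the early-firing problem concrete, here is a toy model in plain Python. It is only a sketch and does not use the Beam API: `ToySideInput`, `fire_after_count`, and `dirs_created_when_ready` are hypothetical names invented for illustration. The one assumption it encodes is the contract above, that a side input window counts as ready as soon as it has had at least one firing.

```python
from typing import List, Optional


class ToySideInput:
    """Toy stand-in for one window of a side input (hypothetical, not a Beam class)."""

    def __init__(self, fire_after_count: Optional[int]):
        # None models a trigger that fires once, when the input is complete.
        # An integer N models an early firing after every N buffered elements.
        self.fire_after_count = fire_after_count
        self.panes: List[List[str]] = []
        self._buffer: List[str] = []

    def add(self, element: str) -> None:
        self._buffer.append(element)
        if (self.fire_after_count is not None
                and len(self._buffer) >= self.fire_after_count):
            self._fire()

    def close(self) -> None:
        # Upstream is finished for this window: emit the final pane.
        self._fire()

    def _fire(self) -> None:
        self.panes.append(list(self._buffer))
        self._buffer.clear()

    @property
    def ready(self) -> bool:
        # Assumed contract: ready once there has been at least one firing.
        return len(self.panes) >= 1


def dirs_created_when_ready(dirs: List[str],
                            fire_after_count: Optional[int]) -> List[str]:
    """Return the directories already created when the side input first becomes ready."""
    side = ToySideInput(fire_after_count)
    created: List[str] = []
    for d in dirs:
        created.append(d)  # stands in for CreateOutputDir() on one element
        side.add(d)
        if side.ready:
            return created  # a waiting consumer could start here
    side.close()
    return created


# Single firing at the end: every directory exists before the consumer starts.
print(dirs_created_when_ready(["a", "b", "c"], None))
# After-count-1 early firing: the consumer may start after only one directory.
print(dirs_created_when_ready(["a", "b", "c"], 1))
```

In this model the second call returns after a single directory has been created, which is the situation where `WriteOutput()` could begin too early.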

My concern with the PR is twofold:
1) I'm not sure if this works for all runners in both bounded and unbounded pipelines and could use insights from others:

I believe that if the user uses a trigger which fires only once and guarantees that the main input window to side input window mapping is 1:1, then they are likely to get the expected behavior: WriteOutput() will always happen after CreateOutputDir() has finished for a given window.

2) If the solution works, the documentation around the limitations of how the PTransform works needs a lot more detail and potentially "error" checking of the windowing strategy.
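
As a rough idea of what such "error" checking might look like, here is a sketch in plain Python. It is not Beam code: `ToyWindowing`, its fields, and `check_must_follow` are hypothetical, and comparing window function descriptions for equality is only an assumed, conservative proxy for a 1:1 window mapping. A real check would have to inspect the actual windowing strategies of both PCollections.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ToyWindowing:
    """Hypothetical summary of a PCollection's windowing strategy."""
    window_fn: str    # e.g. "GlobalWindows" or "FixedWindows(60)"
    fires_once: bool  # True if the trigger emits exactly one pane per window


def check_must_follow(main: ToyWindowing, side: ToyWindowing) -> List[str]:
    """Return a list of reasons why waiting on `side` may not behave as expected."""
    problems: List[str] = []
    if not side.fires_once:
        problems.append(
            "side input trigger may fire before all upstream work has finished")
    if main.window_fn != side.window_fn:
        # Assumption: identical window functions imply a 1:1 window mapping.
        problems.append(
            "main input windows are not known to map 1:1 to side input windows")
    return problems


global_once = ToyWindowing("GlobalWindows", fires_once=True)
global_early = ToyWindowing("GlobalWindows", fires_once=False)

print(check_must_follow(global_once, global_once))   # no problems reported
print(check_must_follow(global_once, global_early))  # early-firing warning
```

A check along these lines could either raise at pipeline construction time or only log a warning; which is appropriate depends on how strict the transform's documented contract ends up being.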