
Re: PR/6343: Adding support for MustFollow


This is a great idea but I share Lukasz' doubts about this being a universal solution for awaiting some action in a pipeline.

I wonder, wouldn't it work to not pass in a PCollection, but instead wrap a DoFn which internally ensures the correct triggering behavior? All runners which correctly materialize the side input with the first window triggering should support it correctly.

Apart from that, couldn't you simply use the @Setup method of a DoFn in your example?

-Max

On 07.09.18 23:12, Peter Li wrote:
Thanks!  I (PR author) agree with all that.

On the unbounded triggering issue, I can see 2 reasonable desired behaviors:
  1) The collection to follow is bounded and the intent is to wait for the entire collection to be processed.
  2) The collection to follow has windows that in some flexible sense align with the windows of the collection that is supposed to be waiting, and the intent is for the waiting to apply within window.

If either or both of these behaviors can be supported by something like the proposed mechanism, then I think that's a reasonable thing to have provided it's well documented.

The other high-level issue I see is whether this should be A) a free PTransform on collections, or B) something like a call on ParDo.  In the PR it's (A), but I can imagine that for a future, more native implementation on some runners, (B) could be more natural.

On Fri, Sep 7, 2018 at 11:58 AM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:

    A contributor opened a PR[1] to add support for a PTransform that
    forces one PTransform to be executed before another by using side
    input readiness as a way to defer execution.

    They have provided this example usage:
    # Ensure that output dir is created before attempting to write output files.
    output_dir_ready = pipeline | beam.Create([output_path]) | CreateOutputDir()
    output_pcoll | MustFollow(output_dir_ready) | WriteOutput()

    The example the PR author provided works since they are working in
    the global window with no triggers that have early firings and they
    are only processing a single element so `CreateOutputDir()` is
    invoked only once. The unused side input access forces all runners
    to wait till `CreateOutputDir()` is invoked before `WriteOutput()`.

    Now imagine a user wanted to use `MustFollow` but incorrectly set up
    the trigger for the side input and it has an early firing (such as
    an after count 1 trigger) and has multiple directories they want to
    create. The side input would become ready before **all** of
    `CreateOutputDir()` was invoked. This would mean that `MustFollow`
    would be able to access the side input and hence allow
    `WriteOutput()` to happen. The example above is still a bounded
    pipeline and a runner could choose to execute it as I described.
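
The early-firing scenario above can be illustrated with a toy model in plain Python. This is not Beam code: the function `dirs_ready_when_unblocked` and its `early_firing_after` parameter are hypothetical names made up for this sketch, and a real runner is free to schedule work differently. It only shows which directories would exist at the moment the side input first becomes ready.

```python
def dirs_ready_when_unblocked(dirs, early_firing_after=None):
    """Return the directories already created when the side input first fires.

    dirs: directories the upstream step creates, one element each.
    early_firing_after: if set, the trigger fires after this many elements
        (similar in spirit to an "after count N" trigger); if None, the
        trigger fires once, after every element has been processed.
    """
    created = []
    for d in dirs:
        created.append(d)  # stands in for CreateOutputDir() on one element
        if early_firing_after is not None and len(created) >= early_firing_after:
            return created  # first firing: the side input is now "ready"
    return created  # single firing after all elements


dirs = ["out/a", "out/b", "out/c"]

# Trigger that fires once: every directory exists before the writer starts.
on_time = dirs_ready_when_unblocked(dirs)

# Early firing after one element: the writer is unblocked too soon.
early = dirs_ready_when_unblocked(dirs, early_firing_after=1)
missing = [d for d in dirs if d not in early]

print(on_time)   # ['out/a', 'out/b', 'out/c']
print(early)     # ['out/a']
print(missing)   # ['out/b', 'out/c']
```

In this model `WriteOutput()` would be allowed to start while `out/b` and `out/c` do not exist yet, which is the failure mode described above.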

    The contract for side inputs is that the side input PCollection must
    have at least one firing based upon the upstream windowing strategy
    for the "requested" window or the runner must be sure that no
    such window could ever be produced before it can be accessed by a
    consumer.
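
The contract above can be written down as a small predicate. This is a simplified sketch in plain Python, not any runner's actual logic: `Window`, `side_input_readable`, and the rule "the watermark has passed the end of the window" as a stand-in for "the runner is sure no such window could ever be produced" are all assumptions made for illustration (allowed lateness is ignored).

```python
from typing import NamedTuple, Set


class Window(NamedTuple):
    start: int
    end: int  # exclusive end timestamp


def side_input_readable(requested: Window, fired: Set[Window], watermark: int) -> bool:
    """Decide whether a consumer may read the side input for `requested`."""
    # Case 1: the side input has had at least one firing for this window.
    if requested in fired:
        return True
    # Case 2 (assumption): once the watermark passes the end of the window,
    # no data for that window can still arrive, so it is known to be empty.
    return watermark >= requested.end


w = Window(0, 60)
blocked = side_input_readable(w, fired=set(), watermark=30)
after_firing = side_input_readable(w, fired={w}, watermark=30)
known_empty = side_input_readable(w, fired=set(), watermark=60)

print(blocked, after_firing, known_empty)  # False True True
```

Note that the first case is satisfied by any firing, including an early one, which is why the trigger on the side input matters for `MustFollow`.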

    My concern with the PR is twofold:
    1) I'm not sure if this works for all runners in both bounded and
    unbounded pipelines and could use insights from others:

    I believe that if the user is using a trigger which only fires once
    and they guarantee that the main input window to side input window
    mapping is 1:1, then they are likely to get the expected behavior:
    WriteOutput() will always happen once CreateOutputDir() for a given
    window has finished.

    2) If the solution works, the documentation around the limitations
    of how the PTransform works needs a lot more detail and potentially
    "error" checking of the windowing strategy.

    1: https://github.com/apache/beam/pull/6343