Thanks! I (PR author) agree with all that.
On the unbounded triggering issue, I can see 2 reasonable desired behaviors:
1) The collection to follow is bounded and the intent is to wait for
the entire collection to be processed.
2) The collection to follow has windows that in some flexible sense
align with the windows of the collection that is supposed to be waiting,
and the intent is for the waiting to apply within each window.
If either or both of these behaviors can be supported by something like
the proposed mechanism, then I think that's a reasonable thing to have
provided it's well documented.
The other high-level issue I see is whether this should be A) a free
PTransform on collections, or B) something like a call on ParDo. In the
PR it's (A), but I can imagine that for a future, more native
implementation on some runners (B) could be more natural.
On Fri, Sep 7, 2018 at 11:58 AM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
A contributor opened a PR[1] to add support for a PTransform that
forces one PTransform to be executed before another by using side
input readiness as a way to defer execution.
They have provided this example usage:
# Ensure that output dir is created before attempting to write output files.
output_dir_ready = pipeline | beam.Create([output_path]) | CreateOutputDir()
output_pcoll | MustFollow(output_dir_ready) | WriteOutput()
The example the PR author provided works because it runs in the global
window with no triggers that have early firings, and it processes only
a single element, so `CreateOutputDir()` is invoked only once. The
unused side input access forces all runners to wait until
`CreateOutputDir()` has been invoked before running `WriteOutput()`.
Now imagine a user wanted to use `MustFollow` but incorrectly set up
the trigger for the side input so that it has an early firing (such as
an after count 1 trigger), and has multiple directories they want to
create. The side input would become ready before `CreateOutputDir()`
had been invoked for **all** of the directories. This would mean that
`MustFollow` would be able to access the side input and hence allow
`WriteOutput()` to happen. The example above is still a bounded
pipeline and a runner could choose to execute it as I described.
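To make the hazard concrete, here is a toy model in plain Python. It is
not Beam code and does not use any Beam API: the function name and the
`fire_after_count` parameter are made up for illustration, and the loop
only stands in for `CreateOutputDir()` running on one element at a time.
It assumes the side input becomes readable as soon as its first pane
fires.

```python
def dirs_created_when_side_input_ready(dirs, fire_after_count=None):
    """Return the directories already created at the moment the toy side
    input first becomes ready.

    fire_after_count=None models a trigger that fires once, after all
    input has been processed. An integer N models an early firing after
    N elements (hypothetical stand-in for an "after count N" trigger).
    """
    created = []
    for d in dirs:
        created.append(d)  # stands in for CreateOutputDir() on one element
        if fire_after_count is not None and len(created) >= fire_after_count:
            # First pane emitted: a consumer waiting on the side input
            # is unblocked here, even if more directories remain.
            return list(created)
    # Single firing at the end of the input: everything was created first.
    return list(created)


dirs = ["out/a", "out/b", "out/c"]
print(dirs_created_when_side_input_ready(dirs))     # all three directories
print(dirs_created_when_side_input_ready(dirs, 1))  # only "out/a"
```

With the single firing, the consumer is unblocked only after all three
directories exist. With the early firing after one element, it is
unblocked when only the first one exists, which is the failure mode
described above.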
The contract for side inputs is that the side input PCollection must
have at least one firing based upon the upstream windowing strategy
for the "requested" window, or the runner must be sure that no such
window could ever be produced, before it can be accessed by a
consumer.
My concern with the PR is twofold:
1) I'm not sure if this works for all runners in both bounded and
unbounded pipelines and could use insights from others:
I believe that if the user is using a trigger which only fires once and
they guarantee that the mapping from main input windows to side input
windows is 1:1, then they are likely to get the expected behavior:
`WriteOutput()` will always happen only after `CreateOutputDir()` has
finished for a given window.
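The per-window behavior I would expect in that case can be sketched with
another toy model in plain Python. Again, this is not Beam code: the
event tuples and the `schedule_writes` name are hypothetical. It assumes
an identity (1:1) window mapping and a side input that fires exactly
once per window.

```python
from collections import defaultdict


def schedule_writes(events):
    """Replay events in order and return the writes in execution order.

    Each event is either ("side_fired", window), meaning the side input
    for that window has had its single firing, or ("write", window, item),
    meaning a main input element arrived in that window.
    """
    ready = set()                # windows whose side input has fired
    pending = defaultdict(list)  # writes held back, keyed by window
    executed = []
    for event in events:
        if event[0] == "side_fired":
            window = event[1]
            ready.add(window)
            # Release everything that was waiting on this window only.
            executed.extend(pending.pop(window, []))
        else:
            _, window, item = event
            if window in ready:
                executed.append((window, item))
            else:
                pending[window].append((window, item))
    return executed


events = [
    ("write", 0, "x"),
    ("write", 1, "y"),
    ("side_fired", 1),
    ("side_fired", 0),
    ("write", 0, "z"),
]
print(schedule_writes(events))  # [(1, 'y'), (0, 'x'), (0, 'z')]
```

Writes in window 1 are released as soon as window 1's side input fires,
independently of window 0, and a write whose window never fires stays
pending in this model.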
2) If the solution works, the documentation around the limitations
of how the PTransform works needs a lot more detail and potentially
"error" checking of the windowing strategy.
1: https://github.com/apache/beam/pull/6343