
PR/6343: Adding support for MustFollow


A contributor opened a PR[1] to add support for a PTransform that forces one PTransform to be executed before another by using side input readiness as a way to defer execution.

They have provided this example usage:
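```python
import apache_beam as beam

# MustFollow comes from the PR; CreateOutputDir and WriteOutput stand for user-defined transforms.
# Ensure that output dir is created before attempting to write output files.
output_dir_ready = pipeline | beam.Create([output_path]) | CreateOutputDir()
output_pcoll | MustFollow(output_dir_ready) | WriteOutput()
```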

The example the PR author provided works because it runs in the global window with no triggers that have early firings, and it processes only a single element, so `CreateOutputDir()` is invoked only once. The unused side input access forces all runners to wait until `CreateOutputDir()` has finished before running `WriteOutput()`.
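To make the deferral mechanism concrete, here is a toy, single-process model using only the Python standard library. `ToySideInput` and `must_follow` are hypothetical names, not Beam APIs and not the PR's code; a blocking read is only an analogy for a runner deferring a consumer until its side input is ready.

```python
import threading


class ToySideInput:
    """Toy stand-in (not a Beam API) for a side input.

    Reading blocks until the first firing, mimicking how a runner defers a
    consumer until the side input is ready.
    """

    def __init__(self):
        self._ready = threading.Event()
        self._value = None

    def fire(self, value):
        # First firing: publish the value, then mark the side input ready.
        self._value = value
        self._ready.set()

    def read(self, timeout=None):
        if not self._ready.wait(timeout):
            raise TimeoutError("side input not ready")
        return self._value


def must_follow(element, signal):
    """Hypothetical MustFollow body: read the signal and ignore its value."""
    signal.read()  # blocks until the signal side input is ready
    return element


if __name__ == "__main__":
    log = []
    signal = ToySideInput()

    def create_output_dir():
        log.append("create output dir")
        signal.fire(["/tmp/out"])

    def write_output():
        must_follow("record", signal)
        log.append("write output")

    writer = threading.Thread(target=write_output)
    writer.start()  # starts first but waits on the signal
    creator = threading.Thread(target=create_output_dir)
    creator.start()
    writer.join()
    creator.join()
    print(log)  # ['create output dir', 'write output']
```

The model guarantees ordering only because `fire()` is called after the directory work is done; it says nothing about other directories that have not been created yet.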

Now imagine a user who wants to use `MustFollow` but incorrectly sets up the trigger for the side input so that it has an early firing (such as an after count 1 trigger), and who has multiple directories to create. The side input would become ready before **all** of the `CreateOutputDir()` invocations had finished. `MustFollow` would then be able to access the side input and hence allow `WriteOutput()` to run too early. This is still a bounded pipeline, and a runner could choose to execute it as I described.
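The difference between the two trigger setups can be sketched with a small, hypothetical helper. It assumes elements are processed one at a time and that `WriteOutput()` is gated only by the side input's first firing; it is a toy model, not Beam code.

```python
from typing import Optional, Sequence


def dirs_created_when_write_unblocked(
    dirs: Sequence[str], early_fire_after: Optional[int]
) -> int:
    """Return how many CreateOutputDir() calls have finished at the earliest
    moment a runner may start WriteOutput().

    Toy model (hypothetical): elements are processed one at a time and
    WriteOutput() is gated only by the side input's first firing.
    early_fire_after=None models a trigger that fires once, after the input is
    complete; an integer k models an early firing after k elements (k=1 is an
    "after count 1" trigger).
    """
    created = 0
    for _ in dirs:
        created += 1  # one CreateOutputDir() invocation has finished
        if early_fire_after is not None and created == early_fire_after:
            # Early pane: the side input is now ready, so WriteOutput() may run.
            return created
    # Single on-time pane once every element has been processed.
    return created


if __name__ == "__main__":
    dirs = ["out/a", "out/b", "out/c"]
    print(dirs_created_when_write_unblocked(dirs, None))  # 3
    print(dirs_created_when_write_unblocked(dirs, 1))  # 1
```

With a single directory both setups return 1, which matches why the PR's example behaves correctly; with three directories the early firing unblocks `WriteOutput()` after only one of them exists.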

The contract for side inputs is that a consumer can access the side input for the "requested" window only after the side input PCollection has had at least one firing for that window, based upon the upstream windowing strategy, or after the runner is sure that no such window could ever be produced.
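The contract can be written down as a predicate. This is a simplified sketch: `Window`, `side_input_readable`, and the rule used for "no such window could ever be produced" (the side input's watermark reaching the window end) are assumptions made for illustration, not Beam's implementation.

```python
from dataclasses import dataclass
from typing import AbstractSet


@dataclass(frozen=True)
class Window:
    start: int  # inclusive, e.g. seconds
    end: int  # exclusive


def side_input_readable(
    window: Window, fired: AbstractSet[Window], watermark: int
) -> bool:
    """Simplified model of when a consumer may read the side input for `window`.

    Readable once either
      * at least one pane has fired for that window, or
      * the runner is sure no pane for it can ever be produced, approximated
        here (an assumption) as the side input's watermark reaching window.end.
    """
    return window in fired or watermark >= window.end


if __name__ == "__main__":
    w = Window(0, 60)
    print(side_input_readable(w, {w}, watermark=0))  # True: a pane fired
    print(side_input_readable(w, set(), watermark=30))  # False: still waiting
    print(side_input_readable(w, set(), watermark=60))  # True: window can't appear
```

The predicate only says when the side input becomes readable; it does not say that every upstream element for the window has been processed, which is the gap the early-firing scenario exploits.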

My concern with the PR is twofold:
1) I'm not sure whether this works for all runners in both bounded and unbounded pipelines, and I could use insights from others.

I believe that if the user uses a trigger that fires only once and guarantees a 1:1 mapping from main input windows to side input windows, they are likely to get the expected behavior: `WriteOutput()` will always happen after `CreateOutputDir()` has finished for the given window.

2) If the solution does work, the documentation of the PTransform's limitations needs a lot more detail, and the transform may need "error" checking of the windowing strategy.
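The "error" checking could take roughly this shape. This is a sketch under stated assumptions: `WindowingSummary` and `check_must_follow` are hypothetical, the windowing strategy is reduced to three fields rather than Beam's real trigger objects, and the 1:1 window mapping condition is approximated as both sides using the same fixed-window size (or both using the global window).

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class WindowingSummary:
    """Hypothetical, simplified summary of a PCollection's windowing strategy."""

    window_size: Optional[int]  # fixed-window size; None means the global window
    has_early_firings: bool
    may_fire_more_than_once: bool  # e.g. late firings or a repeated trigger


def check_must_follow(signal: WindowingSummary, main: WindowingSummary) -> List[str]:
    """Return the problems found; an empty list means none were detected.

    Assumed rules, taken from point 1): the signal's trigger should fire only
    once, and main/signal windows should map 1:1, approximated here as both
    sides using the same fixed-window size (or both using the global window).
    """
    problems = []
    if signal.has_early_firings:
        problems.append("signal trigger has early firings")
    if signal.may_fire_more_than_once:
        problems.append("signal trigger may fire more than once")
    if signal.window_size != main.window_size:
        problems.append("main and signal windows differ; mapping may not be 1:1")
    return problems


if __name__ == "__main__":
    global_once = WindowingSummary(None, False, False)
    # e.g. an "after count 1" early firing followed by the on-time pane
    early = WindowingSummary(None, True, True)
    print(check_must_follow(global_once, global_once))  # []
    print(check_must_follow(early, global_once))
    # ['signal trigger has early firings', 'signal trigger may fire more than once']
```

In a real transform such a check could run when the transform is applied and raise an error (or log a warning) when the list is non-empty; how much of the windowing strategy can be inspected at that point depends on the SDK.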

1: https://github.com/apache/beam/pull/6343