[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Cleanup resources on pipeline cancelation

Actually I think SDF is the right way to fix this. The SDF can set a timer at infinity (which will only fires when the pipeline shuts down). I believe that SDF support is being added to the portability layer now, so eventually all portable runners will support it, and maybe we can live with the status quo until then.

On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau <rmannibucau@xxxxxxxxx> wrote:
I agree Reuven. But leaking in a source doesnt give any guarantee regarding the execution since it will depends the runner and current API will not provide you that feature. Using a reference counting state can work better but would require a sdf migration (and will hit runner support issues :().

Le jeu. 2 août 2018 05:39, Reuven Lax <relax@xxxxxxxxxx> a écrit :
Hi Romain,

Andrew's example actually wouldn't work for that. With Google Cloud Pub/Sub (the example source he referenced), if there is no subscription to a topic, all publishes to that topic are dropped on the floor; if you don't want to lose data, your are expected to keep the subscription around continuously. In this example, leaking a subscription is probably preferable to losing date (especially since Pub/Sub itself garbage collects subscriptions that have been inactive for a long time).

The answer might be that Beam does not have a good lifecycle story here, and something needs to be built.


On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <rmannibucau@xxxxxxxxx> wrote:
Hi Andrew,

IIRC sources should clean up their resources per method since they dont have a better lifecycle. Readers can create anything longer and release it at close time.

Le mer. 1 août 2018 00:31, Andrew Pilloud <apilloud@xxxxxxxxxx> a écrit :
Some of our IOs create external resources that need to be cleaned up when a pipeline is terminated. It looks like the org.apache.beam.sdk.io.UnboundedSource interface is called on creation, but there is no call for cleanup. For example, PubsubIO creates a Pubsub subcription in createReader()/split() and it should be deleted at shutdown. Does anyone have ideas on how I might make this happen?

(I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking the PubSub specific issue.)