
Re: Cleanup resources on pipeline cancelation

Romain is correct: you would need some global reference counting here to use the close() callback. The problem is that the input subscription is a pipeline-wide resource, not a per-reader resource.

On Thu, Aug 2, 2018 at 10:07 AM Romain Manni-Bucau <rmannibucau@xxxxxxxxx> wrote:

On Thu, Aug 2, 2018 at 18:32, Andrew Pilloud <apilloud@xxxxxxxxxx> wrote:
The subscriptions I want to clean up are the ones that PubsubIO creates implicitly. These subscriptions are created and then leaked; they aren't reused in future pipelines, so the data loss issues are moot here. I agree that we don't want to tear down user-supplied subscriptions.

I've been doing some more digging, and it looks like the Source.Reader interface has a close() callback. Is that a place I might be able to do cleanup? (It appears this is hooked up to the RichFunction.close() callback on Flink and is called from the Direct Runner, but possibly not from other runners.)

close() is called after the parallelization (you can have N>1 readers in parallel), so yes, it can work if you have some global reference counting that makes the cleanup run only once; otherwise it will be hard.
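
A minimal sketch of the reference-counting idea, for illustration only. It assumes all readers run in a single JVM and are all created before any of them closes; on a distributed runner the count would have to live in some external, shared place, which this sketch does not attempt. The names RefCountedResource, Handle and acquire() are hypothetical and are not part of the Beam API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: runs a cleanup action once the last open reader closes. */
final class RefCountedResource {

    /** Handle held by one reader; close() does not throw and is idempotent. */
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    private final AtomicInteger openReaders = new AtomicInteger();
    private final Runnable cleanup; // e.g. delete the implicitly created subscription

    RefCountedResource(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    /** Call when a reader is created; the reader calls close() on the handle from its own close(). */
    Handle acquire() {
        openReaders.incrementAndGet();
        AtomicBoolean closed = new AtomicBoolean(false);
        return () -> {
            // Guard against double close, then clean up only when the count reaches zero.
            if (closed.compareAndSet(false, true) && openReaders.decrementAndGet() == 0) {
                cleanup.run();
            }
        };
    }
}
```

With N readers sharing one RefCountedResource, the cleanup action runs exactly once, when the Nth handle is closed. If a runner never calls close() on some reader, the count never reaches zero and the resource is still leaked.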


On Thu, Aug 2, 2018 at 1:07 AM Reuven Lax <relax@xxxxxxxxxx> wrote:
Actually I think SDF is the right way to fix this. The SDF can set a timer at infinity (which will only fire when the pipeline shuts down). I believe that SDF support is being added to the portability layer now, so eventually all portable runners will support it, and maybe we can live with the status quo until then.

On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau <rmannibucau@xxxxxxxxx> wrote:
I agree, Reuven. But leaking in a source doesn't give any guarantee regarding the execution, since it will depend on the runner, and the current API will not provide you with that feature. Using a reference-counting state can work better but would require an SDF migration (and will hit runner support issues :().

On Thu, Aug 2, 2018 at 05:39, Reuven Lax <relax@xxxxxxxxxx> wrote:
Hi Romain,

Andrew's example actually wouldn't work for that. With Google Cloud Pub/Sub (the example source he referenced), if there is no subscription to a topic, all publishes to that topic are dropped on the floor; if you don't want to lose data, you are expected to keep the subscription around continuously. In this example, leaking a subscription is probably preferable to losing data (especially since Pub/Sub itself garbage collects subscriptions that have been inactive for a long time).

The answer might be that Beam does not have a good lifecycle story here, and something needs to be built.


On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <rmannibucau@xxxxxxxxx> wrote:
Hi Andrew,

IIRC sources should clean up their resources per method since they don't have a better lifecycle. Readers can create anything longer-lived and release it at close time.

On Wed, Aug 1, 2018 at 00:31, Andrew Pilloud <apilloud@xxxxxxxxxx> wrote:
Some of our IOs create external resources that need to be cleaned up when a pipeline is terminated. It looks like the org.apache.beam.sdk.io.UnboundedSource interface is called on creation, but there is no call for cleanup. For example, PubsubIO creates a Pubsub subscription in createReader()/split() and it should be deleted at shutdown. Does anyone have ideas on how I might make this happen?

(I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking the PubSub specific issue.)