Re: Graceful shutdown of long-running Beam pipeline on Flink

Thank you for sharing these, Lukasz!

Great question, Wayne!

As for pipeline shutdown, Flink users typically take a snapshot and then cancel the pipeline with Flink tools.

The Beam tooling needs to be improved to support cancelling as well. If snapshotting is enabled, the Beam job could also be restored from a snapshot instead of explicitly taking a savepoint.

Related issue for cancelling: https://issues.apache.org/jira/browse/BEAM-593
I think we should address this soon, in time for the next release.


On 03.12.18 17:53, Lukasz Cwik wrote:
There are proposals for pipeline drain [1] and also for snapshot and update [2] in Apache Beam. We would love contributions in this space.

[1] https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
[2] https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY

On Mon, Dec 3, 2018 at 7:05 AM Wayne Collins wrote:

    Hi JC,

    Thanks for the quick response!
    I had hoped for an in-pipeline solution for runner portability, but
    it is nice to know we're not the only ones stepping outside to
    interact with runner management. :-)


    On 2018-12-03 01:23, Juan Carlos Garcia wrote:
    Hi Wayne,

    We have the same setup and we do daily updates to our pipeline.

    We do this with the Flink command-line tool, driven by a Jenkins job.

    Basically, our deployment job does the following:

    1. Detect if the pipeline is running (it matches via job name)

    2. If found, run flink cancel with a savepoint written to a given
    directory (we use HDFS for checkpoints and savepoints).

    3. Start the new job with flink run, specifying the savepoint from
    step 2.
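    The three steps above can be sketched roughly as follows. This is a
    minimal Java sketch, not the actual Jenkins job described here: it only
    builds the Flink CLI command lines (flink cancel -s <targetDirectory>
    <jobId> and flink run -s <savepointPath> <jar>) and leaves executing
    them, e.g. with ProcessBuilder, out. It assumes flink list prints
    running jobs as "<timestamp> : <jobId> : <jobName> (RUNNING)"; the job
    name, jar, and HDFS paths are hypothetical. Note that flink run -s needs
    the full savepoint path that the cancel command reports, not just the
    target directory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch of a redeploy job: find the running job by name, cancel it with a
// savepoint, then resubmit the new version from that savepoint. It only builds
// Flink CLI command lines; it does not execute them.
public class FlinkRedeploySketch {

    // Step 1: return the ID of the RUNNING job with the given name, if any.
    // Assumption: "flink list" prints lines like "<timestamp> : <jobId> : <jobName> (RUNNING)".
    public static Optional<String> findRunningJobId(String flinkListOutput, String jobName) {
        for (String line : flinkListOutput.split("\n")) {
            String[] parts = line.split(" : ", 3);
            if (parts.length == 3 && parts[2].trim().equals(jobName + " (RUNNING)")) {
                return Optional.of(parts[1].trim());
            }
        }
        return Optional.empty();
    }

    // Step 2: cancel the job and write a savepoint under savepointDir.
    public static List<String> cancelWithSavepoint(String jobId, String savepointDir) {
        return List.of("flink", "cancel", "-s", savepointDir, jobId);
    }

    // Step 3: submit the new jar, restoring state from the full savepoint path
    // (the path reported by the cancel command, not just the target directory).
    public static List<String> runFromSavepoint(String savepointPath, String jarPath,
                                                List<String> pipelineArgs) {
        List<String> cmd = new ArrayList<>(List.of("flink", "run", "-s", savepointPath, jarPath));
        cmd.addAll(pipelineArgs);
        return cmd;
    }

    public static void main(String[] args) {
        // Hypothetical "flink list" output, job name, jar, and HDFS paths.
        String listOutput = String.join("\n",
                "------------------ Running/Restarting Jobs -------------------",
                "03.12.2018 10:00:00 : a1b2c3 : kafka-ingest (RUNNING)",
                "--------------------------------------------------------------");

        Optional<String> jobId = findRunningJobId(listOutput, "kafka-ingest");
        if (jobId.isPresent()) {
            // prints: flink cancel -s hdfs:///flink/savepoints a1b2c3
            System.out.println(String.join(" ",
                    cancelWithSavepoint(jobId.get(), "hdfs:///flink/savepoints")));
        }
        List<String> run = runFromSavepoint("hdfs:///flink/savepoints/savepoint-a1b2c3-0001",
                "pipeline.jar", List.of("--runner=FlinkRunner", "--streaming=true"));
        System.out.println(String.join(" ", run));
    }
}
```

    If retained (externalized) checkpoints are enabled, a checkpoint path
    can be passed to flink run -s in the same way instead of a savepoint.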

    I don't think there is any support to achieve the same from within
    the pipeline. You need to do this externally as explained above.

    Best regards,

    On Mon., 3 Dec 2018, 00:46, Wayne Collins wrote:

        Hi all,
        We have a number of Beam pipelines processing unbounded
        streams sourced from Kafka on the Flink runner and are very
        happy with both the platform and performance!

        The problem is shutting down the pipelines. For version
        upgrades, system maintenance, load management, etc., it would
        be nice to shut them down gracefully under software control,
        but we haven't found a way to do so.
        We're in good shape on checkpointing and cleanly recovering,
        but shutdowns are all destructive to Flink or the Flink
        TaskManager.

        Methods tried:

        1) Calling cancel on the FlinkRunnerResult returned from p.run()
        This would be our preferred method, but p.run() doesn't return
        until termination, and even if it did, the runner code simply does
        "throw new UnsupportedOperationException("FlinkRunnerResult
        does not support cancel.");"
        so this doesn't appear to be a near-term option.

        2) Inject a "termination" message into the pipeline via Kafka
        This does get through, but calling exit() from a stage in the
        pipeline also terminates the Flink TaskManager.

        3) Inject a "sleep" message, then manually restart the cluster
        This is our current method: we pause the data at the source,
        flood all branches of the pipeline with a "we're going down"
        message so the stages can do a bit of housekeeping, then
        hard-stop the entire environment and relaunch with the new version.
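        The housekeeping step in method 3 works like a marker (a "poison
        pill") that follows the last real record through every stage. The
        plain-Java sketch below illustrates only that ordering: it uses no
        Beam or Flink APIs, and the stage names, the batch size of two, and
        the marker value are hypothetical. It models a single linear chain;
        a real pipeline has several branches and many parallel instances
        per stage, which is presumably why the marker has to be flooded to
        all of them. Without the marker, the last record would stay stuck
        in the first stage's buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy of the "we're going down" message: no Beam or Flink APIs.
// Each stage buffers records and normally emits them in batches of two; when it
// sees the marker it flushes whatever is still buffered and forwards the marker.
public class ShutdownMarkerSketch {
    // Hypothetical marker value injected after the last real record.
    public static final String SHUTDOWN_MARKER = "__SHUTDOWN__";

    public static class Stage {
        public final String name;
        public final List<String> buffer = new ArrayList<>();
        public boolean drained = false;

        public Stage(String name) { this.name = name; }

        // Returns what this stage emits downstream for one input element.
        public List<String> process(String element) {
            List<String> out = new ArrayList<>();
            if (SHUTDOWN_MARKER.equals(element)) {
                out.addAll(buffer);        // housekeeping: flush pending work
                buffer.clear();
                drained = true;
                out.add(SHUTDOWN_MARKER);  // pass the marker on to the next stage
            } else {
                buffer.add(name + "(" + element + ")");
                if (buffer.size() >= 2) {  // normal batched emission
                    out.addAll(buffer);
                    buffer.clear();
                }
            }
            return out;
        }
    }

    // Pushes the inputs through a linear chain of stages, one stage at a time,
    // and returns what leaves the last stage.
    public static List<String> runChain(List<Stage> stages, List<String> inputs) {
        List<String> current = inputs;
        for (Stage stage : stages) {
            List<String> next = new ArrayList<>();
            for (String element : current) {
                next.addAll(stage.process(element));
            }
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        List<Stage> stages = List.of(new Stage("parse"), new Stage("enrich"));
        List<String> out = runChain(stages, List.of("r1", "r2", "r3", SHUTDOWN_MARKER));
        // prints [enrich(parse(r1)), enrich(parse(r2)), enrich(parse(r3)), __SHUTDOWN__]
        System.out.println(out);
    }
}
```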

        Is there a "Best Practice" method for gracefully terminating
        an unbounded pipeline from within the pipeline or from the
        mainline that launches it?


        Wayne Collins
        dades.ca Inc.