We have the same setup, and we do daily updates to our pipeline.
We do it with the flink command-line tool. Basically, our deployment job does the following (a sketch follows the list):
1. Detect whether the pipeline is running (matching by job name).
2. If found, do a flink cancel with a savepoint (we use HDFS for checkpoints/savepoints) under a given directory.
3. Use the flink run command for the new job, specifying the savepoint from step 2.
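
A minimal sketch of such a job, assuming the flink CLI is on the PATH; the job name, jar, entry class, and HDFS paths below are placeholders, not our real values:

    JOB_NAME="my-beam-pipeline"                 # placeholder job name
    SAVEPOINT_DIR="hdfs:///flink/savepoints"    # placeholder savepoint directory

    # Step 1: find the running job by name; the job id is the 4th field
    # of "flink list -r" output.
    JOB_ID=$(flink list -r | grep "$JOB_NAME" | awk '{print $4}')

    # Step 2: cancel with a savepoint written under SAVEPOINT_DIR, and
    # capture the savepoint path the CLI prints.
    SAVEPOINT=$(flink cancel -s "$SAVEPOINT_DIR" "$JOB_ID" \
        | grep -o "$SAVEPOINT_DIR/savepoint-[^ .]*")

    # Step 3: launch the new version of the job from that savepoint.
    flink run -s "$SAVEPOINT" -c com.example.MyPipeline my-pipeline.jar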
I don't think there is any support for achieving this from within the pipeline; you need to do it externally, as explained above.
We have a number of Beam pipelines processing unbounded streams sourced from Kafka on the Flink runner, and we are very happy with both.
The problem is with shutting down the pipelines... for version upgrades, system maintenance, load management, etc., it would be nice to be able to gracefully shut them down under software control, but we haven't been able to find a way to do so. We're in good shape on checkpointing and then cleanly recovering, but our shutdowns are all destructive to either the job or the Flink TaskManager. Here's what we've tried:
1) Calling cancel on the FlinkRunnerResult returned from p.run().
This would be our preferred method, but p.run() doesn't return until termination, and even if it did, the runner code simply throws:

    throw new UnsupportedOperationException(
        "FlinkRunnerResult does not support cancel.");

so this doesn't appear to be a near-term option.
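
For reference, this is roughly what we are attempting, using current Beam package names (the wrapper class and method are placeholders; 'p' stands in for however the pipeline is actually constructed):

    import java.io.IOException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;

    class ShutdownAttempt {
        // Sketch of attempt 1: run, then try to cancel under software control.
        static void runAndCancel(Pipeline p) throws IOException {
            PipelineResult result = p.run(); // blocks until termination on the Flink runner
            result.cancel();                 // and even when reachable, this just throws
                                             // the UnsupportedOperationException above
        }
    }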
2) Injecting a "termination" message into the pipeline.
This does get through, but calling exit() from a stage
in the pipeline also terminates the Flink TaskManager.
3) Injecting a "sleep" message, then manually restarting the environment.
This is our current method: we pause the data at the source, flood all branches of the pipeline with a "we're going down" message so the stages can do a bit of housekeeping, then hard-stop the entire environment and re-launch with the new version.
Is there a "Best Practice" method for gracefully
terminating an unbounded pipeline from within the
pipeline or from the mainline that launches it?