We have a streaming pipeline (Flink 1.4.2) for which we implemented stoppable sources so that we can exit the job gracefully with the YARN state "finished/succeeded".
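To make the setup concrete, here is a minimal sketch of the stoppable-source pattern. It is not our actual code, and it is written in plain Java so that it runs without Flink on the classpath. The class name `StoppableCounterSource`, the counter logic, and the `Consumer` standing in for Flink's source context are all hypothetical. In Flink 1.4 the equivalent class would implement `SourceFunction` and `StoppableFunction`, with `run`, `cancel`, and `stop` playing the same roles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in (hypothetical) for a Flink 1.4 source that would
// implement both SourceFunction<Long> and StoppableFunction.
class StoppableCounterSource {
    // volatile because stop()/cancel() are normally called from another thread
    private volatile boolean running = true;
    private long next = 0;

    // Emits records until the flag is cleared, then returns normally,
    // which is what lets the job end as "finished" instead of "cancelled".
    void run(Consumer<Long> out) {
        while (running) {
            out.accept(next++);
        }
    }

    // Graceful stop request: leave the emit loop after the current record.
    void stop() {
        running = false;
    }

    // Cancellation request: same flag in this sketch.
    void cancel() {
        running = false;
    }
}

class StoppableSourceDemo {
    public static void main(String[] args) {
        StoppableCounterSource source = new StoppableCounterSource();
        List<Long> seen = new ArrayList<>();
        // Request a stop from inside the consumer once record 4 has been seen.
        source.run(v -> {
            seen.add(v);
            if (v == 4) {
                source.stop();
            }
        });
        System.out.println(seen);
    }
}
```

The point of the sketch is only that `run` returns on its own after `stop`, rather than being interrupted.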
The graceful stop itself works fine. However, after creating a savepoint, stopping the job (stop event), and restarting it, we noticed that the RocksDB state had not been recovered. This appears to be because the state directory on HDFS was emptied after issuing the stop event. This isn't the case when we cancel the job, but we'd like to distinguish between job failures and stop events. After reading some related tickets (e.g. FLINK-4201, FLINK-5007), it's still not clear to us why this is the intended behavior.
Should we use cancel instead?
When we back up the local state directory, stop the job, copy the directory back, and start a new job from the savepoint, it works fine.
Another issue is that when we restart the job with a different set of sources (1st job: HDFS and Kafka, 2nd job: Kafka), each with its uid set, recovery from the savepoint doesn't fail but the local state isn't restored. Is there any trick besides setting allowNonRestoredState?
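For reference, this is how we understand the uid matching on restore, written as a plain-Java model rather than Flink code. The class `SavepointMatcher`, the method `restorable`, and the uids in the usage note are hypothetical. The assumption being modeled is that savepoint state is mapped to operators by uid, and that state with no matching operator makes the restore fail unless allowNonRestoredState is set.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of uid-based state matching; not a Flink API.
class SavepointMatcher {
    // Returns the uids whose state can be mapped onto the new job.
    // Throws if the savepoint holds state for a uid the new job lacks
    // and skipping that state has not been allowed.
    static Set<String> restorable(Set<String> savepointUids,
                                  Set<String> jobUids,
                                  boolean allowNonRestoredState) {
        Set<String> unmatched = new TreeSet<>(savepointUids);
        unmatched.removeAll(jobUids);
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                "Savepoint state cannot be mapped for uids: " + unmatched);
        }
        Set<String> restored = new TreeSet<>(savepointUids);
        restored.retainAll(jobUids);
        return restored;
    }
}
```

With a savepoint containing the made-up uids `hdfs-source`, `kafka-source`, and `aggregator`, and a second job containing only `kafka-source` and `aggregator`, this model restores the latter two when the flag is set and fails otherwise. By that reading we would expect the state of the operators we kept to come back, which is not what we observe.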