Re: Stopping of a streaming job empties state store on HDFS


> On 08.06.2018 at 01:16, Peter Zende <peter.zende@xxxxxxxxx> wrote:
> Hi all,
> We have a streaming pipeline (Flink 1.4.2) for which we implemented stoppable sources so that we can gracefully exit the job with the Yarn state "finished/succeeded".
> This works fine. However, after creating a savepoint, stopping the job (stop event) and restarting it, we noticed that the RocksDB state hadn't been recovered. It looks like this is because the state directory on HDFS was emptied after issuing the stop event. This isn't the case when we cancel the job, but we'd like to distinguish between job failures and stop events. After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still not clear why this is the intended behavior.
> Should we use cancel instead?

Savepoints should _not_ be cleaned up on stop or cancellation; checkpoints should be cleaned up. Where are you storing the created savepoints? They should not go into the checkpoint directory. Stop is intended to be a more "graceful" variant of cancel, but I think it is rarely used with Flink. I would prefer cancel unless you really need stoppable sources for some particular reason.

> When we back up the local state directory, stop the job, copy the directory back and start a new job from the savepoint, it works fine.
> Another issue is that when we restart the job with different sources (1st job: HDFS and Kafka, 2nd job: Kafka), each having uids set, the recovery from the savepoint doesn't fail but the local state isn't restored. Is there any trick besides setting allowNonRestoredState?

I need to clarify something here: when you say "each having uids set", do you set the same uids for both types of sources? The uids must match, because on restore Flink reassigns state based on the uids, i.e. state x goes to the operator whose uid equals the uid of the operator that created x in the previous job. The purpose of the allowNonRestoredState flag is to tolerate state in a checkpoint/savepoint that cannot be assigned to any operator (no operator with a matching uid exists in the job graph). For example, you want this if you removed operators from the job.
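
To illustrate the matching rule described above, here is a toy model in plain Java. It is only a sketch, not Flink code: the class UidRestoreSketch, the method restore and the uids "hdfs-source", "kafka-source" and "aggregator" are made up for this example, and the string values merely stand in for real state.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of uid-based state assignment; NOT Flink's actual implementation. */
public class UidRestoreSketch {

    /**
     * Assigns each state entry of a savepoint to the operator with the same uid.
     * State without a matching operator is dropped only if allowNonRestoredState
     * is true; otherwise the restore fails.
     */
    public static Map<String, String> restore(Map<String, String> savepointStateByUid,
                                              Set<String> operatorUidsInNewJob,
                                              boolean allowNonRestoredState) {
        Map<String, String> assigned = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : savepointStateByUid.entrySet()) {
            String uid = entry.getKey();
            if (operatorUidsInNewJob.contains(uid)) {
                // Same uid in old and new job: the state is handed to that operator.
                assigned.put(uid, entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "State of uid '" + uid + "' has no matching operator in the new job");
            }
            // else: unmatched state is silently skipped.
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Hypothetical first job: HDFS source, Kafka source and one stateful operator.
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put("hdfs-source", "file offsets");
        savepoint.put("kafka-source", "partition offsets");
        savepoint.put("aggregator", "keyed state");

        // Hypothetical second job: the HDFS source was removed, the other uids are unchanged.
        Set<String> newJob = new HashSet<>(Arrays.asList("kafka-source", "aggregator"));

        // With the flag set, the restore succeeds and the HDFS source state is dropped.
        System.out.println(restore(savepoint, newJob, true).keySet());

        // Without the flag, the leftover HDFS source state makes the restore fail.
        try {
            restore(savepoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println("Restore failed: " + e.getMessage());
        }
    }
}
```

Note that in this model an operator of the new job whose uid does not appear in the savepoint simply receives no state. If the uids of your second job differ from those of the first, that would match what you describe: the restore does not fail, but the state is not restored.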