// RocksDB state backend with incremental checkpointing enabled (the second constructor argument)
env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
// At least 60 s between checkpoints and a 10 min checkpoint timeout, both overridable via job parameters
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
The problematic state that we tried to restore from was a checkpoint created with this configuration.
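
For completeness, this is roughly the shape of the setup those lines live in. It is a sketch rather than our exact code: I have inlined the getStateBackend helper, and the checkpoint interval, the "state.path" parameter name, and the externalized-checkpoint retention line are assumptions here (retention has to be enabled in some form for a checkpoint to survive cluster termination and be restorable later).

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetupSketch {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String statePath = params.getRequired("state.path"); // e.g. an s3:// checkpoint URI (placeholder)

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental RocksDB checkpoints written to statePath.
        env.setStateBackend(new RocksDBStateBackend(statePath, true));

        // Checkpoint once a minute (assumed interval), with the pause/timeout values quoted above.
        env.enableCheckpointing(params.getLong("checkpoint.interval", 60 * 1000));
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
        env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));

        // Retain checkpoints on cancellation/failure so they can be restored on a fresh cluster.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... sources, operators, and sinks go here ...
        env.execute("job");
    }
}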
> Are you using the local recovery feature?
Yes. In this particular case the job was constantly failing and restarting because of "Too many open files" errors, so we terminated the cluster entirely, created a new one, and launched a new job, specifying the latest checkpoint path to restore state from.
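(To be precise about "specifying the latest checkpoint path": that was the usual resume from a retained checkpoint, along the lines of bin/flink run -s s3://<bucket>/<checkpoint-dir>/chk-<n> ... — the path shown here is a placeholder, not the real one.)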
This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on S3, so I might be able to try restoring it again if needed for debugging. That would require some tweaking, though, because I don't want to interfere with the same Kafka consumer group offsets or send old data to the production endpoint again.
Please keep in mind that the cluster that created the problematic checkpoint was hitting that "Too many open files" issue, in case you think that's relevant.