I want to reach out for opinions on what would be the best way to proceed with https://issues.apache.org/jira/browse/BEAM-6077
The problem is, that when FlinkRunner job is being restored from checkpoint, it needs to resurrect source and it's readers given the checkpoint state. State element is represented by `UnboundedSource.CheckpointMark` which does not tell much information. Within CheckpointMark there might be already stored state per key, e.g. in case of Kafka it is list of PartitionMarks having each partition_id and offset.
UnboundedSource can create a reader per single CheckpointMark and reader can produce single CheckpointMark from it's state. Now at rescale, number of CheckpointMarks retrieved from state does not correspond to actual parallelism. Merge or flatten needs to be invoked over list of marks read from state. The question is, where such logic and knowledge should be.
It feels similar to UnboundedSource.split(parallelism, pipelineOptions) and also maybe related somehow to SplittableDoFn logic. Not sure.
My question is:
1. Is there a way to achieve such splitting / merging of checkpoint mark with current SDK?
2. If not and it make sense to add it where it would best go? Source?
3. Some other approach Beam rookie as me do not see?