Re: [BEAM-6077] FlinkRunner: Make UnboundedSource state re-scale friendly

Hi Jozef,

I responded on JIRA today before I saw your mail here.

The splitting of the UnboundedSource is performed during translation of the Beam pipeline. I think it would be feasible to split using Flink's maximum parallelism instead of the configured parallelism. That would make it possible to increase the parallelism at a later point in time.

Another option would be to split the sources again when scaling up; I'm not sure whether that would work for all sources. Scaling down should be easy because the wrapper supports reading from multiple sources.
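
To illustrate the first option, here is a rough sketch. It assumes the source is split once into as many pieces as the maximum parallelism, and that each subtask reads the splits whose index maps to it. The class and method names are hypothetical; this is plain Java, not FlinkRunner or Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitAssignment {

    // Hypothetical helper: returns the indices of the splits that one subtask
    // would read, using a simple round-robin mapping (an assumption, not
    // necessarily what the runner does).
    static List<Integer> splitsForSubtask(int numSplits, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid parallelism or subtask index");
        }
        List<Integer> owned = new ArrayList<>();
        for (int split = subtaskIndex; split < numSplits; split += parallelism) {
            owned.add(split);
        }
        return owned;
    }

    public static void main(String[] args) {
        int maxParallelism = 8; // number of splits fixed at translation time
        for (int parallelism : new int[] {2, 4}) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                System.out.println("parallelism=" + parallelism
                        + " subtask=" + subtask
                        + " splits=" + splitsForSubtask(maxParallelism, parallelism, subtask));
            }
        }
    }
}
```

Since the number of splits never changes, going from 2 to 4 subtasks only changes which subtask reads which split. The sources themselves do not have to be split again.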


On 20.11.18 11:38, Jozef Vilcek wrote:
I want to reach out for opinions on what would be the best way to proceed with https://issues.apache.org/jira/browse/BEAM-6077

The problem is that when a FlinkRunner job is restored from a checkpoint, it needs to recreate the source and its readers from the checkpointed state. Each state element is an `UnboundedSource.CheckpointMark`, which is opaque to the runner. A CheckpointMark may itself already hold state per key, e.g. in the case of Kafka it is a list of PartitionMarks, each with a partition_id and an offset.

An UnboundedSource can create one reader per CheckpointMark, and a reader can produce a single CheckpointMark from its state. After a rescale, the number of CheckpointMarks retrieved from state no longer corresponds to the actual parallelism, so a merge or flatten needs to be applied to the list of marks read from state. The question is where such logic and knowledge should live.
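
As a sketch of what such a merge/flatten step could look like for a Kafka-like mark: flatten the restored marks into their per-partition entries, then regroup them for the new parallelism. The `PartitionMark` class, the `rescale` method, and the modulo assignment are assumptions made for illustration only; they are not part of the Beam SDK.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CheckpointRescale {

    // Hypothetical stand-in for one entry of a Kafka-like checkpoint mark.
    static final class PartitionMark {
        final int partitionId;
        final long offset;

        PartitionMark(int partitionId, long offset) {
            this.partitionId = partitionId;
            this.offset = offset;
        }
    }

    // Flattens the marks restored from state and regroups them into
    // newParallelism marks, one per subtask (possibly empty).
    static List<List<PartitionMark>> rescale(List<List<PartitionMark>> restored, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<PartitionMark> all = new ArrayList<>();
        for (List<PartitionMark> mark : restored) {
            all.addAll(mark);
        }
        all.sort(Comparator.comparingInt(p -> p.partitionId));

        List<List<PartitionMark>> regrouped = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            regrouped.add(new ArrayList<>());
        }
        for (PartitionMark p : all) {
            // Assumed assignment rule: partition id modulo the new parallelism.
            regrouped.get(Math.floorMod(p.partitionId, newParallelism)).add(p);
        }
        return regrouped;
    }
}
```

This only works because the mark can be decomposed per partition. A generic CheckpointMark gives the runner no such view, which is why the logic probably has to live with the source.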

It feels similar to UnboundedSource.split(parallelism, pipelineOptions) and also maybe related somehow to SplittableDoFn logic. Not sure.

My questions are:
1. Is there a way to achieve such splitting / merging of checkpoint marks with the current SDK?
2. If not, and it makes sense to add it, where would it best go? Source?
3. Is there some other approach that a Beam rookie like me does not see?