Flink operator max parallelism and rescalable jobs


I want to collect some feedback on rescaling streaming Beam pipeline on Flink runner. Flink seems to be able to re-scale jobs, which in Beam terms means changing the parallelism in Beam. However, one have to make sure that state can rescale as well to the predefined MAX parallelism. Max parallelism must be set for job on FlinkRunner.

Flink supports fiddling with max parallelism on global, environment and operator level. Changes in operator level are not possible with beam. I found this JIRA which seems to be inconclusive if changes in operator parallelism make sense to adopt somehow in Beam

I did try to set max parallelism to environment via my local patch. My job did launch and not crash like before when I bumped parallelism += 1. But there was one drawback as far as I know. My test job reads from kafka and after launching job from savepoint point, one partition does not continue from offset in savepoint but according to what is defined by auto.offset.reset (my case 'latest') which is not great.

My questions:

1. Should re-scale work for beam if runner does support it or there can be some incompatibilities in general depending on how particular runner works

2. Did anyone have a success with Flink and rescale? Honestly, not sure how well it behaves in native Flink. Never tried it 

3. Why does kafka not redistribute stored partition offsets after chenging parallelism?

4. Is BEAM-68 still relevant?

Many thanks,