My team ran into some behavior we did not expect
when we tried to get an existing Flink app to read from a
re-sized Kafka topic. Here are the highlights:
- We are using the FlinkKafkaConsumer010.
- We re-partitioned (added partitions to) an
existing topic that our Flink app reads so that the topic has
8 partitions. Following that, we re-deployed our task managers.
We thought that the task managers would start reading the new partitions.
- 8 task managers read from the topic, but they did
NOT read all of the partitions. 3 of the partitions had 2 task
managers reading from them and 3 of the partitions had 0 task
managers reading from them. My team had expected that Flink
would automatically read from all partitions, 1 task manager per partition.
- To force the app to read from all partitions, we
added the flink.partition-discovery.interval-millis property to our
Kafka consumer properties and re-deployed the task managers.
We expected this setting to cause Flink to discover (and start
reading from) all partitions.
- We did not see a change in the Kafka readers:
there were still 3 partitions not being read.
- Finally, we changed the ID of the Flink operator
that reads the Kafka topic and re-deployed the task managers.
- After changing the ID, the app started reading
from all partitions.
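For reference, here is a minimal sketch of consumer properties with partition discovery turned on. The property name is the one mentioned above; it takes a numeric interval in milliseconds. The 10000 ms value, the broker address, the group ID, the topic name and the operator ID are all hypothetical placeholders. The Flink wiring is left as a comment so that the snippet compiles with only the JDK.

```java
import java.util.Properties;

public class KafkaSourceConfigSketch {

    // Property key from the post; Flink's Kafka consumer (in versions that support
    // discovery) reads it as an interval in milliseconds. If it is absent, discovery stays off.
    static final String DISCOVERY_INTERVAL_KEY = "flink.partition-discovery.interval-millis";

    // Builds consumer properties; the argument values used below are hypothetical.
    static Properties buildConsumerProperties(String bootstrapServers, String groupId, long discoveryIntervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(discoveryIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProperties("kafka:9092", "my-flink-app", 10_000L);

        // With Flink on the classpath, the properties would be handed to the source, e.g.:
        //   env.addSource(new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props))
        //      .uid("kafka-source-v2");  // the operator ID, which the post ended up changing
        System.out.println(props.getProperty(DISCOVERY_INTERVAL_KEY));
    }
}
```

Run on its own, this prints the configured interval. The snippet only shows where the setting goes; it does not explain why the setting alone had no visible effect in our case.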
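The expectation of one reader per partition can be checked with a small simulation. This sketch assumes that the Flink version in use assigns a partition that is not in restored state the way Flink 1.4+'s KafkaTopicPartitionAssigner appears to. That rule derives a start index from the topic name's hash and then spreads partitions round-robin from there. The sketch does not model partitions restored from a savepoint or checkpoint. It treats each parallel source subtask as one "task manager" (assuming one source subtask per task manager), and the topic name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Assumed to mirror Flink 1.4+'s KafkaTopicPartitionAssigner for a partition
    // that is not in restored state; check against the Flink version actually in use.
    static int assign(String topic, int partition, int parallelism) {
        // Mask off the sign bit so the start index is non-negative even if the hash overflows.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    // Returns, for each subtask index, the list of partitions it would read.
    static List<List<Integer>> assignAll(String topic, int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(assign(topic, p, parallelism)).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 8 partitions and 8 parallel source subtasks, as in the post; "my-topic" is hypothetical.
        List<List<Integer>> bySubtask = assignAll("my-topic", 8, 8);
        for (int i = 0; i < bySubtask.size(); i++) {
            System.out.println("subtask " + i + " -> partitions " + bySubtask.get(i));
        }
    }
}
```

Under this assumed rule, 8 partitions over 8 subtasks always land exactly one per subtask, because adding a fixed start index modulo 8 only rotates the partition numbers. A fresh assignment would therefore not produce the 2/0 split described above.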
What is the correct way to pick up partitions after
re-partitioning a Kafka topic?