Flink does not read from some Kafka Partitions


Hello,

My team ran into some unexpected behavior when we tried to get an existing Flink app to read from a Kafka topic that had been re-sized. Here are the highlights:
- We are using the FlinkKafkaConsumer010.
- We re-partitioned (added partitions to) an existing topic that our Flink app reads, so that the topic has 8 partitions. Following that, we re-deployed our task managers. We thought that the task managers would start reading the new partitions.
- 8 task managers read from the topic, but they did NOT read all of the partitions. 3 of the partitions had 2 task managers reading from them and 3 of the partitions had 0 task managers reading from them. My team had expected that Flink would automatically read from all partitions, 1 task manager per partition.
- To force the app to read from all partitions, we added the property flink.partition-discovery.interval-millis to our Kafka consumer properties and re-deployed the task managers. We expected this setting to cause Flink to discover (and start reading) all partitions.
- We did not see a change in the Kafka readers: there were still 3 partitions not being read.
- Finally, we changed the ID of the Flink operator that reads the Kafka topic and re-deployed the task managers again.
- After changing the ID, the app started reading from all partitions. 
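
For reference, here is a minimal sketch of the consumer properties described above. The broker address, group ID, and the 30-second interval are placeholder assumptions, not our real values, and the class name KafkaDiscoveryProps is made up for illustration. The Flink wiring itself is only indicated in a comment.

```java
import java.util.Properties;

// Minimal sketch: builds Kafka consumer properties that include the
// partition-discovery setting mentioned in the list above.
final class KafkaDiscoveryProps {

    // Property key named in the post.
    static final String DISCOVERY_KEY = "flink.partition-discovery.interval-millis";

    // All argument values are supplied by the caller; nothing here is a
    // recommended default.
    static Properties build(String bootstrapServers, String groupId, long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty(DISCOVERY_KEY, Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values (assumptions for illustration only).
        Properties props = build("localhost:9092", "example-group", 30_000L);
        System.out.println(DISCOVERY_KEY + "=" + props.getProperty(DISCOVERY_KEY));
        // In the real job these properties are passed to the
        // FlinkKafkaConsumer010 constructor, and the source operator's ID
        // is set with .uid(...) on the resulting stream.
    }
}
```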

What is the correct way to pick up partitions after re-partitioning a Kafka topic? 

Thanks,
Ruby