osdir.com

Empty localReaders list with Flink + Kafka in parallel


Hello,

I'm trying to use Flink in streaming mode to read data from a Kafka topic.
It works without parallelism, but when I set parallelism > 1 it fails with this exception:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:657)
	at java.util.ArrayList.get(ArrayList.java:433)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)

I'm using Beam 2.5.0, so this list should contain the local readers, but I don't know why it is empty.
Does anyone have an idea how I can fix this?
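For what it's worth, the exception itself is just plain `ArrayList` behavior: the wrapper indexes into its `localReaders` list, and for this subtask the list is empty. A minimal stdlib sketch of that failure mode (class and method names are mine, not Beam's):

```java
import java.util.ArrayList;
import java.util.List;

public class EmptyReaders {
    // Stand-in for the localReaders access in UnboundedSourceWrapper.run():
    // indexing into an empty list throws IndexOutOfBoundsException.
    static String readFirst(List<String> localReaders) {
        return localReaders.get(0);
    }

    public static void main(String[] args) {
        List<String> localReaders = new ArrayList<>(); // no readers assigned to this subtask
        try {
            readFirst(localReaders);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("caught: " + e); // same failure mode as the stack trace above
        }
    }
}
```

So the underlying question seems to be why at least one parallel subtask ends up with no reader assigned at all.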

This is the beginning of my pipeline:

PipelineOptions options = PipelineOptionsFactory.create();

options.setRunner(FlinkRunner.class);
options.as(FlinkPipelineOptions.class).setParallelism(-1);

pipeline = Pipeline.create(options);

pipeline.apply(
    KafkaIO.<String, Data>read()
        .withBootstrapServers("*****")
        .withTopic("*****")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(DataDeserializer.class)
        .withoutMetadata()
);

pipeline.run();

Best Regards,

Nicolas Viard