Re: Empty localReaders list with Flink + Kafka in parallel


Hello Nicolas,

Thank you for reporting this.

It looks like we have an issue when the number of Kafka topic partitions is less than the parallelism (the number of task slots).
As a workaround for now, set the parallelism to at most the number of topic partitions. For example, with parallelism=2 the topic needs at least 2 partitions.
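
A minimal sketch of that workaround, assuming an explicit positive parallelism and a partition count that is already known (for example from Kafka tooling or KafkaConsumer.partitionsFor(topic).size()). The class ParallelismCap and its methods are hypothetical names for illustration. The round-robin assignment is a simplified model of how source splits might be spread over subtasks, not Beam's actual code. It only shows why a subtask can end up with no readers when there are fewer partitions than subtasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class for illustration; not part of Beam, Flink or Kafka.
final class ParallelismCap {

    // Returns a parallelism that never exceeds the number of topic partitions.
    static int capParallelism(int requested, int partitionCount) {
        if (requested < 1 || partitionCount < 1) {
            throw new IllegalArgumentException("both values must be >= 1");
        }
        return Math.min(requested, partitionCount);
    }

    // Simplified model (an assumption): one split per partition,
    // dealt round-robin to the parallel subtasks.
    static List<List<Integer>> assignSplits(int partitionCount, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            perSubtask.get(p % parallelism).add(p);
        }
        return perSubtask;
    }

    // Counts subtasks that received no split at all in this model.
    static long emptySubtasks(int partitionCount, int parallelism) {
        return assignSplits(partitionCount, parallelism).stream()
                .filter(List::isEmpty)
                .count();
    }

    public static void main(String[] args) {
        int partitions = 1; // assumed partition count of the topic
        int requested = 2;  // parallelism the job asks for

        System.out.println("Subtasks without splits before capping: "
                + emptySubtasks(partitions, requested));

        int capped = capParallelism(requested, partitions);
        System.out.println("Subtasks without splits after capping: "
                + emptySubtasks(partitions, capped));

        // In the pipeline the capped value would then be applied with:
        // options.as(FlinkPipelineOptions.class).setParallelism(capped);
    }
}
```

In this model, any subtask left with an empty list would fail as soon as it tries to read its first element, which is consistent with the reported IndexOutOfBoundsException.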

I created a new Jira issue to track and work on this:


Alexey

On 12 Jul 2018, at 16:08, Nicolas Viard <nicolas.viard@xxxxxxxxxx> wrote:

Hello,

I'm trying to use Flink in streaming mode and get data from a Kafka topic.
It works without parallelism, but it fails when I set parallelism > 1, and I get this exception:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:657)
	at java.util.ArrayList.get(ArrayList.java:433)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)

I'm using Beam 2.5.0, so this list must contain local readers, but I don't know why it is empty.
Does anyone have an idea how I can fix this?

This is the beginning of my pipeline:

PipelineOptions options = PipelineOptionsFactory.create();

options.setRunner(FlinkRunner.class);
options.as(FlinkPipelineOptions.class).setParallelism(-1);

pipeline = Pipeline.create(options);

pipeline
    .apply(
        KafkaIO.<String, Data>read()
            .withBootstrapServers("*****")
            .withTopic("*****")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(DataDeserializer.class)
            .withoutMetadata());

pipeline.run();

Best Regards,

Nicolas Viard