Re: Could not extract key Exception only on runtime not in dev environment


Yes, I am sure. You can see the id in the exception message (and a null id should have crashed in the IDE as well). Furthermore, to be sure, I added a filter that looks like this:
env
    .addSource(kafka_source)
    .filter(_.id != null)
    .keyBy{ r =>
        val h = fastHash(r.id) % partitionFactor
        math.abs(h)
    }
    .map(...)

and I still get the same exception.
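Since filtering out null ids does not change anything, the null reference may be something else that the keyBy function touches, such as fastHash or whatever object it lives in. Below is a minimal sketch of a key function that depends only on its arguments, which could help rule that out. The names KeySketch and partitionKey are hypothetical, and MurmurHash3.stringHash is only a stand-in for fastHash, whose implementation is not shown in this thread.

```scala
import scala.util.hashing.MurmurHash3

object KeySketch {
  // Stand-in for the fastHash used in the job (its real implementation is not shown).
  def fastHash(s: String): Int = MurmurHash3.stringHash(s)

  // Uses only its parameters, so no field of an enclosing object is
  // dereferenced when this runs on a task manager.
  def partitionKey(id: String, partitionFactor: Int): Int = {
    require(id != null, "id must not be null")
    require(partitionFactor > 0, "partitionFactor must be positive")
    // floorMod always returns a value in [0, partitionFactor).
    Math.floorMod(fastHash(id), partitionFactor)
  }
}
```

In the job this would be called as .keyBy(r => KeySketch.partitionKey(r.id, factor)), where factor is a local val copied inside the method that builds the pipeline. Note that floorMod maps negative hashes to different keys than math.abs(h % partitionFactor) does, so the two are not interchangeable for existing keyed state.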

On Tue, Nov 20, 2018 at 5:31 PM miki haiat <miko5054@xxxxxxxxx> wrote:
What is the value of r.id?
Are you sure that it is not null?

Miki.


On Tue, 20 Nov 2018, 17:26 Avi Levi <avi.levi@xxxxxxxxxxxxxx> wrote:
I am running Flink locally on my machine, and I get the exception below when reading from a Kafka topic. When I run the job from the IDE (IntelliJ), it runs perfectly. However, the exception appears when I deploy my jar to the Flink runtime (locally) using
/bin/flink run ~MyApp-1.0-SNAPSHOT.jar
My class looks like this:
case class Foo(id: String, value: String, timestamp: Long, counter: Int) 
I am getting this exception:
java.lang.RuntimeException: Could not extract key from Foo("some-uuid","text",1540348398,1)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Could not extract key from Foo("some-uuid","text",1540348398,1)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	... 22 more
Caused by: java.lang.NullPointerException
	at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:41)
	at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:40)
	at org.apache.flink.streaming.api.scala.DataStream$$anon$2.getKey(DataStream.scala:411)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
	... 26 more

My key partitioning is simple (partitionFactor is some number):
.keyBy{ r =>
        val h = fastHash(r.id) % partitionFactor
        math.abs(h)
    }
Again, this happens only in the Flink runtime, not when I run it from IntelliJ.
This is so frustrating. Any advice?
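
The NullPointerException in the trace above is raised at StreamingJob.scala:41, inside the keyBy function itself, so one way to narrow it down is to check every reference that function uses before computing the key. The sketch below is only an illustration: KeyDiagnostics and firstNull are hypothetical names, and which references to pass in (the id, the hashing helper, and so on) is an assumption about code not shown here.

```scala
object KeyDiagnostics {
  // Returns the name of the first entry whose value is null, if any.
  def firstNull(named: (String, Any)*): Option[String] =
    named.find { case (_, value) => value == null }.map(_._1)
}
```

Calling KeyDiagnostics.firstNull("id" -> r.id, "hasher" -> someHasher) at the top of the keyBy function, and throwing an exception that includes the returned name, would put the offending reference into the task manager log. Here someHasher is a placeholder for whatever object fastHash relies on.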