OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Flink CEP Watermark Exception


Following up, we are using Flink 1.5.0 and Flink-CEP 2.11.

Thanks,
Austin

On Tue, Oct 30, 2018 at 3:58 PM Austin Cawley-Edwards <austin.cawley@xxxxxxxxx> wrote:
Hi there,

We have a streaming application that uses CEP processing but are getting this error fairly frequently after a checkpoint fails, though not sure if it is related. We have implemented both  `hashCode` and `equals()` using `Objects.hash(...properties)` and basic equality, respectively. Has anyone seen this before using CEP?

Here is  the full exception:

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Could not find previous entry with key: alertOne, value: {"totalDocumentCount":12,"alertLevel":3,"id":"entity:medium:13790","predicted":4.220480902392199,"timestamp":1539700396000} and timestamp: 1539700799999. This can indicate that either you did not implement the equals() and hashCode() methods of your input elements properly or that the element belonging to that entry has been already pruned.
	at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:107)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:566)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)


Best,
Austin