

Late data before the window end is even close


I don't understand why some data in my Flink streaming job is being discarded as late a long time before the window even closes.

I cannot be 100% sure, but it seems to me that the Kafka consumer, not the window, is what causes the data to be dropped as "late". I didn't expect that to ever happen. Can it?

I have a Flink streaming job that gathers distinct values using a 24-hour window. It reads the data from Kafka and uses a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. The extractor's maxOutOfOrderness is set to 10 seconds.
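To make the watermark side concrete, here is a simplified model (not Flink's code) of what a bounded-out-of-orderness extractor computes per partition and how per-partition watermarks can be combined. It assumes that a partition's watermark is the highest timestamp seen so far minus maxOutOfOrderness, and that the combined watermark is the minimum over partitions; the class and method names are hypothetical. In this model an out-of-order element merely fails to advance the watermark; nothing is filtered out at this stage.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model (hypothetical, not Flink code) of bounded-out-of-orderness
 * watermarks tracked per Kafka partition and combined by taking the minimum.
 */
public class WatermarkModel {
    private final long maxOutOfOrdernessMs;
    // Highest event timestamp seen so far, per partition (a partition is known once it has produced an element).
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    public WatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an element's timestamp; the element itself is never dropped here. */
    public void onElement(int partition, long timestampMs) {
        maxTimestampPerPartition.merge(partition, timestampMs, Long::max);
    }

    /** Watermark of one partition: highest timestamp seen minus the out-of-orderness bound. */
    public long partitionWatermark(int partition) {
        Long max = maxTimestampPerPartition.get(partition);
        return max == null ? Long.MIN_VALUE : max - maxOutOfOrdernessMs;
    }

    /** Combined watermark: the slowest partition holds everything back. */
    public long combinedWatermark() {
        if (maxTimestampPerPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (int p : maxTimestampPerPartition.keySet()) {
            min = Math.min(min, partitionWatermark(p));
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkModel model = new WatermarkModel(10_000L); // maxOutOfOrderness = 10 s
        model.onElement(0, 100_000L);
        model.onElement(1, 60_000L);
        model.onElement(0, 95_000L); // 5 s out of order: recorded, watermark unchanged
        System.out.println(model.partitionWatermark(0)); // prints 90000
        System.out.println(model.combinedWatermark());   // prints 50000
    }
}
```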

I have also enabled allowedLateness of 1 minute on the 24-hour window:

.timeWindow(Time.days(1))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateDataTag)
.reduce(new DistinctFunction())
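The window definition above can be modelled in plain Java to see when a given day's window is actually discarded. This is a simplified sketch, not Flink's code, and it rests on assumptions: windows aligned to the epoch (offset 0, i.e. midnight UTC) for Time.days(1), a window's last timestamp being its end minus 1 ms, and the window being discarded once the watermark reaches that last timestamp plus allowedLateness. The class name and the date used are hypothetical.

```java
import java.time.Instant;

/**
 * Simplified model (hypothetical, not Flink code) of event-time tumbling windows:
 * which window an element falls into, and when that window is cleaned up.
 */
public class TumblingWindowModel {
    static final long DAY_MS = 24L * 60 * 60 * 1000;
    static final long MINUTE_MS = 60L * 1000;

    /** Start of the window containing timestampMs; offset 0 aligns windows to the epoch (midnight UTC). */
    static long windowStart(long timestampMs, long sizeMs, long offsetMs) {
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }

    /** Last timestamp that belongs to the window [start, start + size). */
    static long windowMaxTimestamp(long startMs, long sizeMs) {
        return startMs + sizeMs - 1;
    }

    /** Watermark value at or beyond which the window (and late data for it) is discarded. */
    static long cleanupTime(long startMs, long sizeMs, long allowedLatenessMs) {
        return windowMaxTimestamp(startMs, sizeMs) + allowedLatenessMs;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2018-06-01T12:15:00Z").toEpochMilli(); // hypothetical event time
        long start = windowStart(ts, DAY_MS, 0);
        System.out.println(Instant.ofEpochMilli(start)); // prints 2018-06-01T00:00:00Z
        System.out.println(Instant.ofEpochMilli(cleanupTime(start, DAY_MS, MINUTE_MS))); // prints 2018-06-02T00:00:59.999Z
    }
}
```

Under these assumptions, an element stamped at 12:15 belongs to a window that stays open until the watermark passes roughly 00:01 the next day (UTC), which matches the expectation described below.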

I have used accumulators to see that some data is arriving late, and this has happened multiple times.

Now, focusing on one particular case that I investigated more closely: at around 12:15, my late-data accumulator started showing that 1 message had been late. That is in the middle of the time window, so why would this happen? I would expect late data to be discarded only some time after 00:01, for data that arrives late for the window that just closed at 00:00 and is not emitted within the 1-minute allowedLateness.
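Under the same simplified assumptions (epoch-aligned 24-hour windows, discarded once the watermark reaches window end minus 1 ms plus allowedLateness), the expectation in the paragraph above can be written as a per-element check. The class name, method names and timestamps are hypothetical, chosen only to mirror the 12:15 case; this is a sketch of the expected behaviour, not Flink's implementation.

```java
import java.time.Instant;

/**
 * Simplified model (hypothetical, not Flink code) of the per-element lateness
 * decision for an event-time tumbling window with allowed lateness.
 */
public class LatenessCheck {
    static final long DAY_MS = 24L * 60 * 60 * 1000;
    static final long MINUTE_MS = 60L * 1000;

    /** True if the element's window is already past its cleanup time at this watermark. */
    static boolean isDropped(long elementTs, long watermark, long sizeMs, long allowedLatenessMs) {
        long start = elementTs - Math.floorMod(elementTs, sizeMs); // window aligned to the epoch (midnight UTC)
        long cleanup = start + sizeMs - 1 + allowedLatenessMs;    // last window timestamp + allowed lateness
        return cleanup <= watermark;
    }

    static long t(String iso) {
        return Instant.parse(iso).toEpochMilli();
    }

    public static void main(String[] args) {
        long watermark = t("2018-06-01T12:15:00Z");
        // 5 s behind the watermark, but its window is still open: kept.
        System.out.println(isDropped(t("2018-06-01T12:14:55Z"), watermark, DAY_MS, MINUTE_MS)); // prints false
        // Belongs to the previous day's window, discarded at 00:00:59.999: dropped.
        System.out.println(isDropped(t("2018-05-31T23:59:50Z"), watermark, DAY_MS, MINUTE_MS)); // prints true
    }
}
```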

To analyze the timestamps, I read all messages in sequence, separately from each Kafka partition, and calculated the timestamp difference between consecutive messages. In this case Flink had categorized exactly one message as late, and at the time I was using maxOutOfOrderness = 5 seconds. I found exactly one message in one Kafka partition where the timestamp difference between consecutive messages was 5 seconds (they were out of order by 5 s). This makes me wonder: did Flink drop the event as late because it violated maxOutOfOrderness? Have I misunderstood the concept of late data somehow? I expected late data to occur only in window operations, and I would expect the Kafka consumer to pass "late" messages onward even though the watermark doesn't advance.
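The per-partition analysis described above can be sketched as follows. One assumption differs slightly from comparing consecutive messages: each message is compared against the highest timestamp seen before it in the same partition, since that maximum is what a bounded-out-of-orderness watermark is derived from. The class name and sample timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch (hypothetical) of scanning one partition's timestamps in offset order and
 * reporting each message that lies behind the highest timestamp seen before it.
 */
public class OutOfOrderScan {
    static final class Jump {
        final int index;     // position of the message within the partition
        final long behindMs; // how far it lies behind the max timestamp seen so far

        Jump(int index, long behindMs) {
            this.index = index;
            this.behindMs = behindMs;
        }

        @Override
        public String toString() {
            return "index=" + index + ", behindMs=" + behindMs;
        }
    }

    static List<Jump> findOutOfOrder(long[] timestampsInOffsetOrder) {
        List<Jump> jumps = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < timestampsInOffsetOrder.length; i++) {
            long ts = timestampsInOffsetOrder[i];
            if (ts < maxSeen) {
                jumps.add(new Jump(i, maxSeen - ts));
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return jumps;
    }

    public static void main(String[] args) {
        long[] partition = {1_000, 2_000, 7_000, 2_000, 8_000};
        System.out.println(findOutOfOrder(partition)); // prints [index=3, behindMs=5000]
    }
}
```

With maxOutOfOrderness = 5 seconds, a message that is exactly 5000 ms behind, as in the sample, has a timestamp equal to the partition watermark derived from the preceding maximum.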

Thank you very much if you can find the time to look at this!