Watermark alignment during unit tests


Hi devs

While working on https://issues.apache.org/jira/browse/FLINK-10050 I
found that unit tests for unioned streams (which are used in
CoGroupedStream/JoinedStream under the hood) behave unstably.
Let's assume we have late elements in one of the streams. The thing is, we
have no guarantee which source will be read first, or in which order
watermark alignment will be applied. So the following example produces
different results on different invocations:

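      import org.apache.flink.streaming.api.functions.source.SourceFunction
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.streaming.api.watermark.Watermark
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
      import org.apache.flink.streaming.api.windowing.time.Time

      // env is assumed to be a StreamExecutionEnvironment set up for event time
      val s1 = env.addSource(new SourceFunction[(String, String)] {
        override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
          ctx.collectWithTimestamp(("a", "a1"), 1)
          //wmAllignmentLock.synchronized {
              //wmAllignmentLock.wait()
          //}
          ctx.emitWatermark(new Watermark(4))
          // timestamp 2 is behind this source's own watermark of 4
          ctx.collectWithTimestamp(("a", "a2"), 2)
        }

        override def cancel(): Unit = {}
      })

      val s2 = env.addSource(new SourceFunction[(String, String)] {
        override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
          ctx.collectWithTimestamp(("a", "b1"), 1)
          ctx.emitWatermark(new Watermark(4))
          //wmAllignmentLock.synchronized {
              //wmAllignmentLock.notifyAll()
          //}
        }

        override def cancel(): Unit = {}
      })

      // join on the first tuple field in 3 ms tumbling event-time windows
      val joined = s1.join(s2).where(_._1).equalTo(_._1)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(3)))
        .apply((x, y) => s"$x:$y")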

For some invocations (when Flink decides to process the 2nd source before
the 1st), ("a", "a2") is considered late and dropped; for the others it
is not. Here are the counts over 1000 invocations ("total" is cumulative,
"this iter" covers the last 50 runs):
Run JOIN periodic
iteration [50] contains late total = 22, this iter = 22
iteration [100] contains late total = 51, this iter = 29
iteration [150] contains late total = 78, this iter = 27
iteration [200] contains late total = 101, this iter = 23
iteration [250] contains late total = 124, this iter = 23
iteration [300] contains late total = 155, this iter = 31
iteration [350] contains late total = 184, this iter = 29
iteration [400] contains late total = 210, this iter = 26
iteration [450] contains late total = 233, this iter = 23
iteration [500] contains late total = 256, this iter = 23
iteration [550] contains late total = 274, this iter = 18
iteration [600] contains late total = 303, this iter = 29
iteration [650] contains late total = 338, this iter = 35
iteration [700] contains late total = 367, this iter = 29
iteration [750] contains late total = 393, this iter = 26
iteration [800] contains late total = 415, this iter = 22
iteration [850] contains late total = 439, this iter = 24
iteration [900] contains late total = 459, this iter = 20
iteration [950] contains late total = 484, this iter = 25
iteration [1000] contains late total = 502, this iter = 18
contains late = 502
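
A simplified model of the race, in plain Scala with no Flink dependency,
may make the two outcomes easier to see. It assumes that the joined
operator's event-time clock is the minimum of the watermarks received on
its two inputs, and that an element is late once the last timestamp of its
window is at or below that clock. WatermarkOrderDemo, Record, Wm and
lateValues are hypothetical names made up for this sketch, not Flink
classes, and the model ignores the join itself.

```scala
object WatermarkOrderDemo {
  sealed trait Event
  final case class Record(input: Int, value: String, timestamp: Long) extends Event
  final case class Wm(input: Int, timestamp: Long) extends Event

  // Same window size as in the snippet above (3 ms tumbling windows).
  val WindowSize = 3L

  /** Replays one interleaving and returns the values that would count as late. */
  def lateValues(events: Seq[Event]): Seq[String] = {
    // One watermark per input; the combined clock is their minimum (assumption).
    val inputWatermarks = Array(Long.MinValue, Long.MinValue)
    val late = Seq.newBuilder[String]
    events.foreach {
      case Wm(input, ts) =>
        inputWatermarks(input) = math.max(inputWatermarks(input), ts)
      case Record(_, value, ts) =>
        val combined = inputWatermarks.min
        // Last timestamp that still belongs to the tumbling window containing ts.
        val windowMaxTimestamp = ts - (ts % WindowSize) + WindowSize - 1
        if (windowMaxTimestamp <= combined) late += value
    }
    late.result()
  }

  // The two sources from the snippet above, as event sequences.
  val s1: Seq[Event] = Seq(Record(0, "a1", 1), Wm(0, 4), Record(0, "a2", 2))
  val s2: Seq[Event] = Seq(Record(1, "b1", 1), Wm(1, 4))

  def main(args: Array[String]): Unit = {
    println(s"s1 read first: late = ${lateValues(s1 ++ s2)}")
    println(s"s2 read first: late = ${lateValues(s2 ++ s1)}")
  }
}
```

In this model nothing is late when s1 is fully read first, because the
combined watermark has not moved yet when "a2" arrives. When s2 is read
first, both inputs have reported watermark 4 by the time "a2" arrives, so
it falls into an already closed window.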


It doesn't matter whether a periodic or a punctuated watermark assigner
is used. The synchronization mechanism (commented out in the code snippet
above) doesn't help to force the records into a particular order either.

While this behaviour is totally fine in production, I wonder how to
write a stable unit test scenario that covers late-element
processing.
I didn't find any suitable test harness among the test utils.

Any feedback is appreciated!

Regards,
Eugen