Re: Multiple stream operator watermark handling


Great to hear that this worked out for you :)

Progression of watermarks on an empty stream is a known issue that we are working to resolve in the future. The usual recommended workaround is to send a custom blank event (which downstream logic should ignore) once in a while, so that the otherwise idle stream keeps producing watermarks.
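One way to picture the blank-event workaround is the plain-Scala model below. It is only a sketch and not Flink code: the `Heartbeat` type, the `watermarkAfter` helper and the "watermark equals highest timestamp seen" rule are all assumptions made for illustration.

```scala
// Simplified model of the blank-event workaround; not Flink code.
// All names here are hypothetical and chosen for illustration only.
object HeartbeatSketch {
  sealed trait Event { def timestamp: Long }

  // A real record carrying business data.
  final case class Data(timestamp: Long, payload: String) extends Event

  // A blank event: it carries only a timestamp and no business data.
  final case class Heartbeat(timestamp: Long) extends Event

  // Assumed watermark rule: the highest timestamp seen so far.
  // Heartbeats count too, so the value keeps advancing on an idle stream.
  def watermarkAfter(events: Seq[Event]): Long =
    events.foldLeft(Long.MinValue)((wm, e) => math.max(wm, e.timestamp))

  // Business logic sees only real records; heartbeats are dropped.
  def payloads(events: Seq[Event]): Seq[String] =
    events.collect { case Data(_, p) => p }
}
```

In a real job the heartbeat would be emitted by the source or producer, and the filtering would happen in the first operator that consumes the stream.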

I have expanded the documentation:
Please check it, and if you have any further suggestions, you are welcome to comment on the PR. I hope it clarifies the behaviour.

Piotrek

On 25 May 2018, at 00:03, Elias Levy <fearsome.lucidity@xxxxxxxxx> wrote:

On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucidity@xxxxxxxxx> wrote:
On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:
Off the top of my head, I can imagine two solutions:

1. Override the default behaviour of the operator via, for example, org.apache.flink.streaming.api.datastream.ConnectedStreams#transform

That seems the safer, though more complicated, path.

As we had already implemented the business logic in a RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:

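import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
import org.apache.flink.streaming.api.watermark.Watermark

// Two-input operator that derives its watermark from the first input only
class SingleWatermarkCoFlatMap[IN1, IN2, OUT](flatMapper: CoFlatMapFunction[IN1, IN2, OUT])
    extends CoStreamFlatMap[IN1, IN2, OUT](flatMapper) {

  // Pass through the watermarks from the first stream
  override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)

  // Ignore watermarks from the second stream
  override def processWatermark2(mark: Watermark): Unit = {}
}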


Then it was easy to replace:

stream1
  .connect(stream2)
  .flatMap(new BusinessCoFlatMapFunction(params))
  .name("Operator")
  .uid("op")

with:

stream1
  .connect(stream2)
  .transform("Operator", new SingleWatermarkCoFlatMap[X, Y, Z](new BusinessCoFlatMapFunction(params)))
  .uid("op")
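
To see why ignoring the second input's watermarks helps, here is a simplified plain-Scala model of the two behaviours. It is a sketch, not Flink code: the class and method names are made up for illustration. It assumes that the default two-input operator forwards the minimum of the latest watermarks seen on both inputs, and forwards it only when that minimum advances.

```scala
// Simplified model of two-input watermark handling; not Flink code.
// Class and method names are hypothetical.
object WatermarkModel {

  // Default behaviour (as assumed above): emit min(input1, input2),
  // and only when that minimum moves forward.
  final class MinOfBoth {
    private var in1 = Long.MinValue
    private var in2 = Long.MinValue
    private var combined = Long.MinValue

    def onWatermark1(ts: Long): Option[Long] = { in1 = ts; advance() }
    def onWatermark2(ts: Long): Option[Long] = { in2 = ts; advance() }

    private def advance(): Option[Long] = {
      val newMin = math.min(in1, in2)
      if (newMin > combined) { combined = newMin; Some(newMin) } else None
    }
  }

  // Behaviour of the operator in this thread: forward every watermark
  // from input 1 unchanged and drop every watermark from input 2.
  final class FirstInputOnly {
    def onWatermark1(ts: Long): Option[Long] = Some(ts)
    def onWatermark2(ts: Long): Option[Long] = None
  }
}
```

In the first model an idle second input holds the combined watermark at its initial value no matter how far the first input advances; in the second model the output watermark follows the first input alone.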