osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Discuss] Outer join support and timestamp assignment for IntervalJoin


As a developer, while not quite a succinct, I feel that option A in both
cases is easier to read.

On Mon, Aug 13, 2018 at 4:18 AM Florian Schmidt <florian@xxxxxxxxxxxxxxxxx>
wrote:

> Hello Community,
>
> I’ve recently been working on adding support for outer joins [1] and
> timestamp assignment [2] to the IntervalJoin in the DataStream API.
> As this is a public API and it should be simple and understandable for
> users I wanted to gather some feedback on some variations that I drafted up:
>
> 1. Add outer joins
>
> Approach A
>
> keyedStreamA.intervalJoin(keyedStreamB)
>                 .leftOuter() // .rightOuter, .fullOuter()
>                 .between(<Time>, <Time>)
>                 .process(new ProcessJoinFunction() { /* … */ }
>
> Approach B
>
> keyedStreamA.intervalLeftJoin(keyedStreamB) // intervalRightJoin,
> intervalFullOuterJoin
>                 .between(<Time>, <Time>)
>                 .process(new ProcessJoinFunction() { /* … */ }
>
> Approach C
>
> keyedStreamA.intervalJoin(keyedStreamB)
>                 .joinType(JoinType.INNER) // Reuse existing (internally
> used) JoinType
>
>
> Personally I feel like C is the cleanest approach, but it has the problem
> that checking for invalid timestamp strategy & join combinations can only
> be done during runtime, whereas A and B would allow us to express valid
> combinations through the type system.
>
> 2. Assign timestamps to the joined pairs
>
> When two elements are joined together, this will add support for
> specifying which of the elements timestamps should be assigned as the
> results timestamp.
> The for options are MIN, MAX, LEFT and RIGHT, where MIN selects the
> minimum of the two elements timestamps, MAX the maximum, LEFT the left
> elements timestamp and RIGHT the right elements timestamp.
>
> Approach A
>
> keyedStreamA.intervalJoin(streamB)
>                 .between(<Time>, <Time>)
>                 .assignLeftTimestamp() // assignRightTimestamp(),
> assignMinTimestamp(), assignMaxTimestamp()
>                 .process(new ProcessJoinFunction() { /* … */ }
>
> Approach B
>
> keyedStreamA.intervalJoin(keyedStreamB)
>                 .between(<Time>, <Time>)
>                 .assignTimestamp(TimestampStrategy.LEFT) // .RIGHT, .MIN,
> .MAX
>
> Again I feel like B is the cleanest approach, but has the same caveat with
> runtime vs. type system checks as the approach above. This could be
> especially interesting when it comes to combinations of join types and
> timestamp assignments, where we will have a few combinations that are not
> possibly.
>
> Any feedback would be greatly appreciated. I also updated the design doc
> at [3] if anyone wants to hop in on further discussions!
>
> Florian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8483 <
> https://issues.apache.org/jira/browse/FLINK-8483>
> [2] https://issues.apache.org/jira/browse/FLINK-8482 <
> https://issues.apache.org/jira/browse/FLINK-8482>
> [3]
> https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c
> <
> https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c
> >
>
>