[Discuss] Outer join support and timestamp assignment for IntervalJoin


Hello Community,

I’ve recently been working on adding support for outer joins [1] and timestamp assignment [2] to the IntervalJoin in the DataStream API.
As this is a public API that should be simple and understandable for users, I wanted to gather feedback on a few variations that I drafted:

1. Add outer joins

Approach A

keyedStreamA.intervalJoin(keyedStreamB)
		.leftOuter() // or .rightOuter(), .fullOuter()
		.between(<Time>, <Time>)
		.process(new ProcessJoinFunction() { /* … */ });

Approach B

keyedStreamA.intervalLeftJoin(keyedStreamB) // or intervalRightJoin, intervalFullOuterJoin
		.between(<Time>, <Time>)
		.process(new ProcessJoinFunction() { /* … */ });

Approach C

keyedStreamA.intervalJoin(keyedStreamB)
		.joinType(JoinType.INNER) // Reuse existing (internally used) JoinType


Personally, I feel that C is the cleanest approach, but it has the drawback that invalid combinations of timestamp strategy and join type can only be detected at runtime, whereas A and B would allow us to express the valid combinations through the type system.
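
To make the intended semantics of an outer interval join concrete, here is a rough plain-Java sketch that works on bare timestamps instead of streams. It uses the interval join condition left + lowerBound <= right <= left + upperBound and, as an assumption about the proposed behaviour, emits an unmatched left element paired with null. The class and method names are hypothetical and not part of the DataStream API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class IntervalLeftOuterJoinSketch {

    /**
     * Joins bare timestamps. Each result is a {left, right} pair;
     * right is null when no right element fell into the interval.
     */
    static List<Long[]> leftOuterIntervalJoin(
            List<Long> left, List<Long> right, long lowerBound, long upperBound) {
        List<Long[]> out = new ArrayList<>();
        for (long l : left) {
            boolean matched = false;
            for (long r : right) {
                // Interval join condition: l + lowerBound <= r <= l + upperBound
                if (r >= l + lowerBound && r <= l + upperBound) {
                    out.add(new Long[] {l, r});
                    matched = true;
                }
            }
            if (!matched) {
                // ASSUMPTION: left outer semantics pad the missing side with null.
                out.add(new Long[] {l, null});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long[]> result = leftOuterIntervalJoin(
                Arrays.asList(10L, 50L), Arrays.asList(8L, 12L, 30L), -2, 2);
        for (Long[] pair : result) {
            System.out.println(pair[0] + " -> " + pair[1]);
        }
    }
}
```

In a real streaming operator the unmatched element could presumably only be emitted once the watermark has passed the end of its interval; the batch-style loop above ignores that aspect.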

2. Assign timestamps to the joined pairs

When two elements are joined together, this will add support for specifying which of the elements' timestamps should be assigned as the result's timestamp.
The four options are MIN, MAX, LEFT and RIGHT, where MIN selects the minimum of the two elements' timestamps, MAX the maximum, LEFT the left element's timestamp and RIGHT the right element's timestamp.
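
As a small illustration of the four options, the following sketch resolves a result timestamp from a left and a right timestamp. The enum and its resolve method are hypothetical helpers written for this example, not existing Flink classes.

```java
class TimestampStrategySketch {

    // Hypothetical enum mirroring the four options named above.
    enum TimestampStrategy {
        MIN, MAX, LEFT, RIGHT;

        /** Picks the timestamp that the joined result would carry. */
        long resolve(long leftTimestamp, long rightTimestamp) {
            switch (this) {
                case MIN:
                    return Math.min(leftTimestamp, rightTimestamp);
                case MAX:
                    return Math.max(leftTimestamp, rightTimestamp);
                case LEFT:
                    return leftTimestamp;
                case RIGHT:
                    return rightTimestamp;
                default:
                    throw new AssertionError("unknown strategy: " + this);
            }
        }
    }

    public static void main(String[] args) {
        long left = 1000L;
        long right = 1500L;
        for (TimestampStrategy s : TimestampStrategy.values()) {
            System.out.println(s + " -> " + s.resolve(left, right));
        }
    }
}
```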

Approach A

keyedStreamA.intervalJoin(keyedStreamB)
		.between(<Time>, <Time>)
		.assignLeftTimestamp() // or assignRightTimestamp(), assignMinTimestamp(), assignMaxTimestamp()
		.process(new ProcessJoinFunction() { /* … */ });

Approach B

keyedStreamA.intervalJoin(keyedStreamB)
		.between(<Time>, <Time>)
		.assignTimestamp(TimestampStrategy.LEFT) // .RIGHT, .MIN, .MAX

Again, I feel that B is the cleanest approach, but it has the same caveat regarding runtime vs. type-system checks as the approach above. This could be especially relevant for combinations of join types and timestamp assignments, since a few of those combinations are not possible.
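
The difference between the two kinds of checks can be sketched in a few lines of plain Java. Everything here is hypothetical: the enums, the validate method and the builder classes are made up for illustration, and the rule that a left outer join only permits the LEFT timestamp (because the right element may be missing) is an assumption chosen to have one invalid combination to demonstrate.

```java
class JoinCheckSketch {

    // Hypothetical local enums, not the Flink-internal JoinType.
    enum JoinType { INNER, LEFT_OUTER }
    enum TimestampStrategy { MIN, MAX, LEFT, RIGHT }

    // Enum-based style: any combination compiles, so it must be checked at runtime.
    static void validate(JoinType joinType, TimestampStrategy strategy) {
        // ASSUMPTION: with a left outer join the right element may be absent,
        // so only the LEFT timestamp is treated as well-defined here.
        if (joinType == JoinType.LEFT_OUTER && strategy != TimestampStrategy.LEFT) {
            throw new IllegalArgumentException(
                    "Unsupported combination: " + joinType + " with " + strategy);
        }
    }

    // Method-based style: each builder type only offers the valid methods.
    static final class InnerJoined {
        String assignLeftTimestamp() { return "INNER/LEFT"; }
        String assignRightTimestamp() { return "INNER/RIGHT"; }
        String assignMinTimestamp() { return "INNER/MIN"; }
        String assignMaxTimestamp() { return "INNER/MAX"; }
    }

    static final class LeftOuterJoined {
        String assignLeftTimestamp() { return "LEFT_OUTER/LEFT"; }
        // No assignRightTimestamp() here: calling it would be a compile error.
    }

    static InnerJoined intervalJoin() { return new InnerJoined(); }
    static LeftOuterJoined intervalLeftJoin() { return new LeftOuterJoined(); }

    public static void main(String[] args) {
        // The type-checked style can only express valid combinations.
        System.out.println(intervalJoin().assignMaxTimestamp());
        System.out.println(intervalLeftJoin().assignLeftTimestamp());

        // The enum style accepts the call and fails when the job is built.
        try {
            validate(JoinType.LEFT_OUTER, TimestampStrategy.RIGHT);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected at runtime: " + e.getMessage());
        }
    }
}
```

The enum style keeps the API surface small, while the method style grows with every join type but turns mistakes into compile errors.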

Any feedback would be greatly appreciated. I also updated the design doc at [3] if anyone wants to hop in on further discussions!

Florian

[1] https://issues.apache.org/jira/browse/FLINK-8483
[2] https://issues.apache.org/jira/browse/FLINK-8482
[3] https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c