Re: [Discuss] Outer join support and timestamp assignment for IntervalJoin


Thanks for starting the discussion Florian.

I'm also in favor of both A options.
Option A for the outer joins is also closest to the join syntax of the
DataSet API.

Thanks,
Fabian



2018-08-13 20:50 GMT+02:00 Elias Levy <fearsome.lucidity@xxxxxxxxx>:

> As a developer, while not quite as succinct, I feel that option A in both
> cases is easier to read.
>
> On Mon, Aug 13, 2018 at 4:18 AM Florian Schmidt <florian@xxxxxxxxxxxxxxxxx> wrote:
>
> > Hello Community,
> >
> > I’ve recently been working on adding support for outer joins [1] and
> > timestamp assignment [2] to the IntervalJoin in the DataStream API.
> > As this is a public API and it should be simple and understandable for
> > users I wanted to gather some feedback on some variations that I drafted
> > up:
> >
> > 1. Add outer joins
> >
> > Approach A
> >
> > keyedStreamA.intervalJoin(keyedStreamB)
> >                 .leftOuter() // .rightOuter(), .fullOuter()
> >                 .between(<Time>, <Time>)
> >                 .process(new ProcessJoinFunction() { /* … */ });
> >
> > Approach B
> >
> > keyedStreamA.intervalLeftJoin(keyedStreamB) // intervalRightJoin, intervalFullOuterJoin
> >                 .between(<Time>, <Time>)
> >                 .process(new ProcessJoinFunction() { /* … */ });
> >
> > Approach C
> >
> > keyedStreamA.intervalJoin(keyedStreamB)
> >                 .joinType(JoinType.INNER) // Reuse existing (internally used) JoinType
> >
> >
> > Personally I feel like C is the cleanest approach, but it has the problem
> > that checking for invalid combinations of timestamp strategy and join type
> > can only be done at runtime, whereas A and B would allow us to express
> > valid combinations through the type system.
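
To make the runtime vs. type system trade-off above concrete, here is a minimal sketch in plain Java. Nothing in it is Flink API: `StagedJoinSketch`, its stage classes, and both enums are hypothetical stand-ins. The rule that a left outer join only permits the left timestamp is also an assumption chosen for illustration, since the proposal does not say which combinations are invalid.

```java
// Hypothetical sketch only: none of these types exist in Flink.
final class StagedJoinSketch {

    /** Stage after inner(): both sides always exist, so every strategy is offered. */
    static final class InnerStage {
        String assignLeftTimestamp()  { return "INNER/LEFT"; }
        String assignRightTimestamp() { return "INNER/RIGHT"; }
        String assignMinTimestamp()   { return "INNER/MIN"; }
        String assignMaxTimestamp()   { return "INNER/MAX"; }
    }

    /**
     * Stage after leftOuter(). ASSUMPTION for illustration: the right element
     * may be missing, so only the left timestamp is offered. Calling any other
     * assign method here is a compile error instead of a runtime failure.
     */
    static final class LeftOuterStage {
        String assignLeftTimestamp() { return "LEFT_OUTER/LEFT"; }
    }

    /** Entry point in the style of approach A: the join type picks the next stage. */
    static final class Start {
        InnerStage inner()         { return new InnerStage(); }
        LeftOuterStage leftOuter() { return new LeftOuterStage(); }
    }

    // Style of approach C: plain enums, so validity can only be checked at runtime.
    enum JoinType { INNER, LEFT_OUTER }
    enum TimestampStrategy { LEFT, RIGHT, MIN, MAX }

    static String configure(JoinType joinType, TimestampStrategy strategy) {
        // Same assumed rule as LeftOuterStage, but enforced when the call executes.
        if (joinType == JoinType.LEFT_OUTER && strategy != TimestampStrategy.LEFT) {
            throw new IllegalArgumentException(
                "Unsupported combination: " + joinType + " with " + strategy);
        }
        return joinType + "/" + strategy;
    }
}
```

With the staged variant, `new StagedJoinSketch.Start().leftOuter().assignRightTimestamp()` does not compile, while the enum variant accepts the same combination at compile time and only rejects it when `configure` runs.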
> >
> > 2. Assign timestamps to the joined pairs
> >
> > When two elements are joined together, this will add support for
> > specifying which of the elements' timestamps should be assigned as the
> > result's timestamp.
> > The four options are MIN, MAX, LEFT and RIGHT, where MIN selects the
> > minimum of the two elements' timestamps, MAX the maximum, LEFT the left
> > element's timestamp and RIGHT the right element's timestamp.
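
The four options described above can be sketched as a small enum that resolves the result timestamp from the two input timestamps. This is a minimal illustration in plain Java, not the proposed Flink implementation: the name `TimestampStrategySketch` and the `resolve` method are hypothetical, and timestamps are assumed to be `long` values.

```java
import java.util.function.LongBinaryOperator;

// Hypothetical sketch: models the MIN, MAX, LEFT and RIGHT options on long timestamps.
enum TimestampStrategySketch {
    MIN(Math::min),                 // smaller of the two timestamps
    MAX(Math::max),                 // larger of the two timestamps
    LEFT((left, right) -> left),    // always the left element's timestamp
    RIGHT((left, right) -> right);  // always the right element's timestamp

    private final LongBinaryOperator selector;

    TimestampStrategySketch(LongBinaryOperator selector) {
        this.selector = selector;
    }

    /** Returns the timestamp to assign to the joined pair. */
    long resolve(long leftTimestamp, long rightTimestamp) {
        return selector.applyAsLong(leftTimestamp, rightTimestamp);
    }
}
```

For a left element at timestamp 10 and a right element at timestamp 25, `MIN` and `LEFT` both resolve to 10, while `MAX` and `RIGHT` both resolve to 25.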
> >
> > Approach A
> >
> > keyedStreamA.intervalJoin(keyedStreamB)
> >                 .between(<Time>, <Time>)
> >                 .assignLeftTimestamp() // assignRightTimestamp(), assignMinTimestamp(), assignMaxTimestamp()
> >                 .process(new ProcessJoinFunction() { /* … */ });
> >
> > Approach B
> >
> > keyedStreamA.intervalJoin(keyedStreamB)
> >                 .between(<Time>, <Time>)
> >                 .assignTimestamp(TimestampStrategy.LEFT) // .RIGHT, .MIN, .MAX
> >
> > Again I feel like B is the cleanest approach, but it has the same caveat
> > with runtime vs. type system checks as the approach above. This could be
> > especially interesting when it comes to combinations of join types and
> > timestamp assignments, where we will have a few combinations that are not
> > possible.
> >
> > Any feedback would be greatly appreciated. I also updated the design doc
> > at [3] if anyone wants to hop in on further discussions!
> >
> > Florian
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-8483
> > [2] https://issues.apache.org/jira/browse/FLINK-8482
> > [3] https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c
> >
> >
>