Re: [DISCUSS] Table API Enhancement Outline


Hi Fabian/Xiaowei,

I am very sorry for my late reply! Glad to see your replies; they sound
pretty good!
I agree with Fabian that the append() approach is better, because it
clearly defines the result schema.
In addition, append() can also forward non-time attributes, e.g.:

    tab('name, 'age, 'address, 'rowtime)
    tab.map(append(udf('name), 'address, 'rowtime))
    .as('col1, 'col2, 'address, 'rowtime)
    .window(Tumble over 5.millis on 'rowtime as 'w)
    .groupBy('w, 'address)

In this way append() is very useful, and its behavior is very similar
to withForwardedFields() in the DataSet API.
So +1 to using the append() approach for map() and flatMap()!
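
To make the schema rule concrete, here is a minimal sketch in plain Scala of
how the result schema of map(append(...)) could be derived. It is only an
illustration: AppendSchemaSketch and appendSchema are hypothetical names, not
Flink API, and the rule that the UDF output columns come first and the
forwarded fields follow in the order given is an assumption read off the
example above.

```scala
// Hypothetical model of the proposed append() semantics; not Flink API.
object AppendSchemaSketch {

  // Assumed rule: UDF output columns first, then the forwarded fields
  // in the order they were passed to append().
  def appendSchema(udfOutputs: Seq[String], forwarded: Seq[String]): Seq[String] = {
    val result = udfOutputs ++ forwarded
    // Reject ambiguous schemas with repeated field names.
    require(result.distinct.size == result.size,
      s"duplicate field names in ${result.mkString(", ")}")
    result
  }

  def main(args: Array[String]): Unit = {
    // udf('name) is assumed to produce two columns, named col1 and col2.
    val schema = appendSchema(Seq("col1", "col2"), Seq("address", "rowtime"))
    println(schema.mkString(", ")) // prints: col1, col2, address, rowtime
  }
}
```

Because 'rowtime is forwarded explicitly, it is still part of the schema when
the following window() refers to it.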

But how about agg() and flatAgg()? For the agg/flatAgg case I agree with
Xiaowei's approach of defining the keys to be implied in the result table
and to appear at the beginning, for example:
  tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
    .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
    .select('k1, 'col1, 'w.rowtime as 'rtime)
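
The keys-first rule for agg()/flatAgg() can be sketched in plain Scala as
follows. This is only an illustration under assumptions: AggSchemaSketch,
aggSchema and select are hypothetical helpers rather than Flink API, and the
window alias is treated as just another group key, in the order passed to
groupBy().

```scala
// Hypothetical model of the implied-keys rule; not Flink API.
object AggSchemaSketch {

  // Assumed rule: group keys (including the window alias) come first,
  // in groupBy order, followed by the aggregate's output columns.
  def aggSchema(groupKeys: Seq[String], aggOutputs: Seq[String]): Seq[String] =
    groupKeys ++ aggOutputs

  // A later select() may only reference fields that exist in that schema.
  def select(schema: Seq[String], fields: Seq[String]): Seq[String] = {
    val missing = fields.filterNot(f => schema.contains(f))
    require(missing.isEmpty, s"unknown fields: ${missing.mkString(", ")}")
    fields
  }

  def main(args: Array[String]): Unit = {
    val schema = aggSchema(Seq("w", "k1", "k2"), Seq("col1", "col2"))
    println(schema.mkString(", "))                              // prints: w, k1, k2, col1, col2
    println(select(schema, Seq("k1", "col1", "w")).mkString(", ")) // prints: k1, col1, w
  }
}
```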

What do you think? @Fabian @Xiaowei

Thanks,
Jincheng

On Fri, Nov 9, 2018 at 6:35 PM Fabian Hueske <fhueske@xxxxxxxxx> wrote:

> Hi Jincheng,
>
> Thanks for the summary!
> I like the approach with append() better than the implicit forwarding as it
> clearly indicates which fields are forwarded.
> However, I don't see much benefit over the flatMap(Expression*) variant, as
> we would still need to analyze the full expression tree to ensure that at
> most (or exactly?) one Scalar / TableFunction is used.
>
> Best,
> Fabian
>
> On Thu, Nov 8, 2018 at 7:25 PM jincheng sun <sunjincheng121@xxxxxxxxx> wrote:
>
> > Hi all,
> >
> > We are discussing this proposal in great detail and trying to design the
> > API with many aspects in mind (functionality, compatibility, ease of
> > use, etc.). I think this is a very good process. Only with such a
> > detailed discussion can we develop the PR clearly and smoothly in the
> > later stage. I am very grateful to @Fabian and @Xiaowei for sharing a lot
> > of good ideas.
> > About the definition of the method signatures, I want to share my points
> > here, which I am discussing with Fabian in the Google doc (not yet
> > completed), as follows:
> >
> > Assume we have a table:
> > val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string,
> > 'proctime.proctime)
> >
> > Approach 1:
> > case1: Map follows Source Table
> > val result =
> >   // proctime is implied in the output
> >   tab.map(udf('string)).as('proctime, 'col1, 'col2)
> >   .window(Tumble over 5.millis on 'proctime as 'w)
> >
> > case2: FlatAgg follows Window (Fabian mentioned above)
> > val result =
> >     tab.window(Tumble ... as 'w)
> >        .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >        .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> >        .select('k1, 'col1, 'w.rowtime as 'rtime)
> >
> > Approach 2: Similar to Fabian's approach, in which the result schema is
> > clearly defined, but with a built-in append UDF added. That makes the
> > map/flatMap/agg/flatAgg interfaces accept only one Expression.
> > val result =
> >     tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2,
> > 'long, 'proctime)
> >      .window(Tumble over 5.millis on 'proctime as 'w)
> >
> > Note: append is a special built-in UDF that can pass through any
> > column.
> >
> > So maybe we can define it as table.map(Expression) first and, if
> > necessary, extend it to table.map(Expression*) in the future? Of
> > course, I also hope that we can further refine this proposal through
> > discussion.
> >
> > Thanks,
> > Jincheng
> >
> >
> >
> >
> >
> > On Wed, Nov 7, 2018 at 11:45 PM Xiaowei Jiang <xiaoweij@xxxxxxxxx> wrote:
> >
> > > Hi Fabian,
> > >
> > > I think that the key question you raised is whether we allow extra
> > > parameters in the methods map/flatMap/agg/flatAgg. I can see why
> > > allowing that may appear more convenient in some cases. However, it
> > > might also cause some confusion if we do that. For example, do we allow
> > > multiple UDFs in these expressions? If we do, the semantics may be weird
> > > to define, e.g. what does
> > > table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even
> > > though not allowing it may appear less powerful, it can make things more
> > > intuitive too. In the case of agg/flatAgg, we can define the keys to be
> > > implied in the result table and to appear at the beginning. You can use
> > > a select method if you want to modify this behavior. I think that
> > > eventually we will have some API which allows other expressions as
> > > additional parameters, but I think it's better to do that after we
> > > introduce the concept of nested tables. A lot of things we suggested
> > > here can be considered as special cases of that. But things are much
> > > simpler if we leave that to later.
> > >
> > > Regards,
> > > Xiaowei
> > >
> > > On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhueske@xxxxxxxxx> wrote:
> > >
> > > > Hi,
> > > >
> > > > * Re emit:
> > > > I think we should start with the well-understood semantics of full
> > > > replacement. This is how the other agg functions work.
> > > > As was said before, there are open questions regarding an append mode
> > > > (checkpointing, whether to support retractions and, if so, how to
> > > > declare them, ...).
> > > > Since this seems to be an optimization, I'd postpone it.
> > > >
> > > > * Re grouping keys:
> > > > I don't think we should automatically add them because the result
> > > > schema would not be intuitive.
> > > > Would they be added at the beginning of the tuple or at the end? What
> > > > metadata fields of windows would be added? In which order would they
> > > > be added?
> > > >
> > > > However, we could support syntax like this:
> > > > val t: Table = ???
> > > > t
> > > >   .window(Tumble ... as 'w)
> > > >   .groupBy('a, 'b)
> > > >   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend,
> > > >     'w.rowtime as 'rtime)
> > > >
> > > > The result schema would be clearly defined as [b, a, f1, f2, ..., fn,
> > > > wend, rtime]. (f1, f2, ..., fn) are the result attributes of the UDF.
> > > >
> > > > * Re Multi-staged evaluation:
> > > > I think this should be an optimization that can be applied if the UDF
> > > > implements the merge() method.
> > > >
> > > > Best, Fabian
> > > >
> > > > On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaoxuan@xxxxxxxxx>
> > > > wrote:
> > > >
> > > > > Hi Xiaowei,
> > > > >
> > > > > Yes, I agree with you that the semantics of TableAggregateFunction
> > > > > emit are much more complex than those of AggregateFunction. The
> > > > > fundamental difference is that TableAggregateFunction emits a
> > > > > "table" while AggregateFunction outputs (a column of) a "row".
> > > > > AggregateFunction has only one mode, which is "replacing" (complete
> > > > > update). But for TableAggregateFunction, it could be an incremental
> > > > > update (only emit the newly updated results) or a complete update
> > > > > (always emit the entire table when "emit" is triggered). From the
> > > > > performance perspective, we might want to use incremental update.
> > > > > But we need to review and design this carefully, especially taking
> > > > > into account the cases of failover (instead of just backing up the
> > > > > ACC, it may also need to remember the emit offset) and retractions,
> > > > > as the semantics of TableAggregateFunction emit are different from
> > > > > those of other UDFs. TableFunction also emits a table, but it does
> > > > > not need to worry about this because it is stateless.
> > > > >
> > > > > Regards,
> > > > > Shaoxuan
> > > > >
> > > > >
> > > > > On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaoweij@xxxxxxxxx>
> > > > > wrote:
> > > > >
> > > > > > Hi Jincheng,
> > > > > >
> > > > > > Thanks for adding the public interfaces! I think that it's a very
> > > > > > good start. There are a few points that we need to discuss further.
> > > > > >
> > > > > >    - TableAggregateFunction - this is a very complex beast,
> > > > > >    definitely the most complex user-defined object we have
> > > > > >    introduced so far. I think there are quite a few interesting
> > > > > >    questions here. For example, do we allow multi-staged
> > > > > >    TableAggregate in this case? What are the semantics of emit? Is
> > > > > >    it an amendment to the previous output, or does it replace it? I
> > > > > >    think that this subject itself is worth a discussion to make
> > > > > >    sure we get the details right.
> > > > > >    - GroupedTable.agg - do the group keys automatically appear in
> > > > > >    the output? How about the case of windowing aggregation?
> > > > > >
> > > > > > Regards,
> > > > > > Xiaowei
> > > > > >
> > > > > > On Tue, Nov 6, 2018 at 6:25 PM jincheng sun
> > > > > > <sunjincheng121@xxxxxxxxx> wrote:
> > > > > >
> > > > > > > Hi, Xiaowei,
> > > > > > >
> > > > > > > Thanks for bringing up the discussion of the Table API
> > > > > > > Enhancement Outline!
> > > > > > >
> > > > > > > I quickly looked at the overall content; it is a good summary of
> > > > > > > our offline discussions. But from my point of view, we should add
> > > > > > > the usage of the public interfaces that we will introduce in this
> > > > > > > proposal. So, I added the following usage description of the
> > > > > > > interfaces and operators to the Google doc:
> > > > > > >
> > > > > > > 1. Map Operator
> > > > > > >     Map is a new operator of Table. It can apply a scalar
> > > > > > > function and can return multiple columns. The usage is as follows:
> > > > > > >
> > > > > > >   val res = tab
> > > > > > >      .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > > > > >      .select('a, 'c)
> > > > > > >
> > > > > > > 2. FlatMap Operator
> > > > > > >     FlatMap is a new operator of Table. It can apply a table
> > > > > > > function and can return multiple rows. The usage is as follows:
> > > > > > >
> > > > > > >   val res = tab
> > > > > > >       .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > > > > >       .select('a, 'c)
> > > > > > >
> > > > > > > 3. Agg Operator
> > > > > > >     Agg is a new operator of Table/GroupedTable. It can apply an
> > > > > > > aggregate function and can return multiple columns. The usage is
> > > > > > > as follows:
> > > > > > >
> > > > > > >    val res = tab
> > > > > > >       // leave out the groupBy clause to define global aggregates
> > > > > > >       .groupBy('a)
> > > > > > >       .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > > > > >       .select('a, 'c)
> > > > > > >
> > > > > > > 4. FlatAgg Operator
> > > > > > >     FlatAgg is a new operator of Table/GroupedTable. It can apply
> > > > > > > a table aggregate function and can return multiple rows. The
> > > > > > > usage is as follows:
> > > > > > >
> > > > > > >     val res = tab
> > > > > > >        // leave out the groupBy clause to define global table aggregates
> > > > > > >        .groupBy('a)
> > > > > > >        .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > > > > >        .select('a, 'c)
> > > > > > >
> > > > > > >   5. TableAggregateFunction
> > > > > > >      The behavior of table aggregates is most like that of
> > > > > > > GroupReduceFunction, which is computed for a group of elements and
> > > > > > > outputs a group of elements. The TableAggregateFunction can be
> > > > > > > applied on GroupedTable.flatAgg(). The interface of
> > > > > > > TableAggregateFunction has a lot of content, so I don't copy it
> > > > > > > here. Please look at the details in the Google doc:
> > > > > > >
> > > > > > > https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > > > > >
> > > > > > > I would very much appreciate anyone reviewing and commenting.
> > > > > > >
> > > > > > > Best,
> > > > > > > Jincheng
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>