
Re: [DISCUSS] Table API Enhancement Outline


+1. I agree that we should open the JIRAs to start the work. We may
have better ideas about the flavor of the interface once we implement and
review the code.

Regards,
shaoxuan


On 11/20/18, jincheng sun <sunjincheng121@xxxxxxxxx> wrote:
> Hi all,
>
> Thanks all for the feedback.
>
> @Piotr, about not using abbreviations in names: +1, I like your
> proposal! Currently both the DataSet and DataStream APIs use
> `aggregate`. BTW, I find that other languages, such as R, also avoid
> abbreviated names.
>
> Sometimes it is really difficult to perfect the interface of an API. We
> need to spend a lot of time thinking, gather feedback from a large number
> of users, and improve constantly. But because of backward compatibility,
> we have to adopt the most conservative approach when designing the API
> (of course, I am in favor of developing richer features once we have
> discussed them clearly). Therefore, I propose to split the implementation
> of map/flatMap/agg/flatAgg into JIRAs for the basic functionality and
> JIRAs that add support for time attributes and group keys. We can develop
> the features whose design we have already agreed on, and continue to
> discuss the designs that are still uncertain.
>
> In fact, in addition to the API design, there will be various
> performance optimization details. For example, the table aggregate
> function's emitValue generates multiple results; in extreme cases, each
> record triggers a large number of retract messages, which performs
> poorly. So we will also optimize the interface design, for example by
> adding the emitWithRetractValue interface (I have updated the google doc)
> so that the user can optionally perform incremental calculations and
> avoid a large number of retractions. Details like this are difficult to
> discuss fully on the mailing list, so I recommend creating the JIRAs/FLIP
> first. We develop the designs that have been agreed upon and continue to
> discuss the undecided ones! What do you think? @Fabian & Piotr & XiaoWei
>
> Best,
> Jincheng
>
> Xiaowei Jiang <xiaoweij@xxxxxxxxx> wrote on Mon, Nov 19, 2018 at 12:07 AM:
>
>> Hi Fabian & Piotr, thanks for the feedback!
>>
>> I appreciate your concerns, both on timestamp attributes and on
>> implicit group keys. At the same time, I'm also concerned about the
>> proposed approach of allowing Expression* as parameters, especially for
>> flatMap/flatAgg. So far, we have never allowed a scalar expression to
>> appear together with table expressions. With the Expression* approach,
>> this will happen for the parameters to flatMap/flatAgg. I'm a bit
>> concerned about whether we fully understand the consequences when we try
>> to extend our system in the future. I would be extra cautious in doing
>> this. To avoid this, I think an implicit group key for flatAgg is safer.
>> For flatMap, if users want to keep the rowtime column, they can use
>> crossApply/join instead. So we are not losing any real functionality
>> here.
>>
>> Also a clarification on the following example:
>> tab.window(Tumble ... as 'w)
>>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>> If we did not have the select clause in this example, we would have 'w
>> as a regular column in the output. It should not magically disappear.
>>
>> The concern is not as strong for Table.map/Table.agg because we are not
>> mixing scalar and table expressions. But we also want these methods to
>> be reasonably consistent. If we use implicit group keys for
>> Table.flatAgg, we probably should do the same for Table.agg. Now we only
>> have to choose what to do with Table.map. I can see good arguments from
>> both sides. But starting with a single Expression seems safer because we
>> can always extend to Expression* in the future.
>>
>> While thinking about this problem, it appears that we may need more work
>> on our handling of watermarks for SQL/Table API. Our current way of
>> propagating the watermarks from source all the way to sink might not be
>> optimal. For example, after a tumbling window, the watermark can actually
>> be advanced to just before the expiry of the next window. I think that in
>> general, each operator may need to generate new watermarks instead of
>> simply propagating them. Once we accept that watermarks may change during
>> the execution, it appears that the timestamp columns may also change, as
>> long as we have some way to associate a watermark with them. My intuition
>> is that once we have a thorough solution for the watermark issue, we may
>> be able to solve the problem we encountered for Table.map in a cleaner
>> way. But this is a complex issue which deserves a discussion of its own.
>>
>> Regards,
>> Xiaowei
>>
>>
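The watermark remark above can be made concrete with a small calculation. The following is only a sketch under stated assumptions, not Flink code: it assumes tumbling windows aligned at offset 0, that a window [start, end) fires once the watermark reaches end - 1, and that its results carry the timestamp end - 1. The object and method names are hypothetical.

```scala
object WatermarkSketch {
  // Assumption: a tumbling window [start, end) fires when the watermark
  // reaches end - 1, and every result it emits carries timestamp end - 1.
  // Windows are assumed to be aligned at offset 0.
  def advancedWatermark(inputWatermark: Long, windowSize: Long): Long = {
    require(windowSize > 0, "windowSize must be positive")
    // End of the earliest window that has not fired yet, i.e. the smallest
    // multiple of windowSize with end - 1 > inputWatermark.
    val nextEnd = (Math.floorDiv(inputWatermark + 1, windowSize) + 1) * windowSize
    // Every future result has timestamp >= nextEnd - 1, so the operator
    // could safely emit nextEnd - 2 as its output watermark.
    nextEnd - 2
  }
}
```

With a window size of 10 and an input watermark of 12, the window [0, 10) has already fired and the next result will carry timestamp 19, so under these assumptions the output watermark could be 18 instead of 12.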
>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
>> wrote:
>>
>> > Hi,
>> >
>> > Isn't the problem of multiple expressions limited only to `flat***`
>> > functions and, to be more specific, only to having two (or more)
>> > different table functions passed as expressions?
>> > `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))`
>> > seems to be well defined (duplicate the result of every scalar function
>> > to every record). Or am I missing something?
>> >
>> > Another remark: I would be in favour of not using abbreviations and
>> > naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>> >
>> > Piotrek
>> >
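The "duplicate the result of every scalar function to every record" reading can be sketched in plain Scala. This is a hypothetical model, not the Table API: it assumes each scalar expression yields exactly one value per group, and the names `FlatAggSketch` and `combine` are made up for illustration.

```scala
object FlatAggSketch {
  type Row = Vector[Any]

  // Hypothetical model: one table aggregate emits several rows for a group,
  // while each scalar expression is assumed to yield one value per group.
  // Every emitted row is extended with the same scalar values.
  def combine(tableAggRows: Seq[Row], scalarResults: Row): Seq[Row] =
    tableAggRows.map(row => row ++ scalarResults)
}
```

Two table functions in the same call have no such obvious reading, since it is unclear how their row sets should be paired, which is the case the question above singles out.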
>> > > On 15 Nov 2018, at 14:15, Fabian Hueske <fhueske@xxxxxxxxx> wrote:
>> > >
>> > > Hi Jincheng,
>> > >
>> > > I said before that I think the append() method is better than
>> > > implicitly forwarding keys, but still, I believe it adds unnecessary
>> > > boilerplate code.
>> > >
>> > > Moreover, I haven't seen a convincing argument why map(Expression*)
>> > > is worse than map(Expression). In either case we need to do all kinds
>> > > of checks to prevent invalid use of functions.
>> > > If the method is not used correctly, we can emit a good error message,
>> > > and documenting map(Expression*) will be easier than
>> > > map(append(Expression*)), in my opinion.
>> > > I think we should not add unnecessary syntax unless there is a good
>> > > reason, and to be honest, I haven't seen this reason yet.
>> > >
>> > > Regarding the groupBy.agg() method, I think it should behave just
>> > > like any other method, i.e., not do any implicit forwarding.
>> > > Let's take the example of the windowed group by that you posted
>> > > before.
>> > >
>> > > tab.window(Tumble ... as 'w)
>> > >    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> > >    .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>> > >    .select('k1, 'col1, 'w.rowtime as 'rtime)
>> > >
>> > > What happens if 'w.rowtime is not selected? What is the data type of
>> > > the field 'w in the resulting Table? Is it a regular field at all or
>> > > just a system field that disappears if it is not selected?
>> > >
>> > > IMO, the following syntax is shorter, more explicit, and better
>> > > aligned with the regular window.groupBy.select aggregations that are
>> > > supported today.
>> > >
>> > > tab.window(Tumble ... as 'w)
>> > >    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> > >    .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>> > >
>> > >
>> > > Best, Fabian
>> > >
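The two schema rules being compared can be written down as a minimal sketch. Nothing here is the actual Table API: the expression types, the object name, and the "exactly one aggregate call" check are assumptions made for illustration.

```scala
object AggSchemaSketch {
  // Implicit forwarding: group keys are prepended to the UDF's result columns.
  def implicitSchema(groupKeys: Seq[String], udfColumns: Seq[String]): Seq[String] =
    groupKeys ++ udfColumns

  // Hypothetical expression model for the explicit variant.
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class Alias(source: String, name: String) extends Expr
  final case class AggCall(resultColumns: Seq[String]) extends Expr

  // Explicit variant: the schema is the listed expressions in order, with the
  // aggregate call expanded to its result columns.
  def explicitSchema(exprs: Seq[Expr]): Seq[String] = {
    // Assumption: exactly one aggregate call is allowed per method call.
    require(exprs.count(_.isInstanceOf[AggCall]) == 1, "exactly one aggregate call expected")
    exprs.flatMap {
      case Field(name)    => Seq(name)
      case Alias(_, name) => Seq(name)
      case AggCall(cols)  => cols
    }
  }
}
```

For the windowed example, the implicit rule gives [w, k1, k2, col1, col2] and leaves open what kind of column w is, while the explicit rule applied to ('w.rowtime as 'rtime, 'k1, 'k2, agg('a)) gives [rtime, k1, k2, col1, col2].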
>> > > On Wed, Nov 14, 2018 at 08:37, jincheng sun
>> > > <sunjincheng121@xxxxxxxxx> wrote:
>> > >
>> > >> Hi Fabian/Xiaowei,
>> > >>
>> > >> I am very sorry for my late reply! Glad to see your replies; they
>> > >> sound pretty good!
>> > >> I agree with Fabian that the append() approach is better because it
>> > >> clearly defines the result schema.
>> > >> In addition, append() can also forward non-time attributes, e.g.:
>> > >>
>> > >>    // tab has the fields ('name, 'age, 'address, 'rowtime)
>> > >>    tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>> > >>    .window(Tumble over 5.millis on 'rowtime as 'w)
>> > >>    .groupBy('w, 'address)
>> > >>
>> > >> In this way append() is very useful, and the behavior is very
>> > >> similar to withForwardedFields() in DataSet.
>> > >> So +1 to using the append() approach for map() and flatMap()!
>> > >>
>> > >> But how about agg() and flatAgg()? For agg/flatAgg I agree with
>> > >> Xiaowei's approach of implying the keys in the result table and
>> > >> placing them at the beginning, for example:
>> > >>  tab.window(Tumble ... as 'w)
>> > >>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> > >>    .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>> > >>    .select('k1, 'col1, 'w.rowtime as 'rtime)
>> > >>
>> > >> What do you think? @Fabian @Xiaowei
>> > >>
>> > >> Thanks,
>> > >> Jincheng
>> > >>
>> > >> Fabian Hueske <fhueske@xxxxxxxxx> wrote on Fri, Nov 9, 2018 at 6:35 PM:
>> > >>
>> > >>> Hi Jincheng,
>> > >>>
>> > >>> Thanks for the summary!
>> > >>> I like the approach with append() better than the implicit
>> > >>> forwarding as it clearly indicates which fields are forwarded.
>> > >>> However, I don't see much benefit over the flatMap(Expression*)
>> > >>> variant, as we would still need to analyze the full expression tree
>> > >>> to ensure that at most (or exactly?) one Scalar / TableFunction is
>> > >>> used.
>> > >>>
>> > >>> Best,
>> > >>> Fabian
>> > >>>
>> > >>> On Thu, Nov 8, 2018 at 19:25, jincheng sun
>> > >>> <sunjincheng121@xxxxxxxxx> wrote:
>> > >>>
>> > >>>> Hi all,
>> > >>>>
>> > >>>> We are discussing this proposal in great detail. We are trying to
>> > >>>> design the API from many angles (functionality, compatibility,
>> > >>>> ease of use, etc.). I think this is a very good process. Only with
>> > >>>> such a detailed discussion can we develop the PRs clearly and
>> > >>>> smoothly at a later stage. I am very grateful to @Fabian and
>> > >>>> @Xiaowei for sharing a lot of good ideas.
>> > >>>> About the definition of the method signatures, I want to share my
>> > >>>> points here, which I am discussing with Fabian in the google doc
>> > >>>> (not yet completed), as follows:
>> > >>>>
>> > >>>> Assume we have a table:
>> > >>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string,
>> > >>>> 'proctime.proctime)
>> > >>>>
>> > >>>> Approach 1:
>> > >>>> case1: Map follows Source Table
>> > >>>> val result =
>> > >>>>  tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
>> > >>>>  .window(Tumble over 5.millis on 'proctime as 'w)
>> > >>>>
>> > >>>> case2: FlatAgg follows Window (Fabian mentioned above)
>> > >>>> val result =
>> > >>>>    tab.window(Tumble ... as 'w)
>> > >>>>       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> > >>>>       .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>> > >>>>       .select('k1, 'col1, 'w.rowtime as 'rtime)
>> > >>>>
>> > >>>> Approach 2: Similar to Fabian's approach, in which the result
>> > >>>> schema is clearly defined, but with a built-in append UDF. That
>> > >>>> makes the map/flatMap/agg/flatAgg interfaces accept only one
>> > >>>> Expression.
>> > >>>> val result =
>> > >>>>    tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2, 'long, 'proctime)
>> > >>>>     .window(Tumble over 5.millis on 'proctime as 'w)
>> > >>>>
>> > >>>> Note: append is a special built-in UDF that can pass through any
>> > >>>> column.
>> > >>>>
>> > >>>> So maybe we can define it as table.map(Expression) first and, if
>> > >>>> necessary, extend it to table.map(Expression*) in the future? Of
>> > >>>> course, I also hope that we can further refine this proposal
>> > >>>> through discussion.
>> > >>>>
>> > >>>> Thanks,
>> > >>>> Jincheng
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> Xiaowei Jiang <xiaoweij@xxxxxxxxx> wrote on Wed, Nov 7, 2018 at 11:45 PM:
>> > >>>>
>> > >>>>> Hi Fabian,
>> > >>>>>
>> > >>>>> I think that the key question you raised is whether we allow extra
>> > >>>>> parameters in the methods map/flatMap/agg/flatAgg. I can see why
>> > >>>>> allowing that may appear more convenient in some cases. However,
>> > >>>>> it might also cause some confusion if we do that. For example, do
>> > >>>>> we allow multiple UDFs in these expressions? If we do, the
>> > >>>>> semantics may be weird to define, e.g. what does
>> > >>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
>> > >>>>> Even though not allowing it may appear less powerful, it can make
>> > >>>>> things more intuitive too. In the case of agg/flatAgg, we can
>> > >>>>> define the keys to be implied in the result table and to appear at
>> > >>>>> the beginning. You can use a select method if you want to modify
>> > >>>>> this behavior. I think that eventually we will have some API which
>> > >>>>> allows other expressions as additional parameters, but I think
>> > >>>>> it's better to do that after we introduce the concept of nested
>> > >>>>> tables. A lot of things we suggested here can be considered as
>> > >>>>> special cases of that. But things are much simpler if we leave
>> > >>>>> that for later.
>> > >>>>>
>> > >>>>> Regards,
>> > >>>>> Xiaowei
>> > >>>>>
>> > >>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhueske@xxxxxxxxx> wrote:
>> > >>>>>
>> > >>>>>> Hi,
>> > >>>>>>
>> > >>>>>> * Re emit:
>> > >>>>>> I think we should start with the well-understood semantics of full
>> > >>>>>> replacement. This is how the other agg functions work.
>> > >>>>>> As was said before, there are open questions regarding an append
>> > >>>>>> mode (checkpointing, whether to support retractions and, if yes,
>> > >>>>>> how to declare them, ...).
>> > >>>>>> Since this seems to be an optimization, I'd postpone it.
>> > >>>>>>
>> > >>>>>> * Re grouping keys:
>> > >>>>>> I don't think we should automatically add them because the
>> > >>>>>> result schema would not be intuitive.
>> > >>>>>> Would they be added at the beginning of the tuple or at the end?
>> > >>>>>> What metadata fields of windows would be added? In which order
>> > >>>>>> would they be added?
>> > >>>>>>
>> > >>>>>> However, we could support syntax like this:
>> > >>>>>> val t: Table = ???
>> > >>>>>> t
>> > >>>>>>  .window(Tumble ... as 'w)
>> > >>>>>>  .groupBy('a, 'b)
>> > >>>>>>  .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>> > >>>>>>
>> > >>>>>> The result schema would be clearly defined as [b, a, f1, f2,
>> > >>>>>> ..., fn, wend, rtime]. (f1, f2, ..., fn) are the result attributes
>> > >>>>>> of the UDF.
>> > >>>>>>
>> > >>>>>> * Re Multi-staged evaluation:
>> > >>>>>> I think this should be an optimization that can be applied if
>> > >>>>>> the UDF implements the merge() method.
>> > >>>>>>
>> > >>>>>> Best, Fabian
>> > >>>>>>
>> > >>>>>> On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang
>> > >>>>>> <wshaoxuan@xxxxxxxxx> wrote:
>> > >>>>>>
>> > >>>>>>> Hi xiaowei,
>> > >>>>>>>
>> > >>>>>>> Yes, I agree with you that the semantics of
>> > >>>>>>> TableAggregateFunction emit are much more complex than those of
>> > >>>>>>> AggregateFunction. The fundamental difference is that
>> > >>>>>>> TableAggregateFunction emits a "table" while AggregateFunction
>> > >>>>>>> outputs (a column of) a "row". AggregateFunction has only one
>> > >>>>>>> mode, which is "replacing" (complete update). But for
>> > >>>>>>> TableAggregateFunction, it could be an incremental update (only
>> > >>>>>>> emit the newly updated results) or a complete update (always
>> > >>>>>>> emit the entire table when "emit" is triggered). From the
>> > >>>>>>> performance perspective, we might want to use incremental
>> > >>>>>>> update. But we need to review and design this carefully,
>> > >>>>>>> especially taking into account failover (instead of just backing
>> > >>>>>>> up the ACC, it may also need to remember the emit offset) and
>> > >>>>>>> retractions, as the semantics of TableAggregateFunction emit are
>> > >>>>>>> different from those of other UDFs. TableFunction also emits a
>> > >>>>>>> table, but it does not need to worry about this because it is
>> > >>>>>>> stateless.
>> > >>>>>>>
>> > >>>>>>> Regards,
>> > >>>>>>> Shaoxuan
>> > >>>>>>>
>> > >>>>>>>
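The difference between a complete update and an incremental update can be illustrated with a hypothetical Top-2 table aggregate. This is a plain-Scala sketch, not the proposed TableAggregateFunction interface; `Change`, `accumulate`, `emitComplete` and `emitIncremental` are names invented here.

```scala
object EmitSketch {
  // A change message: add = true inserts the row, add = false retracts it.
  final case class Change(add: Boolean, value: Int)

  // Accumulator of a hypothetical Top-2 aggregate: the two largest values seen.
  def accumulate(top: List[Int], value: Int): List[Int] =
    (value :: top).sorted(Ordering[Int].reverse).take(2)

  // Complete update: retract everything emitted before, then emit the whole table.
  def emitComplete(previous: List[Int], current: List[Int]): List[Change] =
    previous.map(Change(false, _)) ++ current.map(Change(true, _))

  // Incremental update: only touch the rows that actually changed.
  def emitIncremental(previous: List[Int], current: List[Int]): List[Change] =
    previous.diff(current).map(Change(false, _)) ++
      current.diff(previous).map(Change(true, _))
}
```

Both variants need `previous`, the result that was last emitted. For the incremental one this is the extra state that would have to survive a failover in addition to the accumulator, which is the concern raised above.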
>> > >>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaoweij@xxxxxxxxx> wrote:
>> > >>>>>>>
>> > >>>>>>>> Hi Jincheng,
>> > >>>>>>>>
>> > >>>>>>>> Thanks for adding the public interfaces! I think that it's a
>> > >>>>>>>> very good start. There are a few points that we need to discuss
>> > >>>>>>>> further.
>> > >>>>>>>>
>> > >>>>>>>>   - TableAggregateFunction - this is a very complex beast,
>> > >>>>>>>>   definitely the most complex user-defined object we have
>> > >>>>>>>>   introduced so far. I think there are quite a few interesting
>> > >>>>>>>>   questions here. For example, do we allow multi-staged
>> > >>>>>>>>   TableAggregate in this case? What are the semantics of emit?
>> > >>>>>>>>   Is it an amendment to the previous output, or a replacement?
>> > >>>>>>>>   I think that this subject itself is worth a discussion to
>> > >>>>>>>>   make sure we get the details right.
>> > >>>>>>>>   - GroupedTable.agg - do the group keys automatically appear
>> > >>>>>>>>   in the output? How about the case of windowed aggregation?
>> > >>>>>>>>
>> > >>>>>>>> Regards,
>> > >>>>>>>> Xiaowei
>> > >>>>>>>>
>> > >>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng121@xxxxxxxxx> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> Hi, Xiaowei,
>> > >>>>>>>>>
>> > >>>>>>>>> Thanks for bringing up the discussion of the Table API
>> > >>>>>>>>> Enhancement Outline!
>> > >>>>>>>>>
>> > >>>>>>>>> I quickly looked at the overall content; it expresses our
>> > >>>>>>>>> offline discussions well. But from my point of view, we should
>> > >>>>>>>>> add the usage of the public interfaces that we will introduce
>> > >>>>>>>>> in this proposal. So I added the following usage description
>> > >>>>>>>>> of the interfaces and operators to the google doc:
>> > >>>>>>>>>
>> > >>>>>>>>> 1. Map Operator
>> > >>>>>>>>>    Map is a new operator of Table. It applies a scalar
>> > >>>>>>>>> function and can return multiple columns. Usage:
>> > >>>>>>>>>
>> > >>>>>>>>>  val res = tab
>> > >>>>>>>>>     .map(fun: ScalarFunction).as('a, 'b, 'c)
>> > >>>>>>>>>     .select('a, 'c)
>> > >>>>>>>>>
>> > >>>>>>>>> 2. FlatMap Operator
>> > >>>>>>>>>    FlatMap is a new operator of Table. It applies a table
>> > >>>>>>>>> function and can return multiple rows. Usage:
>> > >>>>>>>>>
>> > >>>>>>>>>  val res = tab
>> > >>>>>>>>>      .flatMap(fun: TableFunction).as('a, 'b, 'c)
>> > >>>>>>>>>      .select('a, 'c)
>> > >>>>>>>>>
>> > >>>>>>>>> 3. Agg Operator
>> > >>>>>>>>>    Agg is a new operator of Table/GroupedTable. It applies an
>> > >>>>>>>>> aggregate function and can return multiple columns. Usage:
>> > >>>>>>>>>
>> > >>>>>>>>>   val res = tab
>> > >>>>>>>>>      .groupBy('a) // leave groupBy-Clause out to define global aggregates
>> > >>>>>>>>>      .agg(fun: AggregateFunction).as('a, 'b, 'c)
>> > >>>>>>>>>      .select('a, 'c)
>> > >>>>>>>>>
>> > >>>>>>>>> 4. FlatAgg Operator
>> > >>>>>>>>>    FlatAgg is a new operator of Table/GroupedTable. It applies
>> > >>>>>>>>> a table aggregate function and can return multiple rows. Usage:
>> > >>>>>>>>>
>> > >>>>>>>>>    val res = tab
>> > >>>>>>>>>       .groupBy('a) // leave groupBy-Clause out to define global table aggregates
>> > >>>>>>>>>       .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>> > >>>>>>>>>       .select('a, 'c)
>> > >>>>>>>>>
>> > >>>>>>>>> 5. TableAggregateFunction
>> > >>>>>>>>>    The behavior of table aggregates is most similar to that of
>> > >>>>>>>>> GroupReduceFunction, which is computed for a group of elements
>> > >>>>>>>>> and outputs a group of elements. The TableAggregateFunction can
>> > >>>>>>>>> be applied on GroupedTable.flatAgg(). The interface of
>> > >>>>>>>>> TableAggregateFunction has a lot of content, so I don't copy
>> > >>>>>>>>> it here. Please look at the details in the google doc:
>> > >>>>>>>>>
>> > >>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>> > >>>>>>>>>
>> > >>>>>>>>> I would very much appreciate anyone reviewing and commenting.
>> > >>>>>>>>>
>> > >>>>>>>>> Best,
>> > >>>>>>>>> Jincheng
>> > >>>>>>>>>
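The row and column shapes of the four operators described above can be modeled on ordinary Scala collections. This is only an analogy under simplifying assumptions (rows as untyped vectors, grouping by an arbitrary key function); it is not the proposed Table API, and all names are hypothetical.

```scala
object OperatorShapes {
  type Row = Vector[Any]

  // map: one input row -> exactly one output row (possibly several columns).
  def map(table: Seq[Row])(f: Row => Row): Seq[Row] = table.map(f)

  // flatMap: one input row -> zero or more output rows.
  def flatMap(table: Seq[Row])(f: Row => Seq[Row]): Seq[Row] = table.flatMap(f)

  // agg: one group of rows -> exactly one output row.
  def agg[K](table: Seq[Row])(key: Row => K)(f: Seq[Row] => Row): Map[K, Row] =
    table.groupBy(key).map { case (k, rows) => k -> f(rows) }

  // flatAgg: one group of rows -> zero or more output rows.
  def flatAgg[K](table: Seq[Row])(key: Row => K)(f: Seq[Row] => Seq[Row]): Map[K, Seq[Row]] =
    table.groupBy(key).map { case (k, rows) => k -> f(rows) }
}
```

In this model agg and flatAgg differ only in the result type of the user function, which mirrors the AggregateFunction versus TableAggregateFunction distinction in the list above.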
>> > >>>>>>>>
>> > >>>>>>>
>> > >>>>>>
>> > >>>>>
>> > >>>>
>> > >>>
>> > >>
>> >
>> >
>>
>


-- 
-----------------------------------------------------------------------------------

*Rome was not built in one day*

-----------------------------------------------------------------------------------