osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Table API Enhancement Outline


Hi shaoxuan & Hequn,

Thanks for your suggestion,I'll file the JIRAs later.
We can prepare PRs while continuing to move forward the ongoing discussion.

Regards,
Jincheng

jincheng sun <sunjincheng121@xxxxxxxxx> 于2018年11月21日周三 下午7:07写道:

> Hi Piotrek,
> Thanks for your feedback, and thanks for  share your thoughts!
>
> #1) No,watermark solves the issue of the late event. Here, the performance
> problem is caused by the update emit mode. i.e.: When current calculation
> result is output, the previous calculation result needs to be retracted.
> #2) As I mentioned above we should continue the discussion until we solve
> the problems raised by Xiaowei and Fabian.
> #3)I still hope to keep the simplicity that select only support projected
> scalar, we can hardly tell the semantics of tab.select(flatmap('a), 'b,
> flatmap('d)).
>
> Thanks,
> Jincheng
>
> Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> 于2018年11月21日周三 下午5:24写道:
>
>> Hi,
>>
>> 1.
>>
>> > In fact, in addition to the design of APIs, there will be various
>> > performance optimization details, such as: table Aggregate function
>> > emitValue will generate multiple calculation results, in extreme cases,
>> > each record will trigger a large number of retract messages, this will
>> have
>> > poor performance
>>
>> Can this be solved/mitigated by emitting the results only on watermarks?
>> I think that was the path that we decided to take both for Temporal Joins
>> and upsert stream conversion. I know that this increases the latency and
>> there is a place for a future global setting/user preference “emit the data
>> ASAP mode”, but emitting only on watermarks seems to me as a better/more
>> sane default.
>>
>> 2.
>>
>> With respect to the API discussion and implicit columns. The problem for
>> me so far is I’m not sure if I like the additionally complexity of
>> `append()` solution, while implicit columns are definitely not in the
>> spirit of SQL. Neither joins nor aggregations add extra unexpected columns
>> to the result without asking. This definitely can be confusing for the
>> users since it brakes the convention. Thus I would lean towards Fabian’s
>> proposal of multi-argument `map(Expression*)` from those 3 options.
>>
>> 3.
>>
>> Another topic is that I’m not 100% convinced that we should be adding new
>> api functions for `map`,`aggregate`,`flatMap` and `flatAggregate`. I think
>> the same could be achieved by changing
>>
>> table.map(F('x))
>>
>> into
>>
>> table.select(F('x)).unnest()
>> or
>> table.select(F('x).unnest())
>>
>> Where `unnest()` means unnest row/tuple type into a columnar table.
>>
>> table.flatMap(F('x))
>>
>> Could be on the other hand also handled by
>>
>> table.select(F('x))
>>
>> By correctly deducing that F(x) is a multi row output function
>>
>> Same might apply to `aggregate(F('x))`, but this maybe could be replaced
>> by:
>>
>> table.groupBy(…).select(F('x).unnest())
>>
>> Adding scalar functions should also be possible:
>>
>> table.groupBy('k).select(F('x).unnest(), ‘k)
>>
>> Maybe such approach would allow us to implement the same features in the
>> SQL as well?
>>
>> Piotrek
>>
>> > On 21 Nov 2018, at 09:43, Hequn Cheng <chenghequn@xxxxxxxxx> wrote:
>> >
>> > Hi,
>> >
>> > Thank you all for the great proposal and discussion!
>> > I also prefer to move on to the next step, so +1 for opening the JIRAs
>> to
>> > start the work.
>> > We can have more detailed discussion there. Btw, we can start with JIRAs
>> > which we have agreed on.
>> >
>> > Best,
>> > Hequn
>> >
>> > On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaoxuan@xxxxxxxxx>
>> wrote:
>> >
>> >> +1. I agree that we should open the JIRAs to start the work. We may
>> >> have better ideas on the flavor of the interface when implement/review
>> >> the code.
>> >>
>> >> Regards,
>> >> shaoxuan
>> >>
>> >>
>> >> On 11/20/18, jincheng sun <sunjincheng121@xxxxxxxxx> wrote:
>> >>> Hi all,
>> >>>
>> >>> Thanks all for the feedback.
>> >>>
>> >>> @Piotr About not using abbreviations naming,  +1,I like
>> >>> your proposal!Currently both DataSet and DataStream API are using
>> >>> `aggregate`,
>> >>> BTW,I find other language also not using abbreviations naming,such as
>> R.
>> >>>
>> >>> Sometimes the interface of the API is really difficult to perfect, we
>> >> need
>> >>> to spend a lot of time thinking and feedback from a large number of
>> >> users,
>> >>> and constantly improve, but for backward compatibility issues, we
>> have to
>> >>> adopt the most conservative approach when designing the API(Of
>> course, I
>> >> am
>> >>> more in favor of developing more rich features, when we discuss
>> clearly).
>> >>> Therefore, I propose to divide the function implementation of
>> >>> map/faltMap/agg/flatAgg into basic functions of JIRAs and JIRAs that
>> >>> support time attributes and groupKeys. We can develop the features
>> which
>> >>> we  have already agreed on the design. And we will continue to discuss
>> >> the
>> >>> uncertain design.
>> >>>
>> >>> In fact, in addition to the design of APIs, there will be various
>> >>> performance optimization details, such as: table Aggregate function
>> >>> emitValue will generate multiple calculation results, in extreme
>> cases,
>> >>> each record will trigger a large number of retract messages, this will
>> >> have
>> >>> poor performance,so we will also optimize the interface design, such
>> as
>> >>> adding the emitWithRetractValue interface (I have updated the google
>> doc)
>> >>> to allow the user to optionally perform incremental calculations, thus
>> >>> avoiding a large number of retracts. Details like this are difficult
>> to
>> >>> fully discuss in the mail list, so I recommend creating JIRAs/FLIP
>> first,
>> >>> we develop designs that have been agreed upon and continue to discuss
>> >>> non-deterministic designs!  What do you think? @Fabian & Piotr &
>> XiaoWei
>> >>>
>> >>> Best,
>> >>> Jincheng
>> >>>
>> >>> Xiaowei Jiang <xiaoweij@xxxxxxxxx> 于2018年11月19日周一 上午12:07写道:
>> >>>
>> >>>> Hi Fabian & Piotr, thanks for the feedback!
>> >>>>
>> >>>> I appreciate your concerns, both on timestamp attributes as well as
>> on
>> >>>> implicit group keys. At the same time, I'm also concerned with the
>> >>>> proposed
>> >>>> approach of allowing Expression* as parameters, especially for
>> >>>> flatMap/flatAgg. So far, we never allowed a scalar expression to
>> appear
>> >>>> together with table expressions. With the Expression* approach, this
>> >> will
>> >>>> happen for the parameters to flatMap/flatAgg. I'm a bit concerned on
>> if
>> >>>> we
>> >>>> fully understand the consequences when we try to extend our system in
>> >> the
>> >>>> future. I would be extra cautious in doing this. To avoid this, I
>> think
>> >>>> an
>> >>>> implicit group key for flatAgg is safer. For flatMap, if users want
>> to
>> >>>> keep
>> >>>> the rowtime column, he can use crossApply/join instead. So we are not
>> >>>> losing any real functionality here.
>> >>>>
>> >>>> Also a clarification on the following example:
>> >>>> tab.window(Tumble ... as 'w)
>> >>>>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> >>>>    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>> >>>>    .select('k1, 'col1, 'w.rowtime as 'rtime)
>> >>>> If we did not have the select clause in this example, we will have
>> 'w as
>> >>>> a
>> >>>> regular column in the output. It should not magically disappear.
>> >>>>
>> >>>> The concern is not as strong for Table.map/Table.agg because we are
>> not
>> >>>> mixing scalar and table expressions. But we also want to be a bit
>> >>>> consistent with these methods. If we used implicit group keys for
>> >>>> Table.flatAgg, we probably should do the same for Table.agg. Now we
>> only
>> >>>> have to choose what to do with Table.map. I can see good arguments
>> from
>> >>>> both sides. But starting with a single Expression seems safer because
>> >>>> that
>> >>>> we can always extend to Expression* in the future.
>> >>>>
>> >>>> While thinking about this problem, it appears that we may need more
>> work
>> >>>> in
>> >>>> our handling of watermarks for SQL/Table API. Our current way of
>> >>>> propagating the watermarks from source all the way to sink might not
>> be
>> >>>> optimal. For example, after a tumbling window, the watermark can
>> >> actually
>> >>>> be advanced to just before the expiring of next window. I think that
>> in
>> >>>> general, each operator may need to generate new watermarks instead of
>> >>>> simply propagating them. Once we accept that watermarks may change
>> >> during
>> >>>> the execution, it appears that the timestamp columns may also
>> change, as
>> >>>> long as we have some way to associate watermark with it. My
>> intuition is
>> >>>> that once we have a through solution for the watermark issue, we may
>> be
>> >>>> able to solve the problem we encountered for Table.map in a cleaner
>> way.
>> >>>> But this is a complex issue which deserves a discussion on its own.
>> >>>>
>> >>>> Regards,
>> >>>> Xiaowei
>> >>>>
>> >>>>
>> >>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <
>> >> piotr@xxxxxxxxxxxxxxxxx>
>> >>>> wrote:
>> >>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> Isn’t the problem of multiple expressions limited only to `flat***`
>> >>>>> functions and to be more specific only to having two (or more)
>> >>>>> different
>> >>>>> table functions passed as an expressions? `.flatAgg(TableAggA('a),
>> >>>>> scalarFunction1(‘b), scalarFunction2(‘c))` seems to be well defined
>> >>>>> (duplicate result of every scalar function to every record. Or am I
>> >>>> missing
>> >>>>> something?
>> >>>>>
>> >>>>> Another remark, I would be in favour of not using abbreviations and
>> >>>> naming
>> >>>>> `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>> >>>>>
>> >>>>> Piotrek
>> >>>>>
>> >>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhueske@xxxxxxxxx> wrote:
>> >>>>>>
>> >>>>>> Hi Jincheng,
>> >>>>>>
>> >>>>>> I said before, that I think that the append() method is better than
>> >>>>>> implicitly forwarding keys, but still, I believe it adds
>> unnecessary
>> >>>>> boiler
>> >>>>>> plate code.
>> >>>>>>
>> >>>>>> Moreover, I haven't seen a convincing argument why map(Expression*)
>> >>>>>> is
>> >>>>>> worse than map(Expression). In either case we need to do all kinds
>> >> of
>> >>>>>> checks to prevent invalid use of functions.
>> >>>>>> If the method is not correctly used, we can emit a good error
>> >> message
>> >>>> and
>> >>>>>> documenting map(Expression*) will be easier than
>> >>>>> map(append(Expression*)),
>> >>>>>> in my opinion.
>> >>>>>> I think we should not add unnessary syntax unless there is a good
>> >>>> reason
>> >>>>>> and to be honest, I haven't seen this reason yet.
>> >>>>>>
>> >>>>>> Regarding the groupBy.agg() method, I think it should behave just
>> >>>>>> like
>> >>>>> any
>> >>>>>> other method, i.e., not do any implicit forwarding.
>> >>>>>> Let's take the example of the windowed group by, that you posted
>> >>>> before.
>> >>>>>>
>> >>>>>> tab.window(Tumble ... as 'w)
>> >>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> >>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>> >>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>> >>>>>>
>> >>>>>> What happens if 'w.rowtime is not selected? What is the data type
>> of
>> >>>> the
>> >>>>>> field 'w in the resulting Table? Is it a regular field at all or
>> >> just
>> >>>>>> a
>> >>>>>> system field that disappears if it is not selected?
>> >>>>>>
>> >>>>>> IMO, the following syntax is shorter, more explicit, and better
>> >>>>>> aligned
>> >>>>>> with the regular window.groupBy.select aggregations that are
>> >>>>>> supported
>> >>>>>> today.
>> >>>>>>
>> >>>>>> tab.window(Tumble ... as 'w)
>> >>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> >>>>>>   .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>> >>>>>>
>> >>>>>>
>> >>>>>> Best, Fabian
>> >>>>>>
>> >>>>>> Am Mi., 14. Nov. 2018 um 08:37 Uhr schrieb jincheng sun <
>> >>>>>> sunjincheng121@xxxxxxxxx>:
>> >>>>>>
>> >>>>>>> Hi Fabian/Xiaowei,
>> >>>>>>>
>> >>>>>>> I am very sorry for my late reply! Glad to see your reply, and
>> >>>>>>> sounds
>> >>>>>>> pretty good!
>> >>>>>>> I agree that the approach with append() which can clearly defined
>> >>>>>>> the
>> >>>>>>> result schema is better which Fabian mentioned.
>> >>>>>>> In addition and append() and also contains non-time attributes,
>> >>>>>>> e.g.:
>> >>>>>>>
>> >>>>>>>   tab('name, 'age, 'address, 'rowtime)
>> >>>>>>>   tab.map(append(udf('name), 'address, 'rowtime).as('col1, 'col2,
>> >>>>>>> 'address, 'rowtime)
>> >>>>>>>   .window(Tumble over 5.millis on 'rowtime as 'w)
>> >>>>>>>   .groupBy('w, 'address)
>> >>>>>>>
>> >>>>>>> In this way the append() is very useful, and the behavior is very
>> >>>>> similar
>> >>>>>>> to withForwardedFields() in DataSet.
>> >>>>>>> So +1 to using append() approach for the map()&flatmap()!
>> >>>>>>>
>> >>>>>>> But how about the agg() and flatAgg()? In agg/flatAgg case I agree
>> >>>>>>> Xiaowei's approach that define the keys to be implied in the
>> result
>> >>>>> table
>> >>>>>>> and appears at the beginning, for example as follows:
>> >>>>>>> tab.window(Tumble ... as 'w)
>> >>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> >>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>> >>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
>> >>>>>>>
>> >>>>>>> What to you think? @Fabian @Xiaowei
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Jincheng
>> >>>>>>>
>> >>>>>>> Fabian Hueske <fhueske@xxxxxxxxx> 于2018年11月9日周五 下午6:35写道:
>> >>>>>>>
>> >>>>>>>> Hi Jincheng,
>> >>>>>>>>
>> >>>>>>>> Thanks for the summary!
>> >>>>>>>> I like the approach with append() better than the implicit
>> >>>>>>>> forwarding
>> >>>>> as
>> >>>>>>> it
>> >>>>>>>> clearly indicates which fields are forwarded.
>> >>>>>>>> However, I don't see much benefit over the flatMap(Expression*)
>> >>>>> variant,
>> >>>>>>> as
>> >>>>>>>> we would still need to analyze the full expression tree to ensure
>> >>>> that
>> >>>>> at
>> >>>>>>>> most (or exactly?) one Scalar / TableFunction is used.
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Fabian
>> >>>>>>>>
>> >>>>>>>> Am Do., 8. Nov. 2018 um 19:25 Uhr schrieb jincheng sun <
>> >>>>>>>> sunjincheng121@xxxxxxxxx>:
>> >>>>>>>>
>> >>>>>>>>> Hi all,
>> >>>>>>>>>
>> >>>>>>>>> We are discussing very detailed content about this proposal. We
>> >>>>>>>>> are
>> >>>>>>>> trying
>> >>>>>>>>> to design the API in many aspects (functionality, compatibility,
>> >>>> ease
>> >>>>>>> of
>> >>>>>>>>> use, etc.). I think this is a very good process. Only such a
>> >>>> detailed
>> >>>>>>>>> discussion, In order to develop PR more clearly and smoothly in
>> >>>>>>>>> the
>> >>>>>>> later
>> >>>>>>>>> stage. I am very grateful to @Fabian and  @Xiaowei for sharing a
>> >>>>>>>>> lot
>> >>>>> of
>> >>>>>>>>> good ideas.
>> >>>>>>>>> About the definition of method signatures I want to share my
>> >>>>>>>>> points
>> >>>>>>> here
>> >>>>>>>>> which I am discussing with fabian in google doc (not yet
>> >>>>>>>>> completed),
>> >>>>> as
>> >>>>>>>>> follows:
>> >>>>>>>>>
>> >>>>>>>>> Assume we have a table:
>> >>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long,
>> >> 'string,
>> >>>>>>>>> 'proctime.proctime)
>> >>>>>>>>>
>> >>>>>>>>> Approach 1:
>> >>>>>>>>> case1: Map follows Source Table
>> >>>>>>>>> val result =
>> >>>>>>>>> tab.map(udf('string)).as('proctime, 'col1, 'col2)// proctime
>> >>>> implied
>> >>>>>>> in
>> >>>>>>>>> the output
>> >>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
>> >>>>>>>>>
>> >>>>>>>>> case2: FatAgg follows Window (Fabian mentioned above)
>> >>>>>>>>> val result =
>> >>>>>>>>>   tab.window(Tumble ... as 'w)
>> >>>>>>>>>      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>> >>>>>>>>>      .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>> >>>>>>>>>      .select('k1, 'col1, 'w.rowtime as 'rtime)
>> >>>>>>>>>
>> >>>>>>>>> Approach 2: Similar to Fabian‘s approach, which the result
>> schema
>> >>>>> would
>> >>>>>>>> be
>> >>>>>>>>> clearly defined, but add a built-in append UDF. That make
>> >>>>>>>>> map/flatmap/agg/flatAgg interface only accept one Expression.
>> >>>>>>>>> val result =
>> >>>>>>>>>   tab.map(append(udf('string), 'long, 'proctime)) as ('col1,
>> >>>>>>>>> 'col2,
>> >>>>>>>>> 'long, 'proctime)
>> >>>>>>>>>    .window(Tumble over 5.millis on 'proctime as 'w)
>> >>>>>>>>>
>> >>>>>>>>> Note: Append is a special UDF for built-in that can pass through
>> >>>>>>>>> any
>> >>>>>>>>> column.
>> >>>>>>>>>
>> >>>>>>>>> So, May be we can defined the as  table.map(Expression)  first,
>> >> If
>> >>>>>>>>> necessary, we can extend to table.map(Expression*)  in the
>> future
>> >>>>>>>>> ?
>> >>>>> Of
>> >>>>>>>>> course, I also hope that we can do more perfection in this
>> >>>>>>>>> proposal
>> >>>>>>>> through
>> >>>>>>>>> discussion.
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Jincheng
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Xiaowei Jiang <xiaoweij@xxxxxxxxx> 于2018年11月7日周三 下午11:45写道:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Fabian,
>> >>>>>>>>>>
>> >>>>>>>>>> I think that the key question you raised is if we allow extra
>> >>>>>>>> parameters
>> >>>>>>>>> in
>> >>>>>>>>>> the methods map/flatMap/agg/flatAgg. I can see why allowing
>> that
>> >>>> may
>> >>>>>>>>> appear
>> >>>>>>>>>> more convenient in some cases. However, it might also cause
>> some
>> >>>>>>>>> confusions
>> >>>>>>>>>> if we do that. For example, do we allow multiple UDFs in these
>> >>>>>>>>> expressions?
>> >>>>>>>>>> If we do, the semantics may be weird to define, e.g. what does
>> >>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
>> >>>>>>>>>> Even
>> >>>>>>>> though
>> >>>>>>>>>> not allowing it may appear less powerful, but it can make
>> things
>> >>>> more
>> >>>>>>>>>> intuitive too. In the case of agg/flatAgg, we can define the
>> >> keys
>> >>>> to
>> >>>>>>> be
>> >>>>>>>>>> implied in the result table and appears at the beginning. You
>> >> can
>> >>>>>>> use a
>> >>>>>>>>>> select method if you want to modify this behavior. I think that
>> >>>>>>>>> eventually
>> >>>>>>>>>> we will have some API which allows other expressions as
>> >>>>>>>>>> additional
>> >>>>>>>>>> parameters, but I think it's better to do that after we
>> >> introduce
>> >>>> the
>> >>>>>>>>>> concept of nested tables. A lot of things we suggested here can
>> >>>>>>>>>> be
>> >>>>>>>>>> considered as special cases of that. But things are much
>> simpler
>> >>>>>>>>>> if
>> >>>>>>> we
>> >>>>>>>>>> leave that to later.
>> >>>>>>>>>>
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Xiaowei
>> >>>>>>>>>>
>> >>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <
>> fhueske@xxxxxxxxx
>> >>>
>> >>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi,
>> >>>>>>>>>>>
>> >>>>>>>>>>> * Re emit:
>> >>>>>>>>>>> I think we should start with a well understood semantics of
>> >> full
>> >>>>>>>>>>> replacement. This is how the other agg functions work.
>> >>>>>>>>>>> As was said before, there are open questions regarding an
>> >> append
>> >>>>>>> mode
>> >>>>>>>>>>> (checkpointing, whether supporting retractions or not and if
>> >> yes
>> >>>>>>> how
>> >>>>>>>> to
>> >>>>>>>>>>> declare them, ...).
>> >>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
>> >>>>>>>>>>>
>> >>>>>>>>>>> * Re grouping keys:
>> >>>>>>>>>>> I don't think we should automatically add them because the
>> >>>>>>>>>>> result
>> >>>>>>>>> schema
>> >>>>>>>>>>> would not be intuitive.
>> >>>>>>>>>>> Would they be added at the beginning of the tuple or at the
>> >> end?
>> >>>>>>> What
>> >>>>>>>>>>> metadata fields of windows would be added? In which order
>> would
>> >>>>>>> they
>> >>>>>>>> be
>> >>>>>>>>>>> added?
>> >>>>>>>>>>>
>> >>>>>>>>>>> However, we could support syntax like this:
>> >>>>>>>>>>> val t: Table = ???
>> >>>>>>>>>>> t
>> >>>>>>>>>>> .window(Tumble ... as 'w)
>> >>>>>>>>>>> .groupBy('a, 'b)
>> >>>>>>>>>>> .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime
>> >> as
>> >>>>>>>>> 'rtime)
>> >>>>>>>>>>>
>> >>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2,
>> >>>>>>>>>>> ...,
>> >>>>>>> fn,
>> >>>>>>>>>> wend,
>> >>>>>>>>>>> rtime]. (f1, f2, ...fn) are the result attributes of the UDF.
>> >>>>>>>>>>>
>> >>>>>>>>>>> * Re Multi-staged evaluation:
>> >>>>>>>>>>> I think this should be an optimization that can be applied if
>> >>>>>>>>>>> the
>> >>>>>>> UDF
>> >>>>>>>>>>> implements the merge() method.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best, Fabian
>> >>>>>>>>>>>
>> >>>>>>>>>>> Am Mi., 7. Nov. 2018 um 08:01 Uhr schrieb Shaoxuan Wang <
>> >>>>>>>>>>> wshaoxuan@xxxxxxxxx
>> >>>>>>>>>>>> :
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi xiaowei,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Yes, I agree with you that the semantics of
>> >>>>>>> TableAggregateFunction
>> >>>>>>>>> emit
>> >>>>>>>>>>> is
>> >>>>>>>>>>>> much more complex than AggregateFunction. The fundamental
>> >>>>>>>> difference
>> >>>>>>>>> is
>> >>>>>>>>>>>> that TableAggregateFunction emits a "table" while
>> >>>>>>> AggregateFunction
>> >>>>>>>>>>> outputs
>> >>>>>>>>>>>> (a column of) a "row". In the case of AggregateFunction it
>> >> only
>> >>>>>>> has
>> >>>>>>>>> one
>> >>>>>>>>>>>> mode which is “replacing” (complete update). But for
>> >>>>>>>>>>>> TableAggregateFunction, it could be incremental (only emit
>> the
>> >>>>>>> new
>> >>>>>>>>>>> updated
>> >>>>>>>>>>>> results) update or complete update (always emit the entire
>> >>>>>>>>>>>> table
>> >>>>>>>> when
>> >>>>>>>>>>>> “emit" is triggered).  From the performance perspective, we
>> >>>>>>>>>>>> might
>> >>>>>>>>> want
>> >>>>>>>>>> to
>> >>>>>>>>>>>> use incremental update. But we need review and design this
>> >>>>>>>> carefully,
>> >>>>>>>>>>>> especially taking into account the cases of the failover
>> >>>>>>>>>>>> (instead
>> >>>>>>>> of
>> >>>>>>>>>> just
>> >>>>>>>>>>>> back-up the ACC it may also needs to remember the emit
>> offset)
>> >>>>>>> and
>> >>>>>>>>>>>> retractions, as the semantics of TableAggregateFunction emit
>> >>>>>>>>>>>> are
>> >>>>>>>>>>> different
>> >>>>>>>>>>>> than other UDFs. TableFunction also emits a table, but it
>> does
>> >>>>>>> not
>> >>>>>>>>> need
>> >>>>>>>>>>> to
>> >>>>>>>>>>>> worry this due to the nature of stateless.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Regards,
>> >>>>>>>>>>>> Shaoxuan
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang
>> >>>>>>>>>>>> <xiaoweij@xxxxxxxxx
>> >>>>>>>>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi Jincheng,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a
>> >>>>>>> very
>> >>>>>>>>>> good
>> >>>>>>>>>>>>> start. There are a few points that we need to have more
>> >>>>>>>>> discussions.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>  - TableAggregateFunction - this is a very complex beast,
>> >>>>>>>>>> definitely
>> >>>>>>>>>>>> the
>> >>>>>>>>>>>>>  most complex user defined objects we introduced so far. I
>> >>>>>>>> think
>> >>>>>>>>>>> there
>> >>>>>>>>>>>>> are
>> >>>>>>>>>>>>>  quite some interesting questions here. For example, do we
>> >>>>>>>> allow
>> >>>>>>>>>>>>>  multi-staged TableAggregate in this case? What is the
>> >>>>>>>> semantics
>> >>>>>>>>> of
>> >>>>>>>>>>>>> emit? Is
>> >>>>>>>>>>>>>  it amendments to the previous output, or replacing it? I
>> >>>>>>> think
>> >>>>>>>>>> that
>> >>>>>>>>>>>> this
>> >>>>>>>>>>>>>  subject itself is worth a discussion to make sure we get
>> >> the
>> >>>>>>>>>> details
>> >>>>>>>>>>>>> right.
>> >>>>>>>>>>>>>  - GroupedTable.agg - does the group keys automatically
>> >>>>>>> appear
>> >>>>>>>> in
>> >>>>>>>>>> the
>> >>>>>>>>>>>>>  output? how about the case of windowing aggregation?
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>> Xiaowei
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <
>> >>>>>>>>>> sunjincheng121@xxxxxxxxx>
>> >>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Hi, Xiaowei,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Thanks for bring up the discuss of Table API Enhancement
>> >>>>>>>> Outline
>> >>>>>>>>> !
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I quickly looked at the overall content, these are good
>> >>>>>>>>> expressions
>> >>>>>>>>>>> of
>> >>>>>>>>>>>>> our
>> >>>>>>>>>>>>>> offline discussions. But from the points of my view, we
>> >>>>>>> should
>> >>>>>>>>> add
>> >>>>>>>>>>> the
>> >>>>>>>>>>>>>> usage of public interfaces that we will introduce in this
>> >>>>>>>>> propose.
>> >>>>>>>>>>>> So, I
>> >>>>>>>>>>>>>> added the following usage description of  interface and
>> >>>>>>>> operators
>> >>>>>>>>>> in
>> >>>>>>>>>>>>>> google doc:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 1. Map Operator
>> >>>>>>>>>>>>>>   Map operator is a new operator of Table, Map operator can
>> >>>>>>>>>> apply a
>> >>>>>>>>>>>>>> scalar function, and can return multi-column. The usage as
>> >>>>>>>>> follows:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> val res = tab
>> >>>>>>>>>>>>>>    .map(fun: ScalarFunction).as(‘a, ‘b, ‘c)
>> >>>>>>>>>>>>>>    .select(‘a, ‘c)
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 2. FlatMap Operator
>> >>>>>>>>>>>>>>   FaltMap operator is a new operator of Table, FlatMap
>> >>>>>>>> operator
>> >>>>>>>>>> can
>> >>>>>>>>>>>>> apply
>> >>>>>>>>>>>>>> a table function, and can return multi-row. The usage as
>> >>>>>>>> follows:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> val res = tab
>> >>>>>>>>>>>>>>     .flatMap(fun: TableFunction).as(‘a, ‘b, ‘c)
>> >>>>>>>>>>>>>>     .select(‘a, ‘c)
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 3. Agg Operator
>> >>>>>>>>>>>>>>   Agg operator is a new operator of Table/GroupedTable, Agg
>> >>>>>>>>>>> operator
>> >>>>>>>>>>>>> can
>> >>>>>>>>>>>>>> apply a aggregate function, and can return multi-column.
>> The
>> >>>>>>>>> usage
>> >>>>>>>>>> as
>> >>>>>>>>>>>>>> follows:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>  val res = tab
>> >>>>>>>>>>>>>>     .groupBy(‘a) // leave groupBy-Clause out to define
>> >>>>>>> global
>> >>>>>>>>>>>>> aggregates
>> >>>>>>>>>>>>>>     .agg(fun: AggregateFunction).as(‘a, ‘b, ‘c)
>> >>>>>>>>>>>>>>     .select(‘a, ‘c)
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 4.  FlatAgg Operator
>> >>>>>>>>>>>>>>   FlatAgg operator is a new operator of Table/GroupedTable,
>> >>>>>>>>>> FaltAgg
>> >>>>>>>>>>>>>> operator can apply a table aggregate function, and can
>> >> return
>> >>>>>>>>>>>> multi-row.
>> >>>>>>>>>>>>>> The usage as follows:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>   val res = tab
>> >>>>>>>>>>>>>>      .groupBy(‘a) // leave groupBy-Clause out to define
>> >>>>>>>> global
>> >>>>>>>>>>> table
>> >>>>>>>>>>>>>> aggregates
>> >>>>>>>>>>>>>>      .flatAgg(fun: TableAggregateFunction).as(‘a, ‘b, ‘c)
>> >>>>>>>>>>>>>>      .select(‘a, ‘c)
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 5. TableAggregateFunction
>> >>>>>>>>>>>>>>    The behavior of table aggregates is most like
>> >>>>>>>>>>> GroupReduceFunction
>> >>>>>>>>>>>>> did,
>> >>>>>>>>>>>>>> which computed for a group of elements, and output  a group
>> >>>>>>> of
>> >>>>>>>>>>>> elements.
>> >>>>>>>>>>>>>> The TableAggregateFunction can be applied on
>> >>>>>>>>>> GroupedTable.flatAgg() .
>> >>>>>>>>>>>> The
>> >>>>>>>>>>>>>> interface of TableAggregateFunction has a lot of content,
>> so
>> >>>>>>> I
>> >>>>>>>>>> don't
>> >>>>>>>>>>>> copy
>> >>>>>>>>>>>>>> it here, Please look at the detail in google doc:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>
>> >>
>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I will be very appreciate to anyone for reviewing and
>> >>>>>>>> commenting.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>> Jincheng
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >>
>> >> --
>> >>
>> >>
>> -----------------------------------------------------------------------------------
>> >>
>> >> *Rome was not built in one day*
>> >>
>> >>
>> >>
>> -----------------------------------------------------------------------------------
>> >>
>>
>>