osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Table API Enhancement Outline


Hi Piotrek,
Thanks for your feedback, and thanks for  share your thoughts!

#1) No,watermark solves the issue of the late event. Here, the performance
problem is caused by the update emit mode. i.e.: When current calculation
result is output, the previous calculation result needs to be retracted.
#2) As I mentioned above we should continue the discussion until we solve
the problems raised by Xiaowei and Fabian.
#3)I still hope to keep the simplicity that select only support projected
scalar, we can hardly tell the semantics of tab.select(flatmap('a), 'b,
flatmap('d)).

Thanks,
Jincheng

Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> 于2018年11月21日周三 下午5:24写道:

> Hi,
>
> 1.
>
> > In fact, in addition to the design of APIs, there will be various
> > performance optimization details, such as: table Aggregate function
> > emitValue will generate multiple calculation results, in extreme cases,
> > each record will trigger a large number of retract messages, this will
> have
> > poor performance
>
> Can this be solved/mitigated by emitting the results only on watermarks? I
> think that was the path that we decided to take both for Temporal Joins and
> upsert stream conversion. I know that this increases the latency and there
> is a place for a future global setting/user preference “emit the data ASAP
> mode”, but emitting only on watermarks seems to me as a better/more sane
> default.
>
> 2.
>
> With respect to the API discussion and implicit columns. The problem for
> me so far is I’m not sure if I like the additionally complexity of
> `append()` solution, while implicit columns are definitely not in the
> spirit of SQL. Neither joins nor aggregations add extra unexpected columns
> to the result without asking. This definitely can be confusing for the
> users since it brakes the convention. Thus I would lean towards Fabian’s
> proposal of multi-argument `map(Expression*)` from those 3 options.
>
> 3.
>
> Another topic is that I’m not 100% convinced that we should be adding new
> api functions for `map`,`aggregate`,`flatMap` and `flatAggregate`. I think
> the same could be achieved by changing
>
> table.map(F('x))
>
> into
>
> table.select(F('x)).unnest()
> or
> table.select(F('x).unnest())
>
> Where `unnest()` means unnest row/tuple type into a columnar table.
>
> table.flatMap(F('x))
>
> Could be on the other hand also handled by
>
> table.select(F('x))
>
> By correctly deducing that F(x) is a multi row output function
>
> Same might apply to `aggregate(F('x))`, but this maybe could be replaced
> by:
>
> table.groupBy(…).select(F('x).unnest())
>
> Adding scalar functions should also be possible:
>
> table.groupBy('k).select(F('x).unnest(), ‘k)
>
> Maybe such approach would allow us to implement the same features in the
> SQL as well?
>
> Piotrek
>
> > On 21 Nov 2018, at 09:43, Hequn Cheng <chenghequn@xxxxxxxxx> wrote:
> >
> > Hi,
> >
> > Thank you all for the great proposal and discussion!
> > I also prefer to move on to the next step, so +1 for opening the JIRAs to
> > start the work.
> > We can have more detailed discussion there. Btw, we can start with JIRAs
> > which we have agreed on.
> >
> > Best,
> > Hequn
> >
> > On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <wshaoxuan@xxxxxxxxx>
> wrote:
> >
> >> +1. I agree that we should open the JIRAs to start the work. We may
> >> have better ideas on the flavor of the interface when implement/review
> >> the code.
> >>
> >> Regards,
> >> shaoxuan
> >>
> >>
> >> On 11/20/18, jincheng sun <sunjincheng121@xxxxxxxxx> wrote:
> >>> Hi all,
> >>>
> >>> Thanks all for the feedback.
> >>>
> >>> @Piotr About not using abbreviations naming,  +1,I like
> >>> your proposal!Currently both DataSet and DataStream API are using
> >>> `aggregate`,
> >>> BTW,I find other language also not using abbreviations naming,such as
> R.
> >>>
> >>> Sometimes the interface of the API is really difficult to perfect, we
> >> need
> >>> to spend a lot of time thinking and feedback from a large number of
> >> users,
> >>> and constantly improve, but for backward compatibility issues, we have
> to
> >>> adopt the most conservative approach when designing the API(Of course,
> I
> >> am
> >>> more in favor of developing more rich features, when we discuss
> clearly).
> >>> Therefore, I propose to divide the function implementation of
> >>> map/faltMap/agg/flatAgg into basic functions of JIRAs and JIRAs that
> >>> support time attributes and groupKeys. We can develop the features
> which
> >>> we  have already agreed on the design. And we will continue to discuss
> >> the
> >>> uncertain design.
> >>>
> >>> In fact, in addition to the design of APIs, there will be various
> >>> performance optimization details, such as: table Aggregate function
> >>> emitValue will generate multiple calculation results, in extreme cases,
> >>> each record will trigger a large number of retract messages, this will
> >> have
> >>> poor performance,so we will also optimize the interface design, such as
> >>> adding the emitWithRetractValue interface (I have updated the google
> doc)
> >>> to allow the user to optionally perform incremental calculations, thus
> >>> avoiding a large number of retracts. Details like this are difficult to
> >>> fully discuss in the mail list, so I recommend creating JIRAs/FLIP
> first,
> >>> we develop designs that have been agreed upon and continue to discuss
> >>> non-deterministic designs!  What do you think? @Fabian & Piotr &
> XiaoWei
> >>>
> >>> Best,
> >>> Jincheng
> >>>
> >>> Xiaowei Jiang <xiaoweij@xxxxxxxxx> 于2018年11月19日周一 上午12:07写道:
> >>>
> >>>> Hi Fabian & Piotr, thanks for the feedback!
> >>>>
> >>>> I appreciate your concerns, both on timestamp attributes as well as on
> >>>> implicit group keys. At the same time, I'm also concerned with the
> >>>> proposed
> >>>> approach of allowing Expression* as parameters, especially for
> >>>> flatMap/flatAgg. So far, we never allowed a scalar expression to
> appear
> >>>> together with table expressions. With the Expression* approach, this
> >> will
> >>>> happen for the parameters to flatMap/flatAgg. I'm a bit concerned on
> if
> >>>> we
> >>>> fully understand the consequences when we try to extend our system in
> >> the
> >>>> future. I would be extra cautious in doing this. To avoid this, I
> think
> >>>> an
> >>>> implicit group key for flatAgg is safer. For flatMap, if users want to
> >>>> keep
> >>>> the rowtime column, he can use crossApply/join instead. So we are not
> >>>> losing any real functionality here.
> >>>>
> >>>> Also a clarification on the following example:
> >>>> tab.window(Tumble ... as 'w)
> >>>>    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>    .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> >>>>    .select('k1, 'col1, 'w.rowtime as 'rtime)
> >>>> If we did not have the select clause in this example, we will have 'w
> as
> >>>> a
> >>>> regular column in the output. It should not magically disappear.
> >>>>
> >>>> The concern is not as strong for Table.map/Table.agg because we are
> not
> >>>> mixing scalar and table expressions. But we also want to be a bit
> >>>> consistent with these methods. If we used implicit group keys for
> >>>> Table.flatAgg, we probably should do the same for Table.agg. Now we
> only
> >>>> have to choose what to do with Table.map. I can see good arguments
> from
> >>>> both sides. But starting with a single Expression seems safer because
> >>>> that
> >>>> we can always extend to Expression* in the future.
> >>>>
> >>>> While thinking about this problem, it appears that we may need more
> work
> >>>> in
> >>>> our handling of watermarks for SQL/Table API. Our current way of
> >>>> propagating the watermarks from source all the way to sink might not
> be
> >>>> optimal. For example, after a tumbling window, the watermark can
> >> actually
> >>>> be advanced to just before the expiring of next window. I think that
> in
> >>>> general, each operator may need to generate new watermarks instead of
> >>>> simply propagating them. Once we accept that watermarks may change
> >> during
> >>>> the execution, it appears that the timestamp columns may also change,
> as
> >>>> long as we have some way to associate watermark with it. My intuition
> is
> >>>> that once we have a through solution for the watermark issue, we may
> be
> >>>> able to solve the problem we encountered for Table.map in a cleaner
> way.
> >>>> But this is a complex issue which deserves a discussion on its own.
> >>>>
> >>>> Regards,
> >>>> Xiaowei
> >>>>
> >>>>
> >>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <
> >> piotr@xxxxxxxxxxxxxxxxx>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Isn’t the problem of multiple expressions limited only to `flat***`
> >>>>> functions and to be more specific only to having two (or more)
> >>>>> different
> >>>>> table functions passed as an expressions? `.flatAgg(TableAggA('a),
> >>>>> scalarFunction1(‘b), scalarFunction2(‘c))` seems to be well defined
> >>>>> (duplicate result of every scalar function to every record. Or am I
> >>>> missing
> >>>>> something?
> >>>>>
> >>>>> Another remark, I would be in favour of not using abbreviations and
> >>>> naming
> >>>>> `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
> >>>>>
> >>>>> Piotrek
> >>>>>
> >>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <fhueske@xxxxxxxxx> wrote:
> >>>>>>
> >>>>>> Hi Jincheng,
> >>>>>>
> >>>>>> I said before, that I think that the append() method is better than
> >>>>>> implicitly forwarding keys, but still, I believe it adds unnecessary
> >>>>> boiler
> >>>>>> plate code.
> >>>>>>
> >>>>>> Moreover, I haven't seen a convincing argument why map(Expression*)
> >>>>>> is
> >>>>>> worse than map(Expression). In either case we need to do all kinds
> >> of
> >>>>>> checks to prevent invalid use of functions.
> >>>>>> If the method is not correctly used, we can emit a good error
> >> message
> >>>> and
> >>>>>> documenting map(Expression*) will be easier than
> >>>>> map(append(Expression*)),
> >>>>>> in my opinion.
> >>>>>> I think we should not add unnessary syntax unless there is a good
> >>>> reason
> >>>>>> and to be honest, I haven't seen this reason yet.
> >>>>>>
> >>>>>> Regarding the groupBy.agg() method, I think it should behave just
> >>>>>> like
> >>>>> any
> >>>>>> other method, i.e., not do any implicit forwarding.
> >>>>>> Let's take the example of the windowed group by, that you posted
> >>>> before.
> >>>>>>
> >>>>>> tab.window(Tumble ... as 'w)
> >>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> >>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
> >>>>>>
> >>>>>> What happens if 'w.rowtime is not selected? What is the data type of
> >>>> the
> >>>>>> field 'w in the resulting Table? Is it a regular field at all or
> >> just
> >>>>>> a
> >>>>>> system field that disappears if it is not selected?
> >>>>>>
> >>>>>> IMO, the following syntax is shorter, more explicit, and better
> >>>>>> aligned
> >>>>>> with the regular window.groupBy.select aggregations that are
> >>>>>> supported
> >>>>>> today.
> >>>>>>
> >>>>>> tab.window(Tumble ... as 'w)
> >>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>>>   .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
> >>>>>>
> >>>>>>
> >>>>>> Best, Fabian
> >>>>>>
> >>>>>> Am Mi., 14. Nov. 2018 um 08:37 Uhr schrieb jincheng sun <
> >>>>>> sunjincheng121@xxxxxxxxx>:
> >>>>>>
> >>>>>>> Hi Fabian/Xiaowei,
> >>>>>>>
> >>>>>>> I am very sorry for my late reply! Glad to see your reply, and
> >>>>>>> sounds
> >>>>>>> pretty good!
> >>>>>>> I agree that the approach with append() which can clearly defined
> >>>>>>> the
> >>>>>>> result schema is better which Fabian mentioned.
> >>>>>>> In addition and append() and also contains non-time attributes,
> >>>>>>> e.g.:
> >>>>>>>
> >>>>>>>   tab('name, 'age, 'address, 'rowtime)
> >>>>>>>   tab.map(append(udf('name), 'address, 'rowtime).as('col1, 'col2,
> >>>>>>> 'address, 'rowtime)
> >>>>>>>   .window(Tumble over 5.millis on 'rowtime as 'w)
> >>>>>>>   .groupBy('w, 'address)
> >>>>>>>
> >>>>>>> In this way the append() is very useful, and the behavior is very
> >>>>> similar
> >>>>>>> to withForwardedFields() in DataSet.
> >>>>>>> So +1 to using append() approach for the map()&flatmap()!
> >>>>>>>
> >>>>>>> But how about the agg() and flatAgg()? In agg/flatAgg case I agree
> >>>>>>> Xiaowei's approach that define the keys to be implied in the result
> >>>>> table
> >>>>>>> and appears at the beginning, for example as follows:
> >>>>>>> tab.window(Tumble ... as 'w)
> >>>>>>>   .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>>>>   .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
> >>>>>>>   .select('k1, 'col1, 'w.rowtime as 'rtime)
> >>>>>>>
> >>>>>>> What to you think? @Fabian @Xiaowei
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Jincheng
> >>>>>>>
> >>>>>>> Fabian Hueske <fhueske@xxxxxxxxx> 于2018年11月9日周五 下午6:35写道:
> >>>>>>>
> >>>>>>>> Hi Jincheng,
> >>>>>>>>
> >>>>>>>> Thanks for the summary!
> >>>>>>>> I like the approach with append() better than the implicit
> >>>>>>>> forwarding
> >>>>> as
> >>>>>>> it
> >>>>>>>> clearly indicates which fields are forwarded.
> >>>>>>>> However, I don't see much benefit over the flatMap(Expression*)
> >>>>> variant,
> >>>>>>> as
> >>>>>>>> we would still need to analyze the full expression tree to ensure
> >>>> that
> >>>>> at
> >>>>>>>> most (or exactly?) one Scalar / TableFunction is used.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Fabian
> >>>>>>>>
> >>>>>>>> Am Do., 8. Nov. 2018 um 19:25 Uhr schrieb jincheng sun <
> >>>>>>>> sunjincheng121@xxxxxxxxx>:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> We are discussing very detailed content about this proposal. We
> >>>>>>>>> are
> >>>>>>>> trying
> >>>>>>>>> to design the API in many aspects (functionality, compatibility,
> >>>> ease
> >>>>>>> of
> >>>>>>>>> use, etc.). I think this is a very good process. Only such a
> >>>> detailed
> >>>>>>>>> discussion, In order to develop PR more clearly and smoothly in
> >>>>>>>>> the
> >>>>>>> later
> >>>>>>>>> stage. I am very grateful to @Fabian and  @Xiaowei for sharing a
> >>>>>>>>> lot
> >>>>> of
> >>>>>>>>> good ideas.
> >>>>>>>>> About the definition of method signatures I want to share my
> >>>>>>>>> points
> >>>>>>> here
> >>>>>>>>> which I am discussing with fabian in google doc (not yet
> >>>>>>>>> completed),
> >>>>> as
> >>>>>>>>> follows:
> >>>>>>>>>
> >>>>>>>>> Assume we have a table:
> >>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long,
> >> 'string,
> >>>>>>>>> 'proctime.proctime)
> >>>>>>>>>
> >>>>>>>>> Approach 1:
> >>>>>>>>> case1: Map follows Source Table
> >>>>>>>>> val result =
> >>>>>>>>> tab.map(udf('string)).as('proctime, 'col1, 'col2)// proctime
> >>>> implied
> >>>>>>> in
> >>>>>>>>> the output
> >>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
> >>>>>>>>>
> >>>>>>>>> case2: FatAgg follows Window (Fabian mentioned above)
> >>>>>>>>> val result =
> >>>>>>>>>   tab.window(Tumble ... as 'w)
> >>>>>>>>>      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> >>>>>>>>>      .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> >>>>>>>>>      .select('k1, 'col1, 'w.rowtime as 'rtime)
> >>>>>>>>>
> >>>>>>>>> Approach 2: Similar to Fabian‘s approach, which the result schema
> >>>>> would
> >>>>>>>> be
> >>>>>>>>> clearly defined, but add a built-in append UDF. That make
> >>>>>>>>> map/flatmap/agg/flatAgg interface only accept one Expression.
> >>>>>>>>> val result =
> >>>>>>>>>   tab.map(append(udf('string), 'long, 'proctime)) as ('col1,
> >>>>>>>>> 'col2,
> >>>>>>>>> 'long, 'proctime)
> >>>>>>>>>    .window(Tumble over 5.millis on 'proctime as 'w)
> >>>>>>>>>
> >>>>>>>>> Note: Append is a special UDF for built-in that can pass through
> >>>>>>>>> any
> >>>>>>>>> column.
> >>>>>>>>>
> >>>>>>>>> So, May be we can defined the as  table.map(Expression)  first,
> >> If
> >>>>>>>>> necessary, we can extend to table.map(Expression*)  in the future
> >>>>>>>>> ?
> >>>>> Of
> >>>>>>>>> course, I also hope that we can do more perfection in this
> >>>>>>>>> proposal
> >>>>>>>> through
> >>>>>>>>> discussion.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Jincheng
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Xiaowei Jiang <xiaoweij@xxxxxxxxx> 于2018年11月7日周三 下午11:45写道:
> >>>>>>>>>
> >>>>>>>>>> Hi Fabian,
> >>>>>>>>>>
> >>>>>>>>>> I think that the key question you raised is if we allow extra
> >>>>>>>> parameters
> >>>>>>>>> in
> >>>>>>>>>> the methods map/flatMap/agg/flatAgg. I can see why allowing that
> >>>> may
> >>>>>>>>> appear
> >>>>>>>>>> more convenient in some cases. However, it might also cause some
> >>>>>>>>> confusions
> >>>>>>>>>> if we do that. For example, do we allow multiple UDFs in these
> >>>>>>>>> expressions?
> >>>>>>>>>> If we do, the semantics may be weird to define, e.g. what does
> >>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
> >>>>>>>>>> Even
> >>>>>>>> though
> >>>>>>>>>> not allowing it may appear less powerful, but it can make things
> >>>> more
> >>>>>>>>>> intuitive too. In the case of agg/flatAgg, we can define the
> >> keys
> >>>> to
> >>>>>>> be
> >>>>>>>>>> implied in the result table and appears at the beginning. You
> >> can
> >>>>>>> use a
> >>>>>>>>>> select method if you want to modify this behavior. I think that
> >>>>>>>>> eventually
> >>>>>>>>>> we will have some API which allows other expressions as
> >>>>>>>>>> additional
> >>>>>>>>>> parameters, but I think it's better to do that after we
> >> introduce
> >>>> the
> >>>>>>>>>> concept of nested tables. A lot of things we suggested here can
> >>>>>>>>>> be
> >>>>>>>>>> considered as special cases of that. But things are much simpler
> >>>>>>>>>> if
> >>>>>>> we
> >>>>>>>>>> leave that to later.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Xiaowei
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhueske@xxxxxxxxx
> >>>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> * Re emit:
> >>>>>>>>>>> I think we should start with a well understood semantics of
> >> full
> >>>>>>>>>>> replacement. This is how the other agg functions work.
> >>>>>>>>>>> As was said before, there are open questions regarding an
> >> append
> >>>>>>> mode
> >>>>>>>>>>> (checkpointing, whether supporting retractions or not and if
> >> yes
> >>>>>>> how
> >>>>>>>> to
> >>>>>>>>>>> declare them, ...).
> >>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
> >>>>>>>>>>>
> >>>>>>>>>>> * Re grouping keys:
> >>>>>>>>>>> I don't think we should automatically add them because the
> >>>>>>>>>>> result
> >>>>>>>>> schema
> >>>>>>>>>>> would not be intuitive.
> >>>>>>>>>>> Would they be added at the beginning of the tuple or at the
> >> end?
> >>>>>>> What
> >>>>>>>>>>> metadata fields of windows would be added? In which order would
> >>>>>>> they
> >>>>>>>> be
> >>>>>>>>>>> added?
> >>>>>>>>>>>
> >>>>>>>>>>> However, we could support syntax like this:
> >>>>>>>>>>> val t: Table = ???
> >>>>>>>>>>> t
> >>>>>>>>>>> .window(Tumble ... as 'w)
> >>>>>>>>>>> .groupBy('a, 'b)
> >>>>>>>>>>> .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime
> >> as
> >>>>>>>>> 'rtime)
> >>>>>>>>>>>
> >>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2,
> >>>>>>>>>>> ...,
> >>>>>>> fn,
> >>>>>>>>>> wend,
> >>>>>>>>>>> rtime]. (f1, f2, ...fn) are the result attributes of the UDF.
> >>>>>>>>>>>
> >>>>>>>>>>> * Re Multi-staged evaluation:
> >>>>>>>>>>> I think this should be an optimization that can be applied if
> >>>>>>>>>>> the
> >>>>>>> UDF
> >>>>>>>>>>> implements the merge() method.
> >>>>>>>>>>>
> >>>>>>>>>>> Best, Fabian
> >>>>>>>>>>>
> >>>>>>>>>>> Am Mi., 7. Nov. 2018 um 08:01 Uhr schrieb Shaoxuan Wang <
> >>>>>>>>>>> wshaoxuan@xxxxxxxxx
> >>>>>>>>>>>> :
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi xiaowei,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Yes, I agree with you that the semantics of
> >>>>>>> TableAggregateFunction
> >>>>>>>>> emit
> >>>>>>>>>>> is
> >>>>>>>>>>>> much more complex than AggregateFunction. The fundamental
> >>>>>>>> difference
> >>>>>>>>> is
> >>>>>>>>>>>> that TableAggregateFunction emits a "table" while
> >>>>>>> AggregateFunction
> >>>>>>>>>>> outputs
> >>>>>>>>>>>> (a column of) a "row". In the case of AggregateFunction it
> >> only
> >>>>>>> has
> >>>>>>>>> one
> >>>>>>>>>>>> mode which is “replacing” (complete update). But for
> >>>>>>>>>>>> TableAggregateFunction, it could be incremental (only emit the
> >>>>>>> new
> >>>>>>>>>>> updated
> >>>>>>>>>>>> results) update or complete update (always emit the entire
> >>>>>>>>>>>> table
> >>>>>>>> when
> >>>>>>>>>>>> “emit" is triggered).  From the performance perspective, we
> >>>>>>>>>>>> might
> >>>>>>>>> want
> >>>>>>>>>> to
> >>>>>>>>>>>> use incremental update. But we need review and design this
> >>>>>>>> carefully,
> >>>>>>>>>>>> especially taking into account the cases of the failover
> >>>>>>>>>>>> (instead
> >>>>>>>> of
> >>>>>>>>>> just
> >>>>>>>>>>>> back-up the ACC it may also needs to remember the emit offset)
> >>>>>>> and
> >>>>>>>>>>>> retractions, as the semantics of TableAggregateFunction emit
> >>>>>>>>>>>> are
> >>>>>>>>>>> different
> >>>>>>>>>>>> than other UDFs. TableFunction also emits a table, but it does
> >>>>>>> not
> >>>>>>>>> need
> >>>>>>>>>>> to
> >>>>>>>>>>>> worry this due to the nature of stateless.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Shaoxuan
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang
> >>>>>>>>>>>> <xiaoweij@xxxxxxxxx
> >>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a
> >>>>>>> very
> >>>>>>>>>> good
> >>>>>>>>>>>>> start. There are a few points that we need to have more
> >>>>>>>>> discussions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>  - TableAggregateFunction - this is a very complex beast,
> >>>>>>>>>> definitely
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>  most complex user defined objects we introduced so far. I
> >>>>>>>> think
> >>>>>>>>>>> there
> >>>>>>>>>>>>> are
> >>>>>>>>>>>>>  quite some interesting questions here. For example, do we
> >>>>>>>> allow
> >>>>>>>>>>>>>  multi-staged TableAggregate in this case? What is the
> >>>>>>>> semantics
> >>>>>>>>> of
> >>>>>>>>>>>>> emit? Is
> >>>>>>>>>>>>>  it amendments to the previous output, or replacing it? I
> >>>>>>> think
> >>>>>>>>>> that
> >>>>>>>>>>>> this
> >>>>>>>>>>>>>  subject itself is worth a discussion to make sure we get
> >> the
> >>>>>>>>>> details
> >>>>>>>>>>>>> right.
> >>>>>>>>>>>>>  - GroupedTable.agg - does the group keys automatically
> >>>>>>> appear
> >>>>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>>>>  output? how about the case of windowing aggregation?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Xiaowei
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <
> >>>>>>>>>> sunjincheng121@xxxxxxxxx>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi, Xiaowei,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for bring up the discuss of Table API Enhancement
> >>>>>>>> Outline
> >>>>>>>>> !
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I quickly looked at the overall content, these are good
> >>>>>>>>> expressions
> >>>>>>>>>>> of
> >>>>>>>>>>>>> our
> >>>>>>>>>>>>>> offline discussions. But from the points of my view, we
> >>>>>>> should
> >>>>>>>>> add
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> usage of public interfaces that we will introduce in this
> >>>>>>>>> propose.
> >>>>>>>>>>>> So, I
> >>>>>>>>>>>>>> added the following usage description of  interface and
> >>>>>>>> operators
> >>>>>>>>>> in
> >>>>>>>>>>>>>> google doc:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Map Operator
> >>>>>>>>>>>>>>   Map operator is a new operator of Table, Map operator can
> >>>>>>>>>> apply a
> >>>>>>>>>>>>>> scalar function, and can return multi-column. The usage as
> >>>>>>>>> follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> val res = tab
> >>>>>>>>>>>>>>    .map(fun: ScalarFunction).as(‘a, ‘b, ‘c)
> >>>>>>>>>>>>>>    .select(‘a, ‘c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. FlatMap Operator
> >>>>>>>>>>>>>>   FaltMap operator is a new operator of Table, FlatMap
> >>>>>>>> operator
> >>>>>>>>>> can
> >>>>>>>>>>>>> apply
> >>>>>>>>>>>>>> a table function, and can return multi-row. The usage as
> >>>>>>>> follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> val res = tab
> >>>>>>>>>>>>>>     .flatMap(fun: TableFunction).as(‘a, ‘b, ‘c)
> >>>>>>>>>>>>>>     .select(‘a, ‘c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. Agg Operator
> >>>>>>>>>>>>>>   Agg operator is a new operator of Table/GroupedTable, Agg
> >>>>>>>>>>> operator
> >>>>>>>>>>>>> can
> >>>>>>>>>>>>>> apply a aggregate function, and can return multi-column. The
> >>>>>>>>> usage
> >>>>>>>>>> as
> >>>>>>>>>>>>>> follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>  val res = tab
> >>>>>>>>>>>>>>     .groupBy(‘a) // leave groupBy-Clause out to define
> >>>>>>> global
> >>>>>>>>>>>>> aggregates
> >>>>>>>>>>>>>>     .agg(fun: AggregateFunction).as(‘a, ‘b, ‘c)
> >>>>>>>>>>>>>>     .select(‘a, ‘c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4.  FlatAgg Operator
> >>>>>>>>>>>>>>   FlatAgg operator is a new operator of Table/GroupedTable,
> >>>>>>>>>> FaltAgg
> >>>>>>>>>>>>>> operator can apply a table aggregate function, and can
> >> return
> >>>>>>>>>>>> multi-row.
> >>>>>>>>>>>>>> The usage as follows:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>   val res = tab
> >>>>>>>>>>>>>>      .groupBy(‘a) // leave groupBy-Clause out to define
> >>>>>>>> global
> >>>>>>>>>>> table
> >>>>>>>>>>>>>> aggregates
> >>>>>>>>>>>>>>      .flatAgg(fun: TableAggregateFunction).as(‘a, ‘b, ‘c)
> >>>>>>>>>>>>>>      .select(‘a, ‘c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 5. TableAggregateFunction
> >>>>>>>>>>>>>>    The behavior of table aggregates is most like
> >>>>>>>>>>> GroupReduceFunction
> >>>>>>>>>>>>> did,
> >>>>>>>>>>>>>> which computed for a group of elements, and output  a group
> >>>>>>> of
> >>>>>>>>>>>> elements.
> >>>>>>>>>>>>>> The TableAggregateFunction can be applied on
> >>>>>>>>>> GroupedTable.flatAgg() .
> >>>>>>>>>>>> The
> >>>>>>>>>>>>>> interface of TableAggregateFunction has a lot of content, so
> >>>>>>> I
> >>>>>>>>>> don't
> >>>>>>>>>>>> copy
> >>>>>>>>>>>>>> it here, Please look at the detail in google doc:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I will be very appreciate to anyone for reviewing and
> >>>>>>>> commenting.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >> --
> >>
> >>
> -----------------------------------------------------------------------------------
> >>
> >> *Rome was not built in one day*
> >>
> >>
> >>
> -----------------------------------------------------------------------------------
> >>
>
>