
Re: [DISCUSS] Enhancing the functionality and productivity of Table API


Hi Timo,

I am very grateful for your feedback, and I am excited to hear that you
have also considered adding a process function to the Table API.

I agree that we should support the ProcessFunction on the Table API; this
is actually part of my proposal for enhancing the functionality of the
Table API. In fact, supporting the ProcessFunction means supporting
user-defined operators. As you said, a ProcessFunction can implement any
logic, including user-defined windows, which leaves the user with enough
freedom and control. At the same time, the CoProcessFunction needs to be
supported, so that we can implement user-defined JOIN logic through a
CoProcessFunction. Of course, the CoProcessFunction also requires
introducing the concept of connect, which will add a new ConnectedTable
type to the Table API. And I also think this would open the Table API up
to more event-driven applications.
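
As a concrete illustration of how a user-defined JOIN could be expressed
through a CoProcessFunction-style handler, here is a minimal, self-contained
sketch. None of these class or method names exist in the current Table API;
the handler shape (one callback per connected input, per-key state shared
between them, a collector for output rows) is a hypothetical assumption
modeled on the DataStream CoProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoProcessJoinSketch {

    // Hypothetical handler shape: one callback per connected input,
    // per-key state shared between them, and emitted rows collected
    // into `out`. This mimics how an inner join could be user-defined.
    static class InnerJoinFn {
        final Map<String, Integer> leftState = new HashMap<>();
        final Map<String, String> rightState = new HashMap<>();

        void processElement1(String key, int value, List<String> out) {
            leftState.put(key, value);
            String right = rightState.get(key);
            if (right != null) out.add(key + "," + value + "," + right);
        }

        void processElement2(String key, String value, List<String> out) {
            rightState.put(key, value);
            Integer left = leftState.get(key);
            if (left != null) out.add(key + "," + left + "," + value);
        }
    }

    static List<String> demo() {
        InnerJoinFn fn = new InnerJoinFn();
        List<String> out = new ArrayList<>();
        fn.processElement1("a", 1, out);   // no match yet, only buffered
        fn.processElement2("a", "x", out); // joins with the buffered ("a", 1)
        fn.processElement2("b", "y", out); // no match yet, only buffered
        fn.processElement1("b", 2, out);   // joins with the buffered ("b", "y")
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a,1,x, b,2,y]
    }
}
```

In the hypothetical API, such a handler would be passed to something like
`leftTable.connect(rightTable).process(fn)`, producing a new Table whose
schema the planner can still reason about.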

Regarding the ProcessFunction: apart from its timer functionality, it is
essentially equivalent to a FlatMapFunction. So perhaps we can support map
and flatMap on Table, and support the ProcessFunction on GroupedTable,
because for state-related reasons the timer of a ProcessFunction can only
be applied to a KeyedStream.
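
A rough sketch of why a map on Table helps: a single map function can
produce many derived columns in one call, whereas SQL needs one scalar UDF
per output column. Everything below is purely illustrative (the row layout,
the derived columns, and the fixed year 2018 are invented for the example);
in the hypothetical API such a function would be passed as `table.map(mapFun)`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TableMapSketch {

    // Hypothetical map function on a Table: one input row in, one output
    // row (with several derived columns) out, in a single call.
    static List<Object> deriveColumns(List<Object> row) {
        String name = (String) row.get(0);
        int birthYear = (Integer) row.get(1);
        // One call produces three derived columns at once; in SQL each
        // would need its own scalar UDF in the SELECT list.
        return Arrays.asList(name.toUpperCase(), name.length(), 2018 - birthYear);
    }

    static List<Object> demo() {
        Function<List<Object>, List<Object>> mapFun = TableMapSketch::deriveColumns;
        return mapFun.apply(Arrays.asList("flink", 2014));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [FLINK, 5, 4]
    }
}
```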

You are right that ANSI SQL has difficulty expressing complex operator
logic such as a ProcessFunction. So once we decide to make these
enhancements to the Table API, it means that Flink SQL covers only ANSI
SQL operations, while the Table API's operations become a superset of SQL.
The Flink high-level API would then include both a query language (SQL)
and a powerful programming language (the Table API). In this way, SQL
serves user groups with simple ETL needs, while the Table API serves users
who need to customize complex logic, and these users can still enjoy the
benefits of the query optimizer. It may take more refinement and hard work
to support these functions, but this may be a good direction for our
efforts.

Thanks,
Jincheng

Timo Walther <twalthr@xxxxxxxxxx> wrote on Thursday, November 1, 2018, at 10:08 PM:

> Hi Jincheng,
>
> I have also thought several times about introducing a process function
> for the Table API. This would allow defining more complex logic (custom
> windows, timers, etc.) embedded into a relational API with schema
> awareness and optimization around the black box. Of course this would
> mean that we diverge with Table API from SQL API, however, it would open
> the Table API also for more event-driven applications.
>
> Maybe it would be possible to define timers and firing logic using Table
> API expressions and UDFs. Within planning this would be treated as a
> special Calc node.
>
> Just some ideas that might be interesting for new use cases.
>
> Regards,
> Timo
>
>
> On 01.11.18 at 13:12, Aljoscha Krettek wrote:
> > Hi Jincheng,
> >
> > these points sound very good! Are there any concrete proposals for
> changes? For example a FLIP/design document?
> >
> > See here for FLIPs:
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >
> > Best,
> > Aljoscha
> >
> >> On 1. Nov 2018, at 12:51, jincheng sun <sunjincheng121@xxxxxxxxx>
> wrote:
> >>
> >> -------- I am sorry for the formatting of the email content. I have
> >> reformatted the content as follows --------
> >>
> >> Hi all,
> >>
> >> With the continuous efforts from the community, the Flink system has been
> >> continuously improved, which has attracted more and more users. Flink SQL
> >> is a canonical, widely used relational query language. However, there are
> >> still some scenarios where Flink SQL fails to meet user needs in terms of
> >> functionality and ease of use, such as:
> >>
> >> *1. In terms of functionality*
> >>     Iteration, user-defined window, user-defined join, user-defined
> >> GroupReduce, etc. Users cannot express them with SQL;
> >>
> >> *2. In terms of ease of use*
> >>
> >>    - Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(),
> >>    udf2(), udf3()...)” can be used to accomplish the same function, with a
> >>    map() function returning 100 columns one has to define or call 100 UDFs
> >>    when using SQL, which is quite involved.
> >>    - FlatMap - e.g. “dataStream.flatmap(flatMapFun)”. Similarly, it can
> >>    be implemented with “table.join(udtf).select()”. However, it is obvious
> >>    that the DataStream API is easier to use than SQL.
> >>
> >> Due to the above two reasons, some users have to use the DataStream API
> >> or the DataSet API. But when they do that, they lose the unification of
> >> batch and streaming. They will also lose the sophisticated optimizations
> >> such as codegen, aggregate join transpose, and multi-stage agg from
> >> Flink SQL.
> >>
> >> We believe that enhancing the functionality and productivity is vital
> >> for the successful adoption of the Table API. To this end, the Table API
> >> still requires more effort from every contributor in the community. We
> >> see great opportunity in improving our users’ experience through this
> >> work. Any feedback is welcome.
> >>
> >> Regards,
> >>
> >> Jincheng
> >>