Re: [DISCUSS] Flink SQL DDL Design


Hi Lin and Jark,
Thanks for sharing those details. Could you please consider summarizing your
DDL design in a Google doc?
We can still continue the discussion on Shuyi's proposal, but a separate
Google doc would make it easier for the dev community to understand, comment
on, and discuss your proposed DDL implementation.

Regards,
Shaoxuan


On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <imjark@xxxxxxxxx> wrote:

> Hi Shuyi,
>
> Thanks for bringing up this discussion and the awesome work! I have left
> some comments in the doc.
>
> I'd like to share a bit more about watermark definitions, based on what we
> learned at Alibaba.
>
>    1. A table should be able to accept multiple watermark definitions.
>
>    A table may have more than one rowtime field. For example, one rowtime
>    field comes from an existing field but is missing in some records, while
>    another is the Kafka ingestion timestamp, which is not very accurate. In
>    this case, the user may define two rowtime fields with watermarks in the
>    table and choose one depending on the situation.
>
>    2. A watermark strategy always works together with a rowtime field.
>
> Based on these two points, I think we should combine the watermark strategy
> and the rowtime field selection (i.e. which existing field is used to
> generate the watermark) in one clause, so that we can define multiple
> watermarks in one table.
>
> Here is the watermark syntax used at Alibaba (slightly modified):
>
> watermarkDefinition:
> WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy
>
> wm_strategy:
>   BOUNDED WITH OFFSET 'string' timeUnit
> |
>   ASCENDING
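To make the clause concrete, here is a minimal Python sketch of how a parser could turn it into a structured definition, so that one table can hold several named watermarks. WatermarkDef, parse_watermark, and the field names are hypothetical. A regular expression stands in for what would really be an extension of the SQL parser.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class WatermarkDef:
    """Hypothetical parsed form of: WATERMARK [name] FOR <rowtime_field> AS wm_strategy."""
    name: Optional[str]            # optional watermark name
    rowtime_field: str             # existing (or computed) field after FOR
    strategy: str                  # "BOUNDED" or "ASCENDING"
    offset: Optional[int] = None   # only for BOUNDED WITH OFFSET '<n>' <unit>
    unit: Optional[str] = None


# Regex stand-in for a real grammar rule; the watermark name is optional.
_WATERMARK = re.compile(
    r"WATERMARK\s+(?:(?P<name>\w+)\s+)?FOR\s+(?P<field>\w+)\s+AS\s+"
    r"(?:(?P<asc>ASCENDING)|BOUNDED\s+WITH\s+OFFSET\s+'(?P<offset>\d+)'\s+(?P<unit>\w+))$",
    re.IGNORECASE,
)


def parse_watermark(clause: str) -> WatermarkDef:
    m = _WATERMARK.match(clause.strip())
    if m is None:
        raise ValueError(f"not a watermark definition: {clause!r}")
    if m.group("asc"):
        return WatermarkDef(m.group("name"), m.group("field"), "ASCENDING")
    return WatermarkDef(m.group("name"), m.group("field"), "BOUNDED",
                        int(m.group("offset")), m.group("unit").upper())


# One table, two rowtime candidates, each with its own watermark.
watermarks = [
    parse_watermark("WATERMARK wm_event FOR event_ts AS BOUNDED WITH OFFSET '5' SECOND"),
    parse_watermark("WATERMARK FOR kafka_ts AS ASCENDING"),
]
```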
>
> The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword
> specifies which existing field is used to generate the watermark. This
> field must already exist in the schema (a computed column can derive it
> from other fields). The “AS” keyword defines the watermark strategy, such
> as BOUNDED WITH OFFSET (which covers almost all requirements) or ASCENDING.
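The effect of the two strategies can be sketched as a small simulation. It assumes the common reading of BOUNDED WITH OFFSET (watermark = highest timestamp seen so far minus the offset) and treats ASCENDING as its zero-offset special case. The function names are hypothetical, timestamps are epoch milliseconds, and Flink's own timestamp extractors may differ in details.

```python
from typing import List, Optional


def bounded_with_offset(timestamps: List[int], offset_ms: int) -> List[int]:
    """Watermark emitted after each record: highest timestamp so far minus the offset."""
    watermarks: List[int] = []
    max_ts: Optional[int] = None
    for ts in timestamps:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermarks.append(max_ts - offset_ms)
    return watermarks


def ascending(timestamps: List[int]) -> List[int]:
    """ASCENDING treated as the zero-offset case (timestamps assumed to be in order)."""
    return bounded_with_offset(timestamps, 0)


def late_records(timestamps: List[int], offset_ms: int) -> List[int]:
    """Records whose timestamp is below the watermark emitted before they arrived."""
    late: List[int] = []
    current: Optional[int] = None
    for ts, wm in zip(timestamps, bounded_with_offset(timestamps, offset_ms)):
        if current is not None and ts < current:
            late.append(ts)
        current = wm
    return late


events = [1000, 3000, 2000, 6000, 4500]  # epoch millis, slightly out of order
```

With a 2000 ms offset, none of these records is late. With the zero-offset ASCENDING strategy, 2000 and 4500 would be treated as late. This is why a bounded offset covers most practical cases.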
>
> When the expected rowtime field does not exist in the schema, we can use
> computed-column syntax to derive it from other existing fields using
> built-in or user-defined functions. That way, the rowtime/watermark
> definition doesn’t need to care about a “field-change” strategy
> (replace/add/from-field). Proctime fields can also be defined with computed
> columns. For example, pt AS PROCTIME() defines a proctime field named “pt”
> in the schema.
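Here is a minimal sketch of the computed-column idea, assuming each computed column is an expression evaluated over the physical columns of a row. The conversion function and column names are hypothetical. proctime() only imitates what PROCTIME() is described as doing here.

```python
from datetime import datetime, timezone
from typing import Any, Callable, Dict

Row = Dict[str, Any]


def proctime(_row: Row) -> datetime:
    """Stand-in for PROCTIME(): the wall-clock time when the row is processed."""
    return datetime.now(timezone.utc)


def millis_to_timestamp(row: Row) -> datetime:
    """Hypothetical conversion deriving a rowtime from an epoch-millis field."""
    return datetime.fromtimestamp(row["ts_millis"] / 1000, tz=timezone.utc)


def with_computed_columns(row: Row, computed: Dict[str, Callable[[Row], Any]]) -> Row:
    """Append each computed column, evaluated over the physical columns only."""
    out = dict(row)
    for name, expr in computed.items():
        out[name] = expr(row)
    return out


# Roughly: event_ts AS <convert>(ts_millis), pt AS PROCTIME()
computed_columns = {"event_ts": millis_to_timestamp, "pt": proctime}
row = with_computed_columns({"user": "alice", "ts_millis": 1543400000000}, computed_columns)
```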
>
> Looking forward to working with you guys!
>
> Best,
> Jark Wu
>
>
> On Wed, Nov 28, 2018 at 6:33 PM Lin Li <lincoln.86xy@xxxxxxxxx> wrote:
>
> > @Shuyi
> > Thanks for the proposal! We have a simple DDL implementation (extending
> > Calcite's parser) that has been running in production for almost two years
> > and works well.
> > I think the most valuable lessons we learned are to keep it simple and to
> > stay standards-compliant.
> > Here's the approximate grammar, FYI:
> > CREATE TABLE
> >
> > CREATE TABLE tableName (
> >         columnDefinition [, columnDefinition]*
> >         [ computedColumnDefinition [, computedColumnDefinition]* ]
> >         [ tableConstraint [, tableConstraint]* ]
> >         [ tableIndex [, tableIndex]* ]
> >         [ PERIOD FOR SYSTEM_TIME ]
> >         [ WATERMARK watermarkName FOR rowTimeColumn AS
> >           withOffset(rowTimeColumn, offset) ]
> > ) [ WITH ( tableOption [, tableOption]* ) ] [ ; ]
> >
> > columnDefinition ::=
> >         columnName dataType [ NOT NULL ]
> >
> > dataType  ::=
> >         {
> >           [ VARCHAR ]
> >           | [ BOOLEAN ]
> >           | [ TINYINT ]
> >           | [ SMALLINT ]
> >           | [ INT ]
> >           | [ BIGINT ]
> >           | [ FLOAT ]
> >           | [ DECIMAL ]
> >           | [ DOUBLE ]
> >           | [ DATE ]
> >           | [ TIME ]
> >           | [ TIMESTAMP ]
> >           | [ VARBINARY ]
> >         }
> >
> > computedColumnDefinition ::=
> >         columnName AS computedColumnExpression
> >
> > tableConstraint ::=
> >     { PRIMARY KEY | UNIQUE }
> >         (columnName [, columnName]* )
> >
> > tableIndex ::=
> >         [ UNIQUE ] INDEX indexName
> >          (columnName [, columnName]* )
> >
> > rowTimeColumn ::=
> >         columnName
> >
> > tableOption ::=
> >         property=value
> >
> > offset ::=
> >         positive integer (unit: ms)
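As a rough illustration of the columnDefinition and computedColumnDefinition rules, the following Python sketch validates single column definitions against the type list above. Column and parse_column are hypothetical names. Constraints, indexes, and parameterized types such as VARCHAR(128) are deliberately left out.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Type names from the dataType rule above (bare names only).
DATA_TYPES = {
    "VARCHAR", "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT",
    "DECIMAL", "DOUBLE", "DATE", "TIME", "TIMESTAMP", "VARBINARY",
}


@dataclass
class Column:
    name: str
    data_type: Optional[str]          # None for computed columns
    not_null: bool = False
    expression: Optional[str] = None  # set only for computed columns


# columnName AS computedColumnExpression
_COMPUTED = re.compile(r"(\w+)\s+AS\s+(.+)", re.IGNORECASE)
# columnName dataType [ NOT NULL ]
_PHYSICAL = re.compile(r"(\w+)\s+(\w+)(\s+NOT\s+NULL)?", re.IGNORECASE)


def parse_column(text: str) -> Column:
    text = text.strip()
    m = _COMPUTED.fullmatch(text)
    if m:
        return Column(m.group(1), None, expression=m.group(2).strip())
    m = _PHYSICAL.fullmatch(text)
    if m is None or m.group(2).upper() not in DATA_TYPES:
        raise ValueError(f"invalid column definition: {text!r}")
    return Column(m.group(1), m.group(2).upper(), not_null=m.group(3) is not None)
```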
> >
> > CREATE VIEW
> >
> > CREATE VIEW viewName
> >   [
> >         ( columnName [, columnName]* )
> >   ]
> >         AS queryStatement;
> >
> > CREATE FUNCTION
> >
> >  CREATE FUNCTION functionName
> >   AS 'className';
> >
> >  className ::=
> >         fully qualified name
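CREATE FUNCTION binds a SQL name to a fully qualified class name. The sketch below shows the same idea in Python, where importlib resolves a dotted path instead of a JVM class loader. FunctionCatalog is a hypothetical name, and treating function names case-insensitively is an assumption.

```python
import importlib
from typing import Any, Callable, Dict


class FunctionCatalog:
    """Hypothetical catalog backing: CREATE FUNCTION functionName AS 'className'."""

    def __init__(self) -> None:
        self._functions: Dict[str, Callable[..., Any]] = {}

    def create_function(self, name: str, qualified_name: str) -> None:
        # "package.module.attr" -> module path + attribute, the Python
        # counterpart of loading a class by its fully qualified name.
        module_name, _, attr = qualified_name.rpartition(".")
        if not module_name:
            raise ValueError(f"not a fully qualified name: {qualified_name!r}")
        key = name.lower()  # assumption: unquoted identifiers are case-insensitive
        if key in self._functions:
            raise ValueError(f"function already exists: {name}")
        self._functions[key] = getattr(importlib.import_module(module_name), attr)

    def call(self, name: str, *args: Any) -> Any:
        return self._functions[name.lower()](*args)


catalog = FunctionCatalog()
catalog.create_function("sq_root", "math.sqrt")
```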
> >
> >
> > On Wed, Nov 28, 2018 at 3:28 AM Shuyi Chen <suez1224@xxxxxxxxx> wrote:
> >
> > > Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc
> > > first and start the implementation without waiting for the unified
> > > connector API, by skipping some features.
> > >
> > > Xuefu, I like the idea of turning Flink-specific properties into generic
> > > key-value pairs, as it will make integration with Hive DDL (or others,
> > > e.g. Beam DDL) easier.
> > >
> > > I'll do a final pass over the design doc and finalize the design in the
> > > next few days. Then we can start creating tasks and collaborate on the
> > > implementation. Thanks a lot for all the comments and input.
> > >
> > > Cheers!
> > > Shuyi
> > >
> > > On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx> wrote:
> > >
> > > > Yeah! I agree with Timo that the DDL can proceed without being blocked
> > > > by the connector API. We can leave the unknowns out while defining the
> > > > basic syntax.
> > > >
> > > > @Shuyi
> > > >
> > > > As commented in the doc, I think we can probably stick with a simple
> > > > syntax with general properties, without extending the syntax so much
> > > > that it mimics the descriptor API.
> > > >
> > > > Part of our effort on Flink-Hive integration is also to make the DDL
> > > > syntax compatible with Hive's. The syntax in the current proposal seems
> > > > to make that effort more challenging.
> > > >
> > > > We can help and collaborate. At this point, I think we can finalize the
> > > > proposal and then divide the tasks for better collaboration.
> > > >
> > > > Please let me know if there are any questions or suggestions.
> > > >
> > > > Thanks,
> > > > Xuefu
> > > >
> > > >
> > > >
> > > >
> > > > ------------------------------------------------------------------
> > > > Sender:Timo Walther <twalthr@xxxxxxxxxx>
> > > > Sent at:2018 Nov 27 (Tue) 16:21
> > > > Recipient:dev <dev@xxxxxxxxxxxxxxxx>
> > > > Subject:Re: [DISCUSS] Flink SQL DDL Design
> > > >
> > > > Thanks for offering your help here, Xuefu. It would be great to move
> > > > these efforts forward. I agree that the DDL is somewhat related to the
> > > > unified connector API design, but we can also start with the basic
> > > > functionality now and evolve the DDL during this release and the next
> > > > ones.
> > > >
> > > > For example, we could identify an MVP DDL syntax that skips defining
> > > > key constraints and maybe even time attributes. This DDL could be used
> > > > for batch use cases, ETL, and materializing SQL queries (no time
> > > > operations like windows).
> > > >
> > > > The unified connector API is high on our priority list for the 1.8
> > > > release. I will try to update the document by the middle of next week.
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Timo
> > > >
> > > >
> > > > On 27.11.18 at 08:08, Shuyi Chen wrote:
> > > > > Thanks a lot, Xuefu. I was busy with some other work for the last 2
> > > > > weeks, but we are definitely interested in moving this forward. I think
> > > > > once the unified connector API design [1] is done, we can finalize the
> > > > > DDL design as well and start creating concrete subtasks to collaborate
> > > > > on the implementation with the community.
> > > > >
> > > > > Shuyi
> > > > >
> > > > > [1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > > > >
> > > > > On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx> wrote:
> > > > >
> > > > >> Hi Shuyi,
> > > > >>
> > > > >> I'm wondering if you folks still have the bandwidth to work on this.
> > > > >>
> > > > >> We have some dedicated resources and would like to move this forward.
> > > > >> We can collaborate.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Xuefu
> > > > >>
> > > > >>
> > > > >> ------------------------------------------------------------------
> > > > >> Sender:wenlong.lwl<wenlong88.lwl@xxxxxxxxx>
> > > > >> Sent at:2018 Nov 05 (Mon) 11:15:35
> > > > >> Recipient:<dev@xxxxxxxxxxxxxxxx>
> > > > >> Subject:Re: [DISCUSS] Flink SQL DDL Design
> > > > >>
> > > > >> Hi Shuyi, thanks for the proposal.
> > > > >>
> > > > >> I have two concerns about the table DDL:
> > > > >>
> > > > >> 1. How about removing the source/sink mark from the DDL? It is not
> > > > >> necessary, because the framework can determine whether a referenced
> > > > >> table is a source or a sink from the context of the query that uses
> > > > >> it. This makes it more convenient for users to define a table that can
> > > > >> be both a source and a sink, and more convenient for the catalog to
> > > > >> persist and manage the metadata.
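Here is a deliberately naive sketch of inferring roles from query context: tables after INSERT INTO are sinks, and tables after FROM or JOIN are sources. The regular expressions and table_roles are hypothetical. A real planner would inspect the parsed query tree, which also handles subqueries and aliases correctly.

```python
import re
from typing import Set, Tuple

# Naive patterns: the table after INSERT INTO is written, tables after FROM/JOIN are read.
_SINK = re.compile(r"\bINSERT\s+INTO\s+(\w+)", re.IGNORECASE)
_SOURCE = re.compile(r"\b(?:FROM|JOIN)\s+(\w+)", re.IGNORECASE)


def table_roles(query: str) -> Tuple[Set[str], Set[str]]:
    """Return (sources, sinks) referenced by one INSERT ... SELECT statement."""
    return set(_SOURCE.findall(query)), set(_SINK.findall(query))


q1 = "INSERT INTO enriched SELECT o.id FROM orders o JOIN users u ON o.uid = u.id"
q2 = "INSERT INTO archive SELECT * FROM enriched"
```

Here, enriched is a sink in the first statement and a source in the second. A fixed source/sink mark in the DDL would make this case awkward.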
> > > > >>
> > > > >> 2. How about keeping just one plain string map as the table
> > > > >> parameters, like:
> > > > >> CREATE TABLE Kafka10SourceTable (
> > > > >> intField INTEGER,
> > > > >> stringField VARCHAR(128),
> > > > >> longField BIGINT,
> > > > >> rowTimeField TIMESTAMP
> > > > >> ) WITH (
> > > > >> connector.type = 'kafka',
> > > > >> connector.property-version = '1',
> > > > >> connector.version = '0.10',
> > > > >> connector.properties.topic = 'test-kafka-topic',
> > > > >> connector.properties.startup-mode = 'latest-offset',
> > > > >> connector.properties.specific-offset = 'offset',
> > > > >> format.type = 'json',
> > > > >> format.property-version = '1',
> > > > >> format.derive-schema = 'true'
> > > > >> );
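Here is a minimal sketch of parsing such a WITH clause into one flat string map. It assumes the key = 'value' form above, with comma separators and '' as an escaped quote. parse_with_clause is a hypothetical helper, not part of Flink's parser.

```python
import re
from typing import Dict

# key = 'value' followed by a comma or the end; keys may contain dots and dashes,
# and '' inside a value is an escaped single quote.
_OPTION = re.compile(r"\s*([\w.\-]+)\s*=\s*'((?:[^']|'')*)'\s*(?:,|$)")


def parse_with_clause(body: str) -> Dict[str, str]:
    """Parse the inside of WITH ( ... ) into one flat string map."""
    body = body.strip()
    options: Dict[str, str] = {}
    pos = 0
    while pos < len(body):
        m = _OPTION.match(body, pos)
        if m is None:
            raise ValueError(f"cannot parse options near: {body[pos:]!r}")
        key, value = m.group(1), m.group(2).replace("''", "'")
        if key in options:
            raise ValueError(f"duplicate option: {key}")
        options[key] = value
        pos = m.end()
    return options


options = parse_with_clause("""
    connector.type = 'kafka',
    connector.version = '0.10',
    connector.properties.topic = 'test-kafka-topic',
    format.type = 'json',
    format.derive-schema = 'true'
""")
```

A parser of this kind also rejects two entries that have no separating comma, instead of silently merging them.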
> > > > >> Because:
> > > > >> 1. In TableFactory, what users work with is a string map of
> > > > >> properties, so defining parameters as a string map is the closest
> > > > >> match to how users actually use them.
> > > > >> 2. The table descriptor can be extended by users, as is done for Kafka
> > > > >> and JSON. This means the parameter keys of a connector or format can
> > > > >> differ between implementations, and we cannot restrict the keys to a
> > > > >> fixed set, so we would need one map in the connector scope and another
> > > > >> in the connector.properties scope. Why not just give users a single map
> > > > >> and let them put parameters in whatever format they like? That is also
> > > > >> the simplest way to implement the DDL parser.
> > > > >> 3. Whether we can define a format clause or not depends on the
> > > > >> implementation of the connector. Using a separate clause in the DDL may
> > > > >> give the misleading impression that connectors can be combined with
> > > > >> arbitrary formats, which may not actually work.
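The single-map approach pushes structure into key prefixes. The sketch below, with hypothetical names throughout, extracts a scoped sub-map such as connector.properties. It then picks a factory by the connector/format pair the factory declares, loosely in the spirit of how a TableFactory declares a required context. An unsupported combination fails at lookup time rather than being implied by separate DDL clauses.

```python
from typing import Dict, Iterable, List

Props = Dict[str, str]


def scope(props: Props, prefix: str) -> Props:
    """All keys under 'prefix.', with the prefix stripped (e.g. connector.properties)."""
    head = prefix + "."
    return {k[len(head):]: v for k, v in props.items() if k.startswith(head)}


# Hypothetical factories, each declaring the key/value context it requires.
FACTORIES: List[Props] = [
    {"connector.type": "kafka", "format.type": "json"},
    {"connector.type": "kafka", "format.type": "avro"},
]


def find_factory(props: Props, factories: Iterable[Props] = FACTORIES) -> Props:
    """Return the first factory whose required context is contained in props."""
    for required in factories:
        if all(props.get(k) == v for k, v in required.items()):
            return required
    raise LookupError(
        f"no factory for connector={props.get('connector.type')!r}, "
        f"format={props.get('format.type')!r}")


table_props: Props = {
    "connector.type": "kafka",
    "connector.version": "0.10",
    "connector.properties.topic": "test-kafka-topic",
    "connector.properties.startup-mode": "latest-offset",
    "format.type": "json",
}
```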
> > > > >>
> > > > >> On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <wossyn@xxxxxxxxx> wrote:
> > > > >>
> > > > >>> +1, thanks for the proposal.
> > > > >>>
> > > > >>> I guess this is a long-awaited change. It can vastly extend the
> > > > >>> functionality of the SQL Client, because it will make it possible to
> > > > >>> use complex extensions such as those provided by Apache Bahir [1].
> > > > >>>
> > > > >>> Best Regards,
> > > > >>> Dom.
> > > > >>>
> > > > >>> [1] https://github.com/apache/bahir-flink
> > > > >>>
> > > > >>> On Sat, Nov 3, 2018 at 5:17 PM Rong Rong <walterddr@xxxxxxxxx> wrote:
> > > > >>>
> > > > >>>> +1. Thanks for putting the proposal together, Shuyi.
> > > > >>>>
> > > > >>>> DDL has been brought up a couple of times previously [1, 2]. DDL
> > > > >>>> support will definitely be a great extension to the current Flink SQL,
> > > > >>>> allowing it to systematically support some previously requested
> > > > >>>> features such as [3]. It will also be beneficial to see the document
> > > > >>>> closely aligned with the previous discussion on the unified SQL
> > > > >>>> connector API [4].
> > > > >>>>
> > > > >>>> I also left a few comments on the doc. I look forward to aligning with
> > > > >>>> the other efforts and contributing to them!
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Rong
> > > > >>>>
> > > > >>>> [1] http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
> > > > >>>> [2] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
> > > > >>>> [3] https://issues.apache.org/jira/browse/FLINK-8003
> > > > >>>> [4] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3C6676cb66-6f31-23e1-eff5-2e9c19f88483@xxxxxxxxxx%3E
> > > > >>>>
> > > > >>>> On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <bowenli86@xxxxxxxxx> wrote:
> > > > >>>>
> > > > >>>>> Thanks Shuyi!
> > > > >>>>>
> > > > >>>>> I left some comments there. I think the SQL DDL design and the
> > > > >>>>> Flink-Hive integration/external catalog enhancements will need to
> > > > >>>>> work closely with each other. I hope we are well aligned on the
> > > > >>>>> directions of the two designs, and I look forward to working with you
> > > > >>>>> on both!
> > > > >>>>>
> > > > >>>>> Bowen
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen <suez1224@xxxxxxxxx> wrote:
> > > > >>>>>> Hi everyone,
> > > > >>>>>>
> > > > >>>>>> SQL DDL support has been a long-standing request from the community.
> > > > >>>>>> Flink SQL currently supports only DML (e.g. SELECT and INSERT
> > > > >>>>>> statements). In its current form, Flink SQL users still need to
> > > > >>>>>> define/create table sources and sinks programmatically in
> > > > >>>>>> Java/Scala. Also, without DDL support, the current SQL Client
> > > > >>>>>> implementation does not allow dynamic creation of tables, types,
> > > > >>>>>> or functions with SQL, which adds friction to its adoption.
> > > > >>>>>>
> > > > >>>>>> I drafted a design doc [1] with a few other community members that
> > > > >>>>>> proposes the design and implementation for adding DDL support in
> > > > >>>>>> Flink. The initial design considers DDL for tables, views, types,
> > > > >>>>>> libraries, and functions. It would be great to get feedback on the
> > > > >>>>>> design from the community, and to align it with the latest efforts
> > > > >>>>>> on the unified SQL connector API [2] and Flink Hive integration [3].
> > > > >>>>>>
> > > > >>>>>> Any feedback is highly appreciated.
> > > > >>>>>>
> > > > >>>>>> Thanks
> > > > >>>>>> Shuyi Chen
> > > > >>>>>>
> > > > >>>>>> [1] https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
> > > > >>>>>> [2] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > > > >>>>>> [3] https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> > > > >>>>>> --
> > > > >>>>>> "So you have to trust that the dots will somehow connect in your future."