osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Flink SQL DDL Design


Hi Shuyi,

Thanks for bringing up this discussion and the awesome work! I have left
some comments in the doc.

I want to share something more about the watermark definition learned from
Alibaba.

   1.

   Table should be able to accept multiple watermark definition.

   Because a table may have more than one rowtime field. For example, one
   rowtime field is from existing field but missing in some records, another
   is the ingestion timestamp in Kafka but not very accurate. In this case,
   user may define two rowtime fields with watermarks in the Table and choose
   one in different situation.
   2.

   Watermark stragety always work with rowtime field together.

Based on the two points metioned above, I think we should combine the
watermark strategy and rowtime field selection (i.e. which existing field
used to generate watermark) in one clause, so that we can define multiple
watermarks in one Table.

Here I will share the watermark syntax used in Alibaba (simply modified):

watermarkDefinition:
WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy

wm_strategy:
  BOUNDED WITH OFFSET 'string' timeUnit
|
  ASCENDING

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword
defines which existing field used to generate watermark, this field should
already exist in the schema (we can use computed-column to derive from
other fields). The “AS” keyword defines watermark strategy, such as BOUNDED
WITH OFFSET (covers almost all the requirements) and ASCENDING.

When the expected rowtime field does not exist in the schema, we can use
computed-column syntax to derive it from other existing fields using
built-in functions or user defined functions. So the rowtime/watermark
definition doesn’t need to care about “field-change” strategy
(replace/add/from-field). And the proctime field definition can also be
defined using computed-column. Such as pt as PROCTIME() which defines a
proctime field named “pt” in the schema.

Looking forward to working with you guys!

Best,
Jark Wu


Lin Li <lincoln.86xy@xxxxxxxxx> 于2018年11月28日周三 下午6:33写道:

> @Shuyi
> Thanks for the proposal!  We have a simple DDL implementation (extends
> Calcite's parser) which been running for almost two years on production and
> works well.
> I think the most valued things we'd learned is keeping simplicity and
> standard compliance.
> Here's the approximate grammar, FYI
> CREATE TABLE
>
> CREATE TABLE tableName(
>         columnDefinition [, columnDefinition]*
>         [ computedColumnDefinition [, computedColumnDefinition]* ]
>         [ tableConstraint [, tableConstraint]* ]
>         [ tableIndex [, tableIndex]* ]
>     [ PERIOD FOR SYSTEM_TIME ]
>         [ WATERMARK watermarkName FOR rowTimeColumn AS
> withOffset(rowTimeColumn, offset) ]     ) [ WITH ( tableOption [ ,
> tableOption]* ) ] [ ; ]
>
> columnDefinition ::=
>         columnName dataType [ NOT NULL ]
>
> dataType  ::=
>         {
>           [ VARCHAR ]
>           | [ BOOLEAN ]
>           | [ TINYINT ]
>           | [ SMALLINT ]
>           | [ INT ]
>           | [ BIGINT ]
>           | [ FLOAT ]
>           | [ DECIMAL ]
>           | [ DOUBLE ]
>           | [ DATE ]
>           | [ TIME ]
>           | [ TIMESTAMP ]
>           | [ VARBINARY ]
>         }
>
> computedColumnDefinition ::=
>         columnName AS computedColumnExpression
>
> tableConstraint ::=
>     { PRIMARY KEY | UNIQUE }
>         (columnName [, columnName]* )
>
> tableIndex ::=
>         [ UNIQUE ] INDEX indexName
>          (columnName [, columnName]* )
>
> rowTimeColumn ::=
>         columnName
>
> tableOption ::=
>         property=value
>         offset ::=
>         positive integer (unit: ms)
>
> CREATE VIEW
>
> CREATE VIEW viewName
>   [
>         ( columnName [, columnName]* )
>   ]
>         AS queryStatement;
>
> CREATE FUNCTION
>
>  CREATE FUNCTION functionName
>   AS 'className';
>
>  className ::=
>         fully qualified name
>
>
> Shuyi Chen <suez1224@xxxxxxxxx> 于2018年11月28日周三 上午3:28写道:
>
> > Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc
> > first and start implementation w/o the unified connector API ready by
> > skipping some featue.
> >
> > Xuefu, I like the idea of making Flink specific properties into generic
> > key-value pairs, so that it will make integration with Hive DDL (or
> others,
> > e.g. Beam DDL) easier.
> >
> > I'll run a final pass over the design doc and finalize the design in the
> > next few days. And we can start creating tasks and collaborate on the
> > implementation. Thanks a lot for all the comments and inputs.
> >
> > Cheers!
> > Shuyi
> >
> > On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx>
> > wrote:
> >
> > > Yeah! I agree with Timo that DDL can actually proceed w/o being blocked
> > by
> > > connector API. We can leave the unknown out while defining the basic
> > syntax.
> > >
> > > @Shuyi
> > >
> > > As commented in the doc, I think we can probably stick with simple
> syntax
> > > with general properties, without extending the syntax too much that it
> > > mimics the descriptor API.
> > >
> > > Part of our effort on Flink-Hive integration is also to make DDL syntax
> > > compatible with Hive's. The one in the current proposal seems making
> our
> > > effort more challenging.
> > >
> > > We can help and collaborate. At this moment, I think we can finalize on
> > > the proposal and then we can divide the tasks for better collaboration.
> > >
> > > Please let me know if there are  any questions or suggestions.
> > >
> > > Thanks,
> > > Xuefu
> > >
> > >
> > >
> > >
> > > ------------------------------------------------------------------
> > > Sender:Timo Walther <twalthr@xxxxxxxxxx>
> > > Sent at:2018 Nov 27 (Tue) 16:21
> > > Recipient:dev <dev@xxxxxxxxxxxxxxxx>
> > > Subject:Re: [DISCUSS] Flink SQL DDL Design
> > >
> > > Thanks for offering your help here, Xuefu. It would be great to move
> > > these efforts forward. I agree that the DDL is somehow releated to the
> > > unified connector API design but we can also start with the basic
> > > functionality now and evolve the DDL during this release and next
> > releases.
> > >
> > > For example, we could identify the MVP DDL syntax that skips defining
> > > key constraints and maybe even time attributes. This DDL could be used
> > > for batch usecases, ETL, and materializing SQL queries (no time
> > > operations like windows).
> > >
> > > The unified connector API is high on our priority list for the 1.8
> > > release. I will try to update the document until mid of next week.
> > >
> > >
> > > Regards,
> > >
> > > Timo
> > >
> > >
> > > Am 27.11.18 um 08:08 schrieb Shuyi Chen:
> > > > Thanks a lot, Xuefu. I was busy for some other stuff for the last 2
> > > weeks,
> > > > but we are definitely interested in moving this forward. I think once
> > the
> > > > unified connector API design [1] is done, we can finalize the DDL
> > design
> > > as
> > > > well and start creating concrete subtasks to collaborate on the
> > > > implementation with the community.
> > > >
> > > > Shuyi
> > > >
> > > > [1]
> > > >
> > >
> >
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > > >
> > > > On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <
> xuefu.z@xxxxxxxxxxxxxxx>
> > > > wrote:
> > > >
> > > >> Hi Shuyi,
> > > >>
> > > >> I'm wondering if you folks still have the bandwidth working on this.
> > > >>
> > > >> We have some dedicated resource and like to move this forward. We
> can
> > > >> collaborate.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Xuefu
> > > >>
> > > >>
> > > >> ------------------------------------------------------------------
> > > >> 发件人:wenlong.lwl<wenlong88.lwl@xxxxxxxxx>
> > > >> 日 期:2018年11月05日 11:15:35
> > > >> 收件人:<dev@xxxxxxxxxxxxxxxx>
> > > >> 主 题:Re: [DISCUSS] Flink SQL DDL Design
> > > >>
> > > >> Hi, Shuyi, thanks for the proposal.
> > > >>
> > > >> I have two concerns about the table ddl:
> > > >>
> > > >> 1. how about remove the source/sink mark from the ddl, because it is
> > not
> > > >> necessary, the framework determine the table referred is a source
> or a
> > > sink
> > > >> according to the context of the query using the table. it will be
> more
> > > >> convenient for use defining a table which can be both a source and
> > sink,
> > > >> and more convenient for catalog to persistent and manage the meta
> > infos.
> > > >>
> > > >> 2. how about just keeping one pure string map as parameters for
> table,
> > > like
> > > >> create tabe Kafka10SourceTable (
> > > >> intField INTEGER,
> > > >> stringField VARCHAR(128),
> > > >> longField BIGINT,
> > > >> rowTimeField TIMESTAMP
> > > >> ) with (
> > > >> connector.type = ’kafka’,
> > > >> connector.property-version = ’1’,
> > > >> connector.version = ’0.10’,
> > > >> connector.properties.topic = ‘test-kafka-topic’,
> > > >> connector.properties.startup-mode = ‘latest-offset’,
> > > >> connector.properties.specific-offset = ‘offset’,
> > > >> format.type = 'json'
> > > >> format.prperties.version=’1’,
> > > >> format.derive-schema = 'true'
> > > >> );
> > > >> Because:
> > > >> 1. in TableFactory, what user use is a string map properties,
> defining
> > > >> parameters by string-map can be the closest way to mapping how user
> > use
> > > the
> > > >> parameters.
> > > >> 2. The table descriptor can be extended by user, like what is done
> in
> > > Kafka
> > > >> and Json, it means that the parameter keys in connector or format
> can
> > be
> > > >> different in different implementation, we can not restrict the key
> in
> > a
> > > >> specified set, so we need a map in connector scope and a map in
> > > >> connector.properties scope. why not just give user a single map, let
> > > them
> > > >> put parameters in a format they like, which is also the simplest way
> > to
> > > >> implement DDL parser.
> > > >> 3. whether we can define a format clause or not, depends on the
> > > >> implementation of the connector, using different clause in DDL may
> > make
> > > a
> > > >> misunderstanding that we can combine the connectors with arbitrary
> > > formats,
> > > >> which may not work actually.
> > > >>
> > > >> On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <wossyn@xxxxxxxxx>
> > wrote:
> > > >>
> > > >>> +1, Thanks for the proposal.
> > > >>>
> > > >>> I guess this is a long-awaited change. This can vastly increase the
> > > >>> functionalities of the SQL Client as it will be possible to use
> > complex
> > > >>> extensions like for example those provided by Apache Bahir[1].
> > > >>>
> > > >>> Best Regards,
> > > >>> Dom.
> > > >>>
> > > >>> [1]
> > > >>> https://github.com/apache/bahir-flink
> > > >>>
> > > >>> sob., 3 lis 2018 o 17:17 Rong Rong <walterddr@xxxxxxxxx>
> napisał(a):
> > > >>>
> > > >>>> +1. Thanks for putting the proposal together Shuyi.
> > > >>>>
> > > >>>> DDL has been brought up in a couple of times previously [1,2].
> > > >> Utilizing
> > > >>>> DDL will definitely be a great extension to the current Flink SQL
> to
> > > >>>> systematically support some of the previously brought up features
> > such
> > > >> as
> > > >>>> [3]. And it will also be beneficial to see the document closely
> > > aligned
> > > >>>> with the previous discussion for unified SQL connector API [4].
> > > >>>>
> > > >>>> I also left a few comments on the doc. Looking forward to the
> > > alignment
> > > >>>> with the other couple of efforts and contributing to them!
> > > >>>>
> > > >>>> Best,
> > > >>>> Rong
> > > >>>>
> > > >>>> [1]
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
> > > >>>> [2]
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
> > > >>>> [3] https://issues.apache.org/jira/browse/FLINK-8003
> > > >>>> [4]
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3C6676cb66-6f31-23e1-eff5-2e9c19f88483@xxxxxxxxxx%3E
> > > >>>>
> > > >>>> On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <bowenli86@xxxxxxxxx>
> > wrote:
> > > >>>>
> > > >>>>> Thanks Shuyi!
> > > >>>>>
> > > >>>>> I left some comments there. I think the design of SQL DDL and
> > > >>> Flink-Hive
> > > >>>>> integration/External catalog enhancements will work closely with
> > each
> > > >>>>> other. Hope we are well aligned on the directions of the two
> > designs,
> > > >>>> and I
> > > >>>>> look forward to working with you guys on both!
> > > >>>>>
> > > >>>>> Bowen
> > > >>>>>
> > > >>>>>
> > > >>>>> On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen <suez1224@xxxxxxxxx>
> > > >> wrote:
> > > >>>>>> Hi everyone,
> > > >>>>>>
> > > >>>>>> SQL DDL support has been a long-time ask from the community.
> > > >> Current
> > > >>>>> Flink
> > > >>>>>> SQL support only DML (e.g. SELECT and INSERT statements). In its
> > > >>>> current
> > > >>>>>> form, Flink SQL users still need to define/create table sources
> > and
> > > >>>> sinks
> > > >>>>>> programmatically in Java/Scala. Also, in SQL Client, without DDL
> > > >>>> support,
> > > >>>>>> the current implementation does not allow dynamical creation of
> > > >>> table,
> > > >>>>> type
> > > >>>>>> or functions with SQL, this adds friction for its adoption.
> > > >>>>>>
> > > >>>>>> I drafted a design doc [1] with a few other community members
> that
> > > >>>>> proposes
> > > >>>>>> the design and implementation for adding DDL support in Flink.
> The
> > > >>>>> initial
> > > >>>>>> design considers DDL for table, view, type, library and
> function.
> > > >> It
> > > >>>> will
> > > >>>>>> be great to get feedback on the design from the community, and
> > > >> align
> > > >>>> with
> > > >>>>>> latest effort in unified SQL connector API [2] and Flink Hive
> > > >>>>> integration
> > > >>>>>> [3].
> > > >>>>>>
> > > >>>>>> Any feedback is highly appreciated.
> > > >>>>>>
> > > >>>>>> Thanks
> > > >>>>>> Shuyi Chen
> > > >>>>>>
> > > >>>>>> [1]
> > > >>>>>>
> > > >>>>>>
> > > >>
> > >
> >
> https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
> > > >>>>>> [2]
> > > >>>>>>
> > > >>>>>>
> > > >>
> > >
> >
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> > > >>>>>> [3]
> > > >>>>>>
> > > >>>>>>
> > > >>
> > >
> >
> https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> > > >>>>>> --
> > > >>>>>> "So you have to trust that the dots will somehow connect in your
> > > >>>> future."
> > > >>
> >
> >
> >
> > --
> > "So you have to trust that the dots will somehow connect in your future."
> >
>