osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Flink SQL DDL Design


Hi Timo,

I think I get your point why it would be better to put Table Update Mode in
MVP. But because this is a sophisticated problem, we need to think about it
carefully and need some discussions offline. We will reach out to here when
we have a clear design.


8). Support row/map/array data type
Do you mean how to distinguish int[] and Integer[]?  Yes, maybe we need to
support NULL/NOT NULL just for array elements, such as: ARRAY<INT NOT NULL>
is int[], ARRAY<INT> is Integer[].


Cheers,
Jark

On Fri, 14 Dec 2018 at 19:46, Timo Walther <twalthr@xxxxxxxxxx> wrote:

> Hi all,
>
> I think we should discuss what we consider an MVP DDL. For me, an MVP
> DDL was to just focus on a CREATE TABLE statement. It would be great to
> come up with a solution that finally solves the issue of connecting
> different kind of systems. One reason why we postponed DDL statements
> for quite some time is that we cannot change it easily once released.
>
> However, the current state of the discussion can be summarized by the
> following functionality:
>
> 1. Only support append source tables (because the distinction of
> update/retract table is not clear).
> 2. Only support append and update sink tables (because a changeflag is
> missing).
> 3. Don't support outputting to Kafka with time attributes (because we
> cannot set a timestamp).
>
> Personally, I would like to have more use cases enabled by solving the
> header timestamps and change flag discussion. And I don't see a reason
> why we have to rush here.
>
> 8). Support row/map/array data type
> How do we want to support object arrays vs. primitive arrays? Currently,
> we need to make this clear distinction for between external system and
> Java [1] (E.g. byte[] arrays vs. object arrays) and users can choose
> between Types.PRIMITIVE_ARRAY and Types.OBJECT_ARRAY. Otherwise we need
> to support NULL/NOT NULL for array elements.
>
> 4) Event-Time Attributes and Watermarks
> I completely agree with Rong here. `ts AS SYSTEMROWTIME()` indicates
> that the system takes care of this column and for unification this would
> mean both for sources and sinks. It is still a computed column but gives
> hints to connectors. Implementing connectors can choose if they want to
> use this hint or not. The Flink Kafka connector would make use of it.
> @Jark: I think a PERSISTED keyword would confuse users (as shown by your
> Stackoverflow question) and would only make sense for SYSTEMROWTIME and
> no other computed column.
>
> 3) SOURCE / SINK / BOTH
> @Jark: My initial suggestion was to make the SOURCE/SINK optional such
> that users can only use CREATE TABLE depending on the use case. But as I
> said before, since I cannot find support here, we can drop the keywords.
>
> 7) Table Update Mode
> @Jark: The questions that you posted are exactly the ones that we should
> find an answer for. Because a DDL should just be the front end to the
> characteristics of an engine. After thinking about it again a change
> flag is actually more similar to a PARTITION BY clause because it
> defines a field that is not in the table's schema but in the schema of
> the physical format. However, the columns defined by a PARTITION BY are
> shown when describing/projecting a table whereas a change flag column
> must not be shown.
>
> If a table source supports append, upserts, and retractions, we need a
> way to express how we want to connect to the system.
>
> hasPrimaryKey() && !hasChangeFlag() -> append mode
> hasPrimaryKey() && hasChangeFlag() -> upsert mode
> !hasPrimaryKey() && hasChangeFlag() -> retract mode
>
> Are we fine with this?
>
> Regarding reading `topic`, `partition`, `offset` or custom properties
> from message headers. I already discussed this in my unified connector
> document. We don't need built-in functions for all these properties.
> Those things depend on the connector and format, it is their
> responsibility to extend the table schema in order to expose those
> properties (e.g. by providing a Map<String, String> for all these kind
> of properties).
>
> Example:
>
> CREATE TABLE myTopic (
>      col1 INT,
>      col2 VARCHAR,
>      col3 MAP<VARCHAR, VARCHAR>,
>      col4 AS SYSTEMROWTIME()
> )
> PARTITION BY (col0 LONG)
> WITH (
>    connector.type = kafka
>    format.type = key-value-metadata
>    format.key-format.type = avro
>    format.value-format.type = json
> )
>
> The format defines to use a KeyedDeserializationSchema that extends the
> schema by a metadata column. The PARTITION BY declares the columns for
> Kafka's key in Avro format. col1 till col2 are Kafka's JSON columns.
>
> Thanks for your feedback,
> Timo
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings
>
>
> Am 13.12.18 um 09:50 schrieb Jark Wu:
> > Hi all,
> >
> > Here are a bunch of my thoughts:
> >
> > 8). support row/map/array data type
> > That's fine with me if we want to support them in the MVP. In my mind, we
> > can have the field type syntax like this:
> >
> > ```
> > filedType ::=
> >              {
> >                  simpleType
> >               | MAP<simpleType, fieldType>
> >               | ARRAY<fieldType>
> >               | ROW<columnDefinition [, columnDefinition]*>
> >              }
> > ```
> >
> > I have included this in @Shuyi's summary doc [1] . Please leave feedbacks
> > there!
> >
> > [1]
> >
> https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit
> >
> > 3) SOURCE / SINK / BOTH
> > @Timo, CREATE TABLE statement is registering a virtual table in the
> session
> > or catalog. I don't think it is immutable, as we might also want to
> support
> > CREATE INDEX statements in the future. On the other hand, ACL is not a
> part
> > of the table definition, it should belong to the permission system which
> is
> > usually stored in somewhere else. So GRANT/INVOKE sounds like a more
> > standard option.
> >
> > 7) Table Update Mode
> > I agree with @Shuyi that table update mode can be left out from the MVP.
> > Because IMO, the update mode will not break the current MVP design. It
> > should be something to add, like the CHANGE_FLAG you proposed. We can
> > continue this discussion when we finalize the MVP.
> >
> > Meanwhile, the update mode is a big topic which may involve several weeks
> > to discuss. For example, (a) do we support CHANGE_FLAG when the table
> > supports upsert (or when the table defined a primary key)?  (b) the
> > CHANGE_FLAG should support write and read both. (c) currently, we only
> > support true (add) and false (retract) flag type, are they enough? (d)
> How
> > to connect an external storage which also support insert/delete flag like
> > mysql binlog?
> >
> > Regarding to the CHANGE_FLAG @Timo proposed, I think this is a good
> > direction. But should isRetraction be a physical field and make
> CHANGE_FLAG
> > like a constraint on that? If yes, then what the type of isRetraction?
> >
> > 4.b) Ingesting and writing timestamps to systems.
> > @Shuyi, PERSISTED can solve the problem of the field is not physically
> > stored. However, it doesn't solve the problem that how to write a field
> > back to the computed column, because "A computed column cannot be the
> > target of an INSERT or UPDATE statement" even if the computed column is
> > persisted. If we want to write a rowtime back the the external system,
> the
> > DML should look like this: "INSERT INTO sink SELECT a, rowtime FROM
> > source". The point is that the `rowtime` must be specified in the INSERT
> > statement, that's why I hope the `rowtime` field in Table is not a
> computed
> > column. See more information about PERSISTED [2] [3].
> >
> > Another point to consider is SYSTEMROWTIME() only solve reading timestamp
> > from message header in systems. There are many similar requirements here,
> > such as reading `topic`, `partition`, `offset` or custom properties from
> > message headers, do we plan to support a bunch of built-in functions like
> > SYSTEMROWTIME()?  Do we have some clean and easy way for this?
> >
> > [2]:
> >
> https://docs.microsoft.com/en-us/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-2017
> > [3]:
> >
> https://stackoverflow.com/questions/51390531/sql-server-persisted-computed-columns-versus-actual-normal-column
> >
> > Looking forward to collaborate with you guys!
> >
> > Best,
> > Jark
> >
> >
> > On Thu, 13 Dec 2018 at 01:38, Rong Rong <walterddr@xxxxxxxxx> wrote:
> >
> >> Thanks for the summary effort @shuyi. Sorry for jumping in the
> discussion
> >> so late.
> >>
> >> As of the scope of MVP, I think we might want to consider adding "table
> >> update mode" problem to it. I agree with @timo that might not be easily
> >> changed in the future if the flags has to be part of the schema/column
> >> definition.
> >>
> >> Regarding the components under discussion.
> >> 4) Event-Time Attributes and Watermarks
> >> b, c) I actually like the special indicator way @fabian suggested to
> hint
> >> Flink to read time attributes directly from the system not the data
> `(ts AS
> >> SYSTEMROWTIME())`. It should also address the "compute field not
> emitted"
> >> problem by carrying the "virtual column" concept like @shuyi suggested.
> >> However if I understand correctly, this also required to be defined as
> part
> >> of the schema/column definition.
> >>
> >> 3) SOURCE / SINK / BOTH
> >> +1 on not adding properties to `CREATE TABLE` to manage ACL/permission.
> >>
> >> On a higher level, I think one question I have is whether we can
> >> definitively come to an agreement that the features under discussion
> (and
> >> potential solutions) can be cleanly adjusted/added from what we are
> >> providing on MVP (e.g. the schema/column definition might be hard to
> >> achieve but if we all agree ACL/permission should not be part of the
> >> `CREATE TABLE` and a decision can be made later). @shuyi I can also
> help in
> >> drafting the FLIP doc by summarizing the features under discussion and
> the
> >> concerns to whether included in the MVP, so that we can carry on the
> >> discussions alongside with the MVP implementation effort. I think each
> one
> >> of these features deserves a subsection dedicated for it.
> >>
> >> Many thanks,
> >> Rong
> >>
> >>
> >> On Wed, Dec 12, 2018 at 1:14 AM Shuyi Chen <suez1224@xxxxxxxxx> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I summarize the MVP based on the features that we agreed upon. For
> table
> >>> update mode and custom watermark strategy and ts extractor, I found
> there
> >>> are some discussions, so I decided to leave them out for the MVP.
> >>> For row/map/array data type, I think we can add it as well if everyone
> >>> agrees.
> >>>
> >>>
> >>> 4) Event-Time Attributes and Watermarks
> >>> Cited from SQL Server 2017 document (
> >>>
> >>>
> >>
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/specify-computed-columns-in-a-table?view=sql-server-2017
> >>> ),
> >>> "A
> >>> computed column is a virtual column that is not physically stored in
> the
> >>> table, unless the column is marked PERSISTED. A computed column
> >> expression
> >>> can use data from other columns to calculate a value for the column to
> >>> which it belongs. " I think we can also use introduce the PERSISTED
> >> keyword
> >>> for computed column to indicate that the field can be stored back to
> the
> >>> table, i.e. ts AS SYSTEMROWTIME() PERSISTED.
> >>>
> >>> 3) SOURCE / SINK / BOTH
> >>> GRANT/INVOKE sounds like a more standard option than adding a property
> to
> >>> CREATE TABLE to manage the ACL/permission. The ACL can be stored
> >> somewhere
> >>> in a database, and allow/disallow access to a dynamic table depending
> on
> >>> whether it's a "INSERT INTO" or "SELECT".
> >>>
> >>> I can volunteer to put the discussion as a FLIP.  I can try to
> summarize
> >>> the current discussion, and share edit permission with you to
> collaborate
> >>> on the documents. After we finalized the doc, we can publish it as a
> >> FLIP.
> >>> What do you think?
> >>>
> >>> Shuyi
> >>>
> >>>
> >>>
> >>> On Tue, Dec 11, 2018 at 9:13 AM Timo Walther <twalthr@xxxxxxxxxx>
> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> thanks for summarizing the discussion @Shuyi. I think we need to
> >> include
> >>>> the "table update mode" problem as it might not be changed easily in
> >> the
> >>>> future. Regarding "support row/map/array data type", I don't see a
> >>>> problem why we should not support them now as the data types are
> >> already
> >>>> included in the runtime. The "support custom timestamp extractor" is
> >>>> solved by the computed columns approach. The "custom watermark
> >> strategy"
> >>>> can be added by supplying a class name as paramter in my opinion.
> >>>>
> >>>> Regarding the comments of Lin and Jark:
> >>>>
> >>>> @Lin: Instantiating a TableSource/Sink should not cost much, but we
> >>>> should not mix catalog discussion and DDL at this point.
> >>>>
> >>>> 4) Event-Time Attributes and Watermarks
> >>>> 4.b) Regarding `ts AS SYSTEMROWTIME()` and Lin's comment about "will
> >>>> violate the rule": there is no explicit rule of doing so. Computed
> >>>> column are also not standard compliant, if we can use information that
> >>>> is encoded in constraints we should use it. Adding more and more
> >>>> top-level properties makes the interaction with connectors more
> >>>> difficult. An additional HEADER keyword sounds too connector-specific
> >>>> and also not SQL compliant to me.
> >>>>
> >>>> 3) SOURCE / SINK / BOTH
> >>>> GRANT/INVOKE are mutating an existing table, right? In my opinion,
> >>>> independent of SQL databases but focusing on Flink user requirements,
> a
> >>>> CREATE TABLE statement should be an immutable definition of a
> >> connection
> >>>> to an external system.
> >>>>
> >>>> 7) Table Update Mode
> >>>> As far as I can see, the only thing missing for enabling all table
> >> modes
> >>>> is the declaration of a change flag. We could introduce a new keyword
> >>>> here similar to WATERMARK:
> >>>>
> >>>> CREATE TABLE output_kafka_t1(
> >>>>     id bigint,
> >>>>     msg varchar,
> >>>>     CHANGE_FLAG FOR isRetraction
> >>>> ) WITH (
> >>>>     type=kafka
> >>>>     ,...
> >>>> );
> >>>>
> >>>> CREATE TABLE output_kafka_t1(
> >>>>     CHANGE_FLAG FOR isUpsert
> >>>>     id bigint,
> >>>>     msg varchar,
> >>>>     PRIMARY_KEY(id)
> >>>> ) WITH (
> >>>>     type=kafka
> >>>>     ,...
> >>>> );
> >>>>
> >>>> What do you think?
> >>>>
> >>>> @Jark: We should definitely stage the discussions and mention the
> >>>> opinions and advantages/disadvantages that have been proposed already
> >> in
> >>>> the FLIP.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> Am 10.12.18 um 08:10 schrieb Jark Wu:
> >>>>> Hi all,
> >>>>>
> >>>>> It's great to see we have an agreement on MVP.
> >>>>>
> >>>>> 4.b) Ingesting and writing timestamps to systems.
> >>>>> I would treat the field as a physical column not a virtual column. If
> >>> we
> >>>>> treat it as computed column, it will be confused that the behavior is
> >>>>> different when it is a source or sink.
> >>>>> When it is a physical column, the behavior could be unified. Then the
> >>>>> problem is how to mapping from the field to kafka message timestamp?
> >>>>> One is Lin proposed above and is also used in KSQL[1]. Another idea
> >> is
> >>>>> introducing a HEADER column which strictly map by name to the fields
> >> in
> >>>>> message header.
> >>>>> For example,
> >>>>>
> >>>>> CREATE TABLE output_kafka_t1(
> >>>>>     id bigint,
> >>>>>     ts timestamp HEADER,
> >>>>>     msg varchar
> >>>>> ) WITH (
> >>>>>     type=kafka
> >>>>>     ,...
> >>>>> );
> >>>>>
> >>>>> This is used in Alibaba but not included in the DDL draft. It will
> >>>> further
> >>>>> extend the SQL syntax, which is we should be cautious about. What do
> >>> you
> >>>>> think about this two solutions?
> >>>>>
> >>>>> 4.d) Custom watermark strategies:
> >>>>> @Timo,  I don't have a strong opinion on this.
> >>>>>
> >>>>> 3) SOURCE/SINK/BOTH
> >>>>> Agree with Lin, GRANT/INVOKE [SELECT|UPDATE] ON TABLE is a clean and
> >>>>> standard way to manage the permission, which is also adopted by
> >> HIVE[2]
> >>>> and
> >>>>> many databases.
> >>>>>
> >>>>> [1]:
> >>> https://docs.confluent.io/current/ksql/docs/tutorials/examples.html
> >>>>> [2]:
> >>>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=45876173#Hivedeprecatedauthorizationmode/LegacyMode-Grant/RevokePrivileges
> >>>>> @Timo, it's great if someone can conclude the discussion and
> >> summarize
> >>>> into
> >>>>> a FLIP.
> >>>>> @Shuyi, Thanks a lot for putting it all together. The google doc
> >> looks
> >>>> good
> >>>>> to me, and I left some minor comments there.
> >>>>>
> >>>>> Regarding to the FLIP, I have some suggestions:
> >>>>> 1. The FLIP can contain MILESTONE1 and FUTURE WORKS.
> >>>>> 2. The MILESTONE1 is the MVP. It describes the MVP DDL syntax.
> >>>>> 3. Separate FUTURE WORKS into two parts: UNDER DISCUSSION and
> >> ADOPTED.
> >>> We
> >>>>> can derive MILESTONE2 from this easily when it is ready.
> >>>>>
> >>>>> I summarized the Future Works based on Shuyi's work:
> >>>>>
> >>>>> Adopted: (Should detailed described here...)
> >>>>> 1. support data type nullability and precision.
> >>>>> 2. comment on table and columns.
> >>>>>
> >>>>> Under Discussion: (Should briefly describe some options...)
> >>>>> 1. Ingesting and writing timestamps to systems.
> >>>>> 2. support custom watermark strategy.
> >>>>> 3. support table update mode
> >>>>> 4. support row/map/array data type
> >>>>> 5. support schema derivation
> >>>>> 6. support system versioned temporal table
> >>>>> 7. support table index
> >>>>>
> >>>>> We can continue the further discussion here, also can separate to an
> >>>> other
> >>>>> DISCUSS topic if it is a sophisticated problem such as Table Update
> >>> Mode,
> >>>>> Temporal Table.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Mon, 10 Dec 2018 at 11:54, Lin Li <lincoln.86xy@xxxxxxxxx> wrote:
> >>>>>
> >>>>>> hi all,
> >>>>>> Thanks for your valuable input!
> >>>>>>
> >>>>>> 4) Event-Time Attributes and Watermarks:
> >>>>>> 4.b) @Fabian As you mentioned using a computed columns `ts AS
> >>>>>> SYSTEMROWTIME()`
> >>>>>> for writing out to kafka table sink will violate the rule that
> >>> computed
> >>>>>> fields are not emitted.
> >>>>>> Since the timestamp column in kafka's header area is a specific
> >>>>>> materialization protocol,
> >>>>>> why don't we treat it as an connector property? For an example:
> >>>>>> ```
> >>>>>> CREATE TABLE output_kafka_t1(
> >>>>>>     id bigint,
> >>>>>>     ts timestamp,
> >>>>>>     msg varchar
> >>>>>> ) WITH (
> >>>>>>     type=kafka,
> >>>>>>     header.timestamp=ts
> >>>>>>     ,...
> >>>>>> );
> >>>>>> ```
> >>>>>>
> >>>>>> 4d) For custom watermark strategies
> >>>>>> @Fabian Agree with you that opening another topic about this feature
> >>>> later.
> >>>>>> 3) SOURCE / SINK / BOTH
> >>>>>> I think the permissions and availabilities are two separately
> >> things,
> >>>>>> permissions
> >>>>>> can be managed well by using GRANT/INVOKE(you can call it DCL)
> >>> solutions
> >>>>>> which
> >>>>>> commonly used in different DBs. The permission part can be an new
> >>> topic
> >>>> for
> >>>>>> later discussion, what do you think?
> >>>>>>
> >>>>>> For the availabilities, @Fabian @Timo  I've another question,
> >>>>>> does instantiate a TableSource/Sink cost much or has some other
> >>>> downsides?
> >>>>>> IMO, create a new source/sink object via the construct seems not
> >>> costly.
> >>>>>> When receiving a DDL we should associate it with the catalog object
> >>>>>> (reusing an existence or create a new one).
> >>>>>> Am I lost something important?
> >>>>>>
> >>>>>> 5. Schema declaration:
> >>>>>> @Timo  yes, your concern about the user convenience is very
> >> important.
> >>>> But
> >>>>>> I haven't seen a clear way to solve this so far.
> >>>>>> Do we put it later and wait for more inputs from the community?
> >>>>>>
> >>>>>> Shuyi Chen <suez1224@xxxxxxxxx> 于2018年12月8日周六 下午4:27写道:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> Thanks a lot for the great discussion. I think we can continue the
> >>>>>>> discussion here while carving out a MVP so that the community can
> >>> start
> >>>>>>> working on. Based on the discussion so far, I try to summarize what
> >>> we
> >>>>>> will
> >>>>>>> do for the MVP:
> >>>>>>>
> >>>>>>> MVP
> >>>>>>>
> >>>>>>>      1. support CREATE TABLE
> >>>>>>>      2. support exisiting data type in Flink SQL, ignore
> nullability
> >>> and
> >>>>>>>      precision
> >>>>>>>      3. support table comments and column comments
> >>>>>>>      4. support table constraint PRIMARY KEY and UNIQUE
> >>>>>>>      5. support table properties using key-value pairs
> >>>>>>>      6. support partitioned by
> >>>>>>>      7. support computed column
> >>>>>>>      8. support from-field and from-source timestamp extractors
> >>>>>>>      9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, FROM-SOURCE
> >>>> watermark
> >>>>>>>      strategies.
> >>>>>>>      10. support a table property to allow explicit enforcement of
> >>>>>>>      read/write(source/sink) permission of a table
> >>>>>>>
> >>>>>>> I try to put up the DDL grammar (
> >>>>>>>
> >>>>>>>
> >>
> https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing
> >>>>>>> )
> >>>>>>> based on the MVP features above and the previous design docs.
> >> Please
> >>>>>> take a
> >>>>>>> look and comment on it.
> >>>>>>>
> >>>>>>>
> >>>>>>> Also, I summarize the future Improvement on CREATE TABLE as the
> >>>>>> followings:
> >>>>>>>      1. support table update mode
> >>>>>>>      2. support data type nullability and precision
> >>>>>>>      3. support row/map/array data type
> >>>>>>>      4. support custom timestamp extractor and watermark strategy
> >>>>>>>      5. support schema derivation
> >>>>>>>      6. support system versioned temporal table
> >>>>>>>      7. support table index
> >>>>>>>
> >>>>>>> I suggest we first agree on the MVP feature list and the MVP
> >> grammar.
> >>>> And
> >>>>>>> then we can either continue the discussion of the future
> >> improvements
> >>>>>> here,
> >>>>>>> or create separate JIRAs for each item and discuss further in the
> >>> JIRA.
> >>>>>>> What do you guys think?
> >>>>>>>
> >>>>>>> Shuyi
> >>>>>>>
> >>>>>>> On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <twalthr@xxxxxxxxxx>
> >>>> wrote:
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I think we are making good progress. Thanks for all the feedback
> >> so
> >>>>>> far.
> >>>>>>>> 3. Sources/Sinks:
> >>>>>>>> It seems that I can not find supporters for explicit SOURCE/SINK
> >>>>>>>> declaration so I'm fine with not using those keywords.
> >>>>>>>> @Fabian: Maybe we don't haven have to change the TableFactory
> >>>> interface
> >>>>>>>> but just provide some helper functions in the TableFactoryService.
> >>>> This
> >>>>>>>> would solve the availability problem, but the permission problem
> >>> would
> >>>>>>>> still not be solved. If you are fine with it, we could introduce a
> >>>>>>>> property instead?
> >>>>>>>>
> >>>>>>>> 5. Schema declaration:
> >>>>>>>> @Lin: We should find an agreement on this as it requires changes
> >> to
> >>>> the
> >>>>>>>> TableFactory interface. We should minimize changes to this
> >> interface
> >>>>>>>> because it is user-facing. Especially, if format schema and table
> >>>>>> schema
> >>>>>>>> differ, the need for such a functionality is very important. Our
> >>> goal
> >>>>>> is
> >>>>>>>> to connect to existing infrastructure. For example, if we are
> >> using
> >>>>>> Avro
> >>>>>>>> and the existing Avro format has enums but Flink SQL does not
> >>> support
> >>>>>>>> enums, it would be helpful to let the Avro format derive a table
> >>>>>> schema.
> >>>>>>>> Otherwise your need to declare both schemas which leads to CREATE
> >>>> TABLE
> >>>>>>>> statements of 400 lines+.
> >>>>>>>> I think the mentioned query:
> >>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> >>>>>>>> format.schema-file = "/my/avrofile.avsc")
> >>>>>>>> is fine and should only be valid if the schema contains no
> >>>> non-computed
> >>>>>>>> columns.
> >>>>>>>>
> >>>>>>>> 7. Table Update Mode:
> >>>>>>>> After thinking about it again, I agree. The mode of the sinks can
> >> be
> >>>>>>>> derived from the query and the existence of a PRIMARY KEY
> >>> declaration.
> >>>>>>>> But Fabian raised a very good point. How do we deal with sources?
> >>>> Shall
> >>>>>>>> we introduce a new keywords similar to WATERMARKS such that a
> >>>>>>>> upsert/retract flag is not part of the visible schema?
> >>>>>>>>
> >>>>>>>> 4a. How to mark a field as attribute?
> >>>>>>>> @Jark: Thanks for the explanation of the WATERMARK clause
> >> semantics.
> >>>>>>>> This is a nice way of marking existing fields. This sounds good to
> >>> me.
> >>>>>>>> 4c) WATERMARK as constraint
> >>>>>>>> I'm fine with leaving the WATERMARK clause in the schema
> >> definition.
> >>>>>>>> 4d) Custom watermark strategies:
> >>>>>>>> I would already think about custom watermark strategies as the
> >>> current
> >>>>>>>> descriptor design already supports this. ScalarFunction's don't
> >> work
> >>>> as
> >>>>>>>> a PeriodicWatermarkAssigner has different semantics. Why not
> >> simply
> >>>>>>>> entering the a full class name here as it is done in the current
> >>>>>> design?
> >>>>>>>> 4.b) Ingesting and writing timestamps to systems (like Kafka)
> >>>>>>>> @Fabian: Yes, your suggestion sounds good to me. This behavior
> >> would
> >>>> be
> >>>>>>>> similar to our current `timestamps: from-source` design.
> >>>>>>>>
> >>>>>>>> Once our discussion has found a conclusion, I would like to
> >>> volunteer
> >>>>>>>> and summarize the outcome of this mailing thread. It nicely aligns
> >>>> with
> >>>>>>>> the update work on the connector improvements document (that I
> >>> wanted
> >>>>>> to
> >>>>>>>> do anyway) and the ongoing external catalog discussion.
> >>> Furthermore, I
> >>>>>>>> would also want to propose how to change existing interfaces by
> >>>> keeping
> >>>>>>>> the DDL, connector improvements, and external catalog support in
> >>> mind.
> >>>>>>>> Would that be ok for you?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Am 07.12.18 um 14:48 schrieb Fabian Hueske:
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> Thanks for the discussion.
> >>>>>>>>> I'd like to share my point of view as well.
> >>>>>>>>>
> >>>>>>>>> 4) Event-Time Attributes and Watermarks:
> >>>>>>>>> 4.a) I agree with Lin and Jark's proposal. Declaring a watermark
> >> on
> >>>>>> an
> >>>>>>>>> attribute declares it as an event-time attribute.
> >>>>>>>>> 4.b) Ingesting and writing timestamps to systems (like Kafka). We
> >>>>>> could
> >>>>>>>> use
> >>>>>>>>> a special function like (ts AS SYSTEMROWTIME()). This function
> >> will
> >>>>>>>>> indicate that we read the timestamp directly from the system (and
> >>> not
> >>>>>>> the
> >>>>>>>>> data). We can also write the field back to the system when
> >> emitting
> >>>>>> the
> >>>>>>>>> table (violating the rule that computed fields are not emitted).
> >>>>>>>>> 4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE
> >> KEY
> >>>>>>>>> constraint and therefore keep it in the schema definition.
> >>>>>>>>> 4d) For custom watermark strategies, a simple expressions or
> >>>>>>>>> ScalarFunctions won't be sufficient. Sophisticated approaches
> >> could
> >>>>>>>> collect
> >>>>>>>>> histograms, etc. But I think we can leave that out for later.
> >>>>>>>>>
> >>>>>>>>> 3) SOURCE / SINK / BOTH
> >>>>>>>>> As you said, there are two things to consider here: permission
> >> and
> >>>>>>>>> availability of a TableSource/TableSink.
> >>>>>>>>> I think that neither should be a reason to add a keyword at such
> >> a
> >>>>>>>>> sensitive position.
> >>>>>>>>> However, I also see Timo's point that it would be good to know
> >>>>>> up-front
> >>>>>>>> how
> >>>>>>>>> a table can be used without trying to instantiate a
> >>> TableSource/Sink
> >>>>>>> for
> >>>>>>>> a
> >>>>>>>>> query.
> >>>>>>>>> Maybe we can extend the TableFactory such that it provides
> >>>>>> information
> >>>>>>>>> about which sources/sinks it can provide.
> >>>>>>>>>
> >>>>>>>>> 7. Table Update Mode
> >>>>>>>>> Something that we definitely need to consider is how tables are
> >>>>>>> ingested,
> >>>>>>>>> i.e., append, retract or upsert.
> >>>>>>>>> Especially, since upsert and retraction need a meta-data column
> >>> that
> >>>>>>>>> indicates whether an event is an insert (or upsert) or a delete
> >>>>>> change.
> >>>>>>>>> This column needs to be identified somehow, most likely as part
> >> of
> >>>>>> the
> >>>>>>>>> input format. Ideally, this column should not be part of the
> >> table
> >>>>>>> schema
> >>>>>>>>> (as it would be always true).
> >>>>>>>>> Emitting tables is not so much of an issue as the properties of
> >> the
> >>>>>>> table
> >>>>>>>>> tell use what to do (append-only/update, unique key y/n).
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Fabian
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Am Fr., 7. Dez. 2018 um 10:39 Uhr schrieb Jark Wu <
> >>> imjark@xxxxxxxxx
> >>>>>>> :
> >>>>>>>>>> Hi Timo,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for your quickly feedback! Here are some of my thoughts:
> >>>>>>>>>>
> >>>>>>>>>> Append, upserts, retract mode on sinks is also a very complex
> >>>>>>> problem. I
> >>>>>>>>>> think append/upserts/retract is the ability of a table, user do
> >>> not
> >>>>>>>> need to
> >>>>>>>>>> specify a table is used for append or retraction or upsert. The
> >>>>>> query
> >>>>>>>> can
> >>>>>>>>>> choose which mode the sink is. If an unbounded groupby is
> >> inserted
> >>>>>>> into
> >>>>>>>> an
> >>>>>>>>>> append sink (the sink only implements/supports append), an
> >>> exception
> >>>>>>>> can be
> >>>>>>>>>> thrown. A more complex problem is, if we want to write
> >>>>>>>> retractions/upserts
> >>>>>>>>>> to Kafka, how to encode the change flag (add or retract/delete)
> >> on
> >>>>>> the
> >>>>>>>>>> table? Maybe we should propose some protocal for the change flag
> >>>>>>>> encoding,
> >>>>>>>>>> but I don't have a clear idea about this right now.
> >>>>>>>>>>
> >>>>>>>>>> 3. Sources/Sinks: The source/sink tag is similar to the
> >>>>>>>>>> append/upsert/retract problem. Besides source/sink, actully we
> >>> have
> >>>>>>>> stream
> >>>>>>>>>> source, stream sink, batch source, batch sink, and the stream
> >> sink
> >>>>>>> also
> >>>>>>>>>> include append/upsert/retract three modes. Should we put all the
> >>>>>> tags
> >>>>>>> on
> >>>>>>>>>> the CREATE TABLE? IMO, the table's ability is defined by the
> >> table
> >>>>>>>> itself,
> >>>>>>>>>> user do not need to specify it. If it is only a readable table,
> >> an
> >>>>>>>>>> exception can be thrown when write to it. As the source/sink tag
> >>> can
> >>>>>>> be
> >>>>>>>>>> omitted in CREATE TABLE, could we skip it and only support
> >> CREATE
> >>>>>>> TABLE
> >>>>>>>> in
> >>>>>>>>>> the first version, and add it back in the future when we really
> >>> need
> >>>>>>>> it? It
> >>>>>>>>>> keeps API compatible and make sure the MVP is what we consider
> >>>>>>> clearly.
> >>>>>>>>>> 4a. How to mark a field as attribute?
> >>>>>>>>>> The watermark definition includes two parts: use which field as
> >>> time
> >>>>>>>>>> attribute and use what generate strategy.
> >>>>>>>>>> When we want to mark `ts` field as attribute: WATERMARK FOR `ts`
> >>> AS
> >>>>>>>> OFFSET
> >>>>>>>>>> '5' SECOND.
> >>>>>>>>>> If we have a POJO{id, user, ts} field named "pojo", we can mark
> >> it
> >>>>>>> like
> >>>>>>>>>> this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND
> >>>>>>>>>>
> >>>>>>>>>> 4b. timestamp write to Kafka message header
> >>>>>>>>>> Even though we can define multiple time attribute on a table,
> >> only
> >>>>>> one
> >>>>>>>> time
> >>>>>>>>>> attribute can be actived/used in a query (in a stream). When we
> >>>>>> enable
> >>>>>>>>>> `writeTiemstamp`, the only attribute actived in the stream will
> >> be
> >>>>>>>> write to
> >>>>>>>>>> Kafka message header. What I mean the timestmap in StreamRecord
> >> is
> >>>>>> the
> >>>>>>>> time
> >>>>>>>>>> attribute in the stream.
> >>>>>>>>>>
> >>>>>>>>>> 4c. Yes. We introduced the WATERMARK keyword similar to the
> >> INDEX,
> >>>>>>>> PRIMARY
> >>>>>>>>>> KEY keywords.
> >>>>>>>>>>
> >>>>>>>>>> @Timo, Do you have any other advice or questions on the
> >> watermark
> >>>>>>>> syntax ?
> >>>>>>>>>> For example, the builtin strategy name: "BOUNDED WITH OFFSET" VS
> >>>>>>>> "OFFSET"
> >>>>>>>>>> VS ...
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Jark
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.86xy@xxxxxxxxx>
> >>> wrote:
> >>>>>>>>>>> Hi Timo,
> >>>>>>>>>>> Thanks for your feedback, here's some thoughts of mine:
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Sources/Sinks:
> >>>>>>>>>>> "Let's assume an interactive CLI session, people should be able
> >>> to
> >>>>>>> list
> >>>>>>>>>> all
> >>>>>>>>>>> source table and sink tables to know upfront if they can use an
> >>>>>>> INSERT
> >>>>>>>>>> INTO
> >>>>>>>>>>> here or not."
> >>>>>>>>>>> This requirement can be simply resolved by a document that list
> >>> all
> >>>>>>>>>>> supported source/sink/both connectors and the sql-client can
> >>>>>> perform
> >>>>>>> a
> >>>>>>>>>>> quick check. It's only an implementation choice, not necessary
> >>> for
> >>>>>>> the
> >>>>>>>>>>> syntax.
> >>>>>>>>>>> For connector implementation, a connector may implement one or
> >>> some
> >>>>>>> or
> >>>>>>>>>> all
> >>>>>>>>>>> of the [Stream|Batch]Source/[Stream|Batch]Sink traits, we can
> >>>>>> derive
> >>>>>>>> the
> >>>>>>>>>>> availability for any give query without the SOURCE/SINk
> >> keywords
> >>> or
> >>>>>>>>>>> specific table properties in WITH clause.
> >>>>>>>>>>> Since there's still indeterminacy, shall we skip these two
> >>> keywords
> >>>>>>> for
> >>>>>>>>>> the
> >>>>>>>>>>> MVP DDL? We can make further discussion after users' feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> 6. Partitioning and keys
> >>>>>>>>>>> Agree with you that raise the priority of table constraint and
> >>>>>>>>>> partitioned
> >>>>>>>>>>> table support for better connectivity to Hive and Kafka. I'll
> >> add
> >>>>>>>>>>> partitioned table syntax(compatible to hive) into the DDL Draft
> >>> doc
> >>>>>>>>>>> later[1].
> >>>>>>>>>>>
> >>>>>>>>>>> 5. Schema declaration
> >>>>>>>>>>> "if users want to declare computed columns they have a "schema"
> >>>>>>>>>> constraints
> >>>>>>>>>>> but without columns
> >>>>>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> >>>>>>>>>>> format.schema-file = "/my/avrofile.avsc") "
> >>>>>>>>>>>
> >>>>>>>>>>>    From the point of my view, this ddl is invalid because the
> >>>> primary
> >>>>>>> key
> >>>>>>>>>>> constraint already references two columns but types unseen.
> >>>>>>>>>>> And Xuefu pointed a important matching problem, so let's put
> >>> schema
> >>>>>>>>>>> derivation as a follow-up extension ?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Timo Walther <twalthr@xxxxxxxxxx> 于2018年12月6日周四 下午6:05写道:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> great to have such a lively discussion. My next batch of
> >>> feedback:
> >>>>>>>>>>>> @Jark: We don't need to align the descriptor approach with
> >> SQL.
> >>>>>> I'm
> >>>>>>>>>> open
> >>>>>>>>>>>> for different approaches as long as we can serve a broad set
> >> of
> >>>>>> use
> >>>>>>>>>>>> cases and systems. The descriptor approach was a first attempt
> >>> to
> >>>>>>>> cover
> >>>>>>>>>>>> all aspects and connector/format characteristics. Just another
> >>>>>>>> example,
> >>>>>>>>>>>> that is missing in the DDL design: How can a user decide if
> >>>>>> append,
> >>>>>>>>>>>> retraction, or upserts should be used to sink data into the
> >>> target
> >>>>>>>>>>>> system? Do we want to define all these improtant properties in
> >>> the
> >>>>>>> big
> >>>>>>>>>>>> WITH property map? If yes, we are already close to the
> >>> descriptor
> >>>>>>>>>>>> approach. Regarding the "standard way", most DDL languages
> >> have
> >>>>>> very
> >>>>>>>>>>>> custom syntax so there is not a real "standard".
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. Sources/Sinks: @Lin: If a table has both read/write access
> >> it
> >>>>>> can
> >>>>>>>> be
> >>>>>>>>>>>> created using a regular CREATE TABLE (omitting a specific
> >>>>>>> source/sink)
> >>>>>>>>>>>> declaration. Regarding the transition from source/sink to
> >> both,
> >>>>>> yes
> >>>>>>> we
> >>>>>>>>>>>> would need to update the a DDL and catalogs. But is this a
> >>>>>> problem?
> >>>>>>>> One
> >>>>>>>>>>>> also needs to add new queries that use the tables. @Xuefu: It
> >> is
> >>>>>> not
> >>>>>>>>>>>> only about security aspects. Especially for streaming use
> >> cases,
> >>>>>> not
> >>>>>>>>>>>> every connector can be used as a source easily. For example, a
> >>>>>> JDBC
> >>>>>>>>>> sink
> >>>>>>>>>>>> is easier than a JDBC source. Let's assume an interactive CLI
> >>>>>>> session,
> >>>>>>>>>>>> people should be able to list all source table and sink tables
> >>> to
> >>>>>>> know
> >>>>>>>>>>>> upfront if they can use an INSERT INTO here or not.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 6. Partitioning and keys: @Lin: I would like to include this
> >> in
> >>>>>> the
> >>>>>>>>>>>> design given that Hive integration and Kafka key support are
> >> in
> >>>>>> the
> >>>>>>>>>>>> making/are on our roadmap for this release.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 5. Schema declaration: @Lin: You are right it is not
> >>> conflicting.
> >>>>>> I
> >>>>>>>>>> just
> >>>>>>>>>>>> wanted to raise the point because if users want to declare
> >>>>>> computed
> >>>>>>>>>>>> columns they have a "schema" constraints but without columns.
> >>> Are
> >>>>>> we
> >>>>>>>> ok
> >>>>>>>>>>>> with a syntax like ...
> >>>>>>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> >>>>>>>>>>>> format.schema-file = "/my/avrofile.avsc") ?
> >>>>>>>>>>>> @Xuefu: Yes, you are right that an external schema might not
> >>>>>> excatly
> >>>>>>>>>>>> match but this is true for both directions:
> >>>>>>>>>>>> table schema "derives" format schema and format schema
> >> "derives"
> >>>>>>> table
> >>>>>>>>>>>> schema.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 7. Hive compatibility: @Xuefu: I agree that Hive is popular
> >> but
> >>> we
> >>>>>>>>>>>> should not just adopt everything from Hive as there syntax is
> >>> very
> >>>>>>>>>>>> batch-specific. We should come up with a superset of
> >> historical
> >>>>>> and
> >>>>>>>>>>>> future requirements. Supporting Hive queries can be an
> >>>>>> intermediate
> >>>>>>>>>>>> layer on top of Flink's DDL.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4. Time attributes: @Lin: I'm fine with changing the
> >>>>>>>> TimestampExtractor
> >>>>>>>>>>>> interface as this is also important for better separation of
> >>>>>>> connector
> >>>>>>>>>>>> and table module [1]. However, I'm wondering about watermark
> >>>>>>>>>> generation.
> >>>>>>>>>>>> 4a. timestamps are in the schema twice:
> >>>>>>>>>>>> @Jark: "existing field is Long/Timestamp, we can just use it
> >> as
> >>>>>>>>>>>> rowtime": yes, but we need to mark a field as such an
> >> attribute.
> >>>>>> How
> >>>>>>>>>>>> does the syntax for marking look like? Also in case of
> >>> timestamps
> >>>>>>> that
> >>>>>>>>>>>> are nested in the schema?
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4b. how can we write out a timestamp into the message header?:
> >>>>>>>>>>>> I agree to simply ignore computed columns when writing out.
> >> This
> >>>>>> is
> >>>>>>>>>> like
> >>>>>>>>>>>> 'field-change: add' that I mentioned in the improvements
> >>> document.
> >>>>>>>>>>>> @Jark: "then the timestmap in StreamRecord will be write to
> >>> Kafka
> >>>>>>>>>>>> message header": Unfortunately, there is no timestamp in the
> >>>>>> stream
> >>>>>>>>>>>> record. Additionally, multiple time attributes can be in a
> >>> schema.
> >>>>>>> So
> >>>>>>>>>> we
> >>>>>>>>>>>> need a constraint that tells the sink which column to use
> >>>>>> (possibly
> >>>>>>>>>>>> computed as well)?
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4c. separate all time attribute concerns into a special clause
> >>>>>> next
> >>>>>>> to
> >>>>>>>>>>>> the regular schema?
> >>>>>>>>>>>> @Jark: I don't have a strong opinion on this. I just have the
> >>>>>>> feeling
> >>>>>>>>>>>> that the "schema part" becomes quite messy because the actual
> >>>>>> schema
> >>>>>>>>>>>> with types and fields is accompanied by so much metadata about
> >>>>>>>>>>>> timestamps, watermarks, keys,... and we would need to
> >> introduce
> >>> a
> >>>>>>> new
> >>>>>>>>>>>> WATERMARK keyword within a schema that was close to standard
> >> up
> >>> to
> >>>>>>>> this
> >>>>>>>>>>>> point.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks everyone,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-9461
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Am 06.12.18 um 07:08 schrieb Jark Wu:
> >>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thank you for the valuable feedbacks.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> First of all, I think we don't need to align the SQL
> >>>>>> functionality
> >>>>>>> to
> >>>>>>>>>>>>> Descriptor. Because SQL is a more standard API, we should be
> >> as
> >>>>>>>>>>> cautious
> >>>>>>>>>>>> as
> >>>>>>>>>>>>> possible to extend the SQL syntax. If something can be done
> >> in
> >>> a
> >>>>>>>>>>> standard
> >>>>>>>>>>>>> way, we shouldn't introduce something new.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Here are some of my thoughts:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. Scope: Agree.
> >>>>>>>>>>>>> 2. Constraints: Agree.
> >>>>>>>>>>>>> 4. Time attributes:
> >>>>>>>>>>>>>       4a. timestamps are in the schema twice.
> >>>>>>>>>>>>>        If an existing field is Long/Timestamp, we can just
> use
> >>> it
> >>>>>> as
> >>>>>>>>>>>> rowtime,
> >>>>>>>>>>>>> no twice defined. If it is not a Long/Timestamp, we use
> >>> computed
> >>>>>>>>>> column
> >>>>>>>>>>>> to
> >>>>>>>>>>>>> get an expected timestamp column to be rowtime, is this what
> >>> you
> >>>>>>> mean
> >>>>>>>>>>>>> defined twice?  But I don't think it is a problem, but an
> >>>>>>> advantages,
> >>>>>>>>>>>>> because it is easy to use, user do not need to consider
> >> whether
> >>>>>> to
> >>>>>>>>>>>> "replace
> >>>>>>>>>>>>> the existing column" or "add a new column", he will not be
> >>>>>> confused
> >>>>>>>>>>>> what's
> >>>>>>>>>>>>> the real schema is, what's the index of rowtime in the
> >> schema?
> >>>>>>>>>>> Regarding
> >>>>>>>>>>>> to
> >>>>>>>>>>>>> the optimization, even if timestamps are in schema twice,
> >> when
> >>>>>> the
> >>>>>>>>>>>> original
> >>>>>>>>>>>>> timestamp is never used in query, then the projection
> >> pushdown
> >>>>>>>>>>>> optimization
> >>>>>>>>>>>>> can cut this field as early as possible, which is exactly the
> >>>>>> same
> >>>>>>> as
> >>>>>>>>>>>>> "replacing the existing column" in runtime.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        4b. how can we write out a timestamp into the message
> >>>>>> header?
> >>>>>>>>>>>>>         That's a good point. I think computed column is just
> a
> >>>>>>> virtual
> >>>>>>>>>>>> column
> >>>>>>>>>>>>> on table which is only relative to reading. If we want to
> >> write
> >>>>>> to
> >>>>>>> a
> >>>>>>>>>>>> table
> >>>>>>>>>>>>> with computed column defined, we only need to provide the
> >>> columns
> >>>>>>>>>>> except
> >>>>>>>>>>>>> computed columns (see SQL Server [1]). The computed column is
> >>>>>>> ignored
> >>>>>>>>>>> in
> >>>>>>>>>>>>> the insert statement. Get back to the question, how can we
> >>> write
> >>>>>>> out
> >>>>>>>>>> a
> >>>>>>>>>>>>> timestamp into the message header? IMO, we can provide a
> >>>>>>>>>> configuration
> >>>>>>>>>>> to
> >>>>>>>>>>>>> support this, such as `kafka.writeTimestamp=true`, then the
> >>>>>>> timestmap
> >>>>>>>>>>> in
> >>>>>>>>>>>>> StreamRecord will be write to Kafka message header. What do
> >> you
> >>>>>>>>>> think?
> >>>>>>>>>>>>>         4c. separate all time attribute concerns into a
> >> special
> >>>>>>> clause
> >>>>>>>>>>> next
> >>>>>>>>>>>> to
> >>>>>>>>>>>>> the regular schema?
> >>>>>>>>>>>>>         Separating watermark into a special clause similar to
> >>>>>>>>>> PARTITIONED
> >>>>>>>>>>>> BY is
> >>>>>>>>>>>>> also a good idea. Conceptually, it's fine to put watermark in
> >>>>>>> schema
> >>>>>>>>>>> part
> >>>>>>>>>>>>> or out schema part. But if we want to support multiple
> >>> watermark
> >>>>>>>>>>>>> definition, maybe it would be better to put it in schema
> >> part.
> >>> It
> >>>>>>> is
> >>>>>>>>>>>>> similar to Index Definition that we can define several
> >> indexes
> >>>>>> on a
> >>>>>>>>>>> table
> >>>>>>>>>>>>> in schema part.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>         4d. How can people come up with a custom watermark
> >>>>>> strategy?
> >>>>>>>>>>>>>         In most cases, the built-in strategy can works good.
> >> If
> >>> we
> >>>>>>> need
> >>>>>>>>>> a
> >>>>>>>>>>>>> custom one, we can use a scalar function which restrict to
> >> only
> >>>>>>>>>> return
> >>>>>>>>>>> a
> >>>>>>>>>>>>> nullable Long, and use it in SQL like: WATERMARK for rowtime
> >> AS
> >>>>>>>>>>>>> watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined
> >>>>>> scalar
> >>>>>>>>>>>> function
> >>>>>>>>>>>>> accepts 3 parameters and return a nullable Long which can be
> >>> used
> >>>>>>> as
> >>>>>>>>>>>>> punctuated watermark assigner. Another choice is
> >> implementing a
> >>>>>>> class
> >>>>>>>>>>>>> extending the
> >>>>>>>>>>>>>
> >> `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy`
> >>>>>> and
> >>>>>>>>>> use
> >>>>>>>>>>>> it
> >>>>>>>>>>>>> in SQL: WATERMARK for rowtime AS
> >> 'com.my.MyWatermarkStrategy'.
> >>>>>> But
> >>>>>>> if
> >>>>>>>>>>>>> scalar function can cover the requirements here, I would
> >> prefer
> >>>>>> it
> >>>>>>>>>>> here,
> >>>>>>>>>>>>> because it keeps standard compliant. BTW, this feature is not
> >>> in
> >>>>>>> MVP,
> >>>>>>>>>>> we
> >>>>>>>>>>>>> can discuss it more depth in the future when we need it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 5. Schema declaration:
> >>>>>>>>>>>>> I like the proposal to omit the schema if we can get the
> >> schema
> >>>>>>> from
> >>>>>>>>>>>>> external storage or something schema file. Actually, we have
> >>>>>>> already
> >>>>>>>>>>>>> encountered this requirement in out company.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> +1 to @Xuefu that we should be as close as possible to Hive
> >>>>>> syntax
> >>>>>>>>>>> while
> >>>>>>>>>>>>> keeping SQL ANSI standard. This will make it more acceptable
> >>> and
> >>>>>>>>>> reduce
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> learning cost for user.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]:
> >>>>>>>>>>>>>
> >>
> https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <
> >>>>>> xuefu.z@xxxxxxxxxxxxxxx
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>> Hi Timo/Shuyi/Lin,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the discussions. It seems that we are converging
> >> to
> >>>>>>>>>>> something
> >>>>>>>>>>>>>> meaningful. Here are some of my thoughts:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. +1 on MVP DDL
> >>>>>>>>>>>>>> 3. Markers for source or sink seem more about permissions on
> >>>>>>> tables
> >>>>>>>>>>> that
> >>>>>>>>>>>>>> belong to a security component. Unless the table is created
> >>>>>>>>>>> differently
> >>>>>>>>>>>>>> based on source, sink, or both, it doesn't seem necessary to
> >>> use
> >>>>>>>>>> these
> >>>>>>>>>>>>>> keywords to enforce permissions.
> >>>>>>>>>>>>>> 5. It might be okay if schema declaration is always needed.
> >>>>>> While
> >>>>>>>>>>> there
> >>>>>>>>>>>>>> might be some duplication sometimes, it's not always true.
> >> For
> >>>>>>>>>>> example,
> >>>>>>>>>>>>>> external schema may not be exactly matching Flink schema.
> >> For
> >>>>>>>>>>> instance,
> >>>>>>>>>>>>>> data types. Even if so, perfect match is not required. For
> >>>>>>> instance,
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> external schema file may evolve while table schema in Flink
> >>> may
> >>>>>>> stay
> >>>>>>>>>>>>>> unchanged. A responsible reader should be able to scan the
> >>> file
> >>>>>>>>>> based
> >>>>>>>>>>> on
> >>>>>>>>>>>>>> file schema and return the data based on table schema.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Other aspects:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 7. Hive compatibility. Since Flink SQL will soon be able to
> >>>>>>> operate
> >>>>>>>>>> on
> >>>>>>>>>>>>>> Hive metadata and data, it's an add-on benefit if we can be
> >>>>>>>>>> compatible
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>> Hive syntax/semantics while following ANSI standard. At
> >> least
> >>> we
> >>>>>>>>>>> should
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>> as close as possible. Hive DDL can found at
> >>>>>>>>>>>>>>
> >> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Xuefu
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>> ------------------------------------------------------------------
> >>>>>>>>>>>>>> Sender:Lin Li <lincoln.86xy@xxxxxxxxx>
> >>>>>>>>>>>>>> Sent at:2018 Dec 6 (Thu) 10:49
> >>>>>>>>>>>>>> Recipient:dev <dev@xxxxxxxxxxxxxxxx>
> >>>>>>>>>>>>>> Subject:Re: [DISCUSS] Flink SQL DDL Design
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Timo and Shuyi,
> >>>>>>>>>>>>>>       thanks for your feedback.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Scope
> >>>>>>>>>>>>>> agree with you we should focus on the MVP DDL first.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. Constraints
> >>>>>>>>>>>>>> yes, this can be a follow-up issue.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. Sources/Sinks
> >>>>>>>>>>>>>> If a TABLE has both read/write access requirements, should
> >> we
> >>>>>>>>>> declare
> >>>>>>>>>>> it
> >>>>>>>>>>>>>> using
> >>>>>>>>>>>>>> `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...` ? A further
> >>>>>>>>>> question,
> >>>>>>>>>>>> if a
> >>>>>>>>>>>>>> TABLE
> >>>>>>>>>>>>>> t1 firstly declared as read only (as a source table), then
> >> for
> >>>>>>> some
> >>>>>>>>>>> new
> >>>>>>>>>>>>>> requirements
> >>>>>>>>>>>>>> t1 will change to a sink table,  in this case we need
> >> updating
> >>>>>>> both
> >>>>>>>>>>> the
> >>>>>>>>>>>> DDL
> >>>>>>>>>>>>>> and catalogs.
> >>>>>>>>>>>>>> Further more, let's think about the BATCH query, update one
> >>>>>> table
> >>>>>>>>>>>> in-place
> >>>>>>>>>>>>>> can be a common case.
> >>>>>>>>>>>>>> e.g.,
> >>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>> CREATE TABLE t1 (
> >>>>>>>>>>>>>>       col1 varchar,
> >>>>>>>>>>>>>>       col2 int,
> >>>>>>>>>>>>>>       col3 varchar
> >>>>>>>>>>>>>>       ...
> >>>>>>>>>>>>>> );
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> INSERT [OVERWRITE] TABLE t1
> >>>>>>>>>>>>>> AS
> >>>>>>>>>>>>>> SELECT
> >>>>>>>>>>>>>>       (some computing ...)
> >>>>>>>>>>>>>> FROM t1;
> >>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>> So, let's forget these SOURCE/SINK keywords in DDL. For the
> >>>>>>>>>> validation
> >>>>>>>>>>>>>> purpose, we can find out other ways.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4. Time attributes
> >>>>>>>>>>>>>> As Shuyi mentioned before, there exists an
> >>>>>>>>>>>>>>
> >>> `org.apache.flink.table.sources.tsextractors.TimestampExtractor`
> >>>>>>> for
> >>>>>>>>>>>> custom
> >>>>>>>>>>>>>> defined time attributes usage, but this expression based
> >> class
> >>>>>> is
> >>>>>>>>>> more
> >>>>>>>>>>>>>> friendly for table api not the SQL.
> >>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>> /**
> >>>>>>>>>>>>>>       * Provides the an expression to extract the timestamp
> >>> for a
> >>>>>>>>>> rowtime
> >>>>>>>>>>>>>> attribute.
> >>>>>>>>>>>>>>       */
> >>>>>>>>>>>>>> abstract class TimestampExtractor extends
> >> FieldComputer[Long]
> >>>>>> with
> >>>>>>>>>>>>>> Serializable {
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       /** Timestamp extractors compute the timestamp as
> Long.
> >>> */
> >>>>>>>>>>>>>>       override def getReturnType: TypeInformation[Long] =
> >>>>>>>>>>>>>> Types.LONG.asInstanceOf[TypeInformation[Long]]
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>> ```
> >>>>>>>>>>>>>> BTW, I think both the Scalar function and the
> >>> TimestampExtractor
> >>>>>>> are
> >>>>>>>>>>>>>> expressing computing logic, the TimestampExtractor has no
> >> more
> >>>>>>>>>>>> advantage in
> >>>>>>>>>>>>>> SQL scenarios.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 6. Partitioning and keys
> >>>>>>>>>>>>>> Primary Key is included in Constraint part, and partitioned
> >>>>>> table
> >>>>>>>>>>>> support
> >>>>>>>>>>>>>> can be another topic later.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 5. Schema declaration
> >>>>>>>>>>>>>> Agree with you that we can do better schema derivation for
> >>> user
> >>>>>>>>>>>>>> convenience, but this is not conflict with the syntax.
> >>>>>>>>>>>>>> Table properties can carry any useful informations both for
> >>> the
> >>>>>>>>>> users
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>> the framework, I like your `contract name` proposal,
> >>>>>>>>>>>>>> e.g., `WITH (format.type = avro)`, the framework can
> >> recognize
> >>>>>>> some
> >>>>>>>>>>>>>> `contract name` like `format.type`, `connector.type` and
> >> etc.
> >>>>>>>>>>>>>> And also derive the table schema from an existing schema
> >> file
> >>>>>> can
> >>>>>>> be
> >>>>>>>>>>>> handy
> >>>>>>>>>>>>>> especially one with too many table columns.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Lin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Timo Walther <twalthr@xxxxxxxxxx> 于2018年12月5日周三 下午10:40写道:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Jark and Shuyi,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> thanks for pushing the DDL efforts forward. I agree that we
> >>>>>>> should
> >>>>>>>>>>> aim
> >>>>>>>>>>>>>>> to combine both Shuyi's design and your design.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Here are a couple of concerns that I think we should
> >> address
> >>> in
> >>>>>>> the
> >>>>>>>>>>>>>> design:
> >>>>>>>>>>>>>>> 1. Scope: Let's focuses on a MVP DDL for CREATE TABLE
> >>>>>> statements
> >>>>>>>>>>> first.
> >>>>>>>>>>>>>>> I think this topic has already enough potential for long
> >>>>>>>>>> discussions
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>> is very helpful for users. We can discuss CREATE VIEW and
> >>>>>> CREATE
> >>>>>>>>>>>>>>> FUNCTION afterwards as they are not related to each other.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2. Constraints: I think we should consider things like
> >>>>>>> nullability,
> >>>>>>>>>>>>>>> VARCHAR length, and decimal scale and precision in the
> >> future
> >>>>>> as
> >>>>>>>>>> they
> >>>>>>>>>>>>>>> allow for nice optimizations. However, since both the
> >>>>>> translation
> >>>>>>>>>> and
> >>>>>>>>>>>>>>> runtime operators do not support those features. I would
> >> not
> >>>>>>>>>>> introduce
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>> arbitrary default value but omit those parameters for now.
> >>> This
> >>>>>>> can
> >>>>>>>>>>> be
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>> follow-up issue once the basic DDL has been merged.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3. Sources/Sinks: We had a discussion about CREATE TABLE vs
> >>>>>>> CREATE
> >>>>>>>>>>>>>>> [SOURCE|SINK|] TABLE before. In my opinion we should allow
> >>> for
> >>>>>>>>>> these
> >>>>>>>>>>>>>>> explicit declaration because in most production scenarios,
> >>>>>> teams
> >>>>>>>>>> have
> >>>>>>>>>>>>>>> strict read/write access requirements. For example, a data
> >>>>>>> science
> >>>>>>>>>>> team
> >>>>>>>>>>>>>>> should only consume from a event Kafka topic but should not
> >>>>>>>>>>> accidently
> >>>>>>>>>>>>>>> write back to the single source of truth.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 4. Time attributes: In general, I like your computed
> >> columns
> >>>>>>>>>> approach
> >>>>>>>>>>>>>>> because it makes defining a rowtime attributes transparent
> >>> and
> >>>>>>>>>>> simple.
> >>>>>>>>>>>>>>> However, there are downsides that we should discuss.
> >>>>>>>>>>>>>>> 4a. Jarks current design means that timestamps are in the
> >>>>>> schema
> >>>>>>>>>>> twice.
> >>>>>>>>>>>>>>> The design that is mentioned in [1] makes this more
> >> flexible
> >>> as
> >>>>>>> it
> >>>>>>>>>>>>>>> either allows to replace an existing column or add a
> >> computed
> >>>>>>>>>> column.
> >>>>>>>>>>>>>>> 4b. We need to consider the zoo of storage systems that is
> >>> out
> >>>>>>>>>> there
> >>>>>>>>>>>>>>> right now. Take Kafka as an example, how can we write out a
> >>>>>>>>>> timestamp
> >>>>>>>>>>>>>>> into the message header? We need to think of a reverse
> >>>>>> operation
> >>>>>>>>>> to a
> >>>>>>>>>>>>>>> computed column.
> >>>>>>>>>>>>>>> 4c. Does defining a watermark really fit into the schema
> >> part
> >>>>>> of
> >>>>>>> a
> >>>>>>>>>>>>>>> table? Shouldn't we separate all time attribute concerns
> >>> into a
> >>>>>>>>>>> special
> >>>>>>>>>>>>>>> clause next to the regular schema, similar how PARTITIONED
> >> BY
> >>>>>>> does
> >>>>>>>>>> it
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>> Hive?
> >>>>>>>>>>>>>>> 4d. How can people come up with a custom watermark
> >> strategy?
> >>> I
> >>>>>>>>>> guess
> >>>>>>>>>>>>>>> this can not be implemented in a scalar function and would
> >>>>>>> require
> >>>>>>>>>>> some
> >>>>>>>>>>>>>>> new type of UDF?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 6. Partitioning and keys: Another question that the DDL
> >>> design
> >>>>>>>>>> should
> >>>>>>>>>>>>>>> answer is how do we express primary keys (for upserts),
> >>>>>>>>>> partitioning
> >>>>>>>>>>>>>>> keys (for Hive, Kafka message keys). All part of the table
> >>>>>>> schema?
> >>>>>>>>>>>>>>> 5. Schema declaration: I find it very annoying that we want
> >>> to
> >>>>>>>>>> force
> >>>>>>>>>>>>>>> people to declare all columns and types again even though
> >>> this
> >>>>>> is
> >>>>>>>>>>>>>>> usually already defined in some company-wide format. I know
> >>>>>> that
> >>>>>>>>>>>> catalog
> >>>>>>>>>>>>>>> support will greatly improve this. But if no catalog is
> >> used,
> >>>>>>>>>> people
> >>>>>>>>>>>>>>> need to manually define a schema with 50+ fields in a Flink
> >>>>>> DDL.
> >>>>>>>>>>> What I
> >>>>>>>>>>>>>>> actually promoted having two ways of reading data:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. Either the format derives its schema from the table
> >>> schema.
> >>>>>>>>>>>>>>> CREATE TABLE (col INT) WITH (format.type = avro)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2. Or the table schema can be omitted and the format schema
> >>>>>>> defines
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> table schema (+ time attributes).
> >>>>>>>>>>>>>>> CREATE TABLE WITH (format.type = avro, format.schema-file =
> >>>>>>>>>>>>>>> "/my/avrofile.avsc")
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please let me know what you think about each item. I will
> >> try
> >>>>>> to
> >>>>>>>>>>>>>>> incorporate your feedback in [1] this week.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf
> >>>>>>>>>>>>>>> Am 05.12.18 um 13:01 schrieb Jark Wu:
> >>>>>>>>>>>>>>>> Hi Shuyi,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It's exciting to see we can make such a great progress
> >> here.
> >>>>>>>>>>>>>>>> Regarding to the watermark:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Watermarks can be defined on any columns (including
> >>>>>>>>>> computed-column)
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>> table schema.
> >>>>>>>>>>>>>>>> The computed column can be computed from existing columns
> >>>>>> using
> >>>>>>>>>>>> builtin
> >>>>>>>>>>>>>>>> functions and *UserDefinedFunctions* (ScalarFunction).
> >>>>>>>>>>>>>>>> So IMO, it can work out for almost all the scenarios not
> >>> only
> >>>>>>>>>> common
> >>>>>>>>>>>>>>>> scenarios.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I don't think using a `TimestampExtractor` to support
> >> custom
> >>>>>>>>>>> timestamp
> >>>>>>>>>>>>>>>> extractor in SQL is a good idea. Because
> >>> `TimestampExtractor`
> >>>>>>>>>>>>>>>> is not a SQL standard function. If we support
> >>>>>>> `TimestampExtractor`
> >>>>>>>>>>> in
> >>>>>>>>>>>>>>> SQL,
> >>>>>>>>>>>>>>>> do we need to support CREATE FUNCTION for
> >>>>>> `TimestampExtractor`?
> >>>>>>>>>>>>>>>> I think `ScalarFunction` can do the same thing with
> >>>>>>>>>>>>>> `TimestampExtractor`
> >>>>>>>>>>>>>>>> but more powerful and standard.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The core idea of the watermark definition syntax is that
> >> the
> >>>>>>>>>> schema
> >>>>>>>>>>>>>> part
> >>>>>>>>>>>>>>>> defines all the columns of the table, it is exactly what
> >> the
> >>>>>>> query
> >>>>>>>>>>>>>> sees.
> >>>>>>>>>>>>>>>> The watermark part is something like a primary key
> >>> definition
> >>>>>> or
> >>>>>>>>>>>>>>> constraint
> >>>>>>>>>>>>>>>> on SQL Table, it has no side effect on the schema, only
> >>>>>> defines
> >>>>>>>>>> what
> >>>>>>>>>>>>>>>> watermark strategy is and makes which field as the rowtime
> >>>>>>>>>> attribute
> >>>>>>>>>>>>>>> field.
> >>>>>>>>>>>>>>>> If the rowtime field is not in the existing fields, we can
> >>> use
> >>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>>> to generate it from other existing fields. The Descriptor
> >>>>>>> Pattern
> >>>>>>>>>>> API
> >>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>> is very useful when writing a Table API job, but is not
> >>>>>>>>>>> contradictory
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> Watermark DDL from my perspective.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [1]:
> >>>>>>>>>>>>>>>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
> >>>>>>>>>>>>>>>> .
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <
> >> suez1224@xxxxxxxxx
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>> Hi Jark and Shaoxuan,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks a lot for the summary. I think we are making great
> >>>>>>>>>> progress
> >>>>>>>>>>>>>> here.
> >>>>>>>>>>>>>>>>> Below are my thoughts.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> *(1) watermark definition
> >>>>>>>>>>>>>>>>> IMO, it's better to keep it consistent with the rowtime
> >>>>>>>>>> extractors
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> watermark strategies defined in
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
> >>>>>>>>>>>>>>>>> .
> >>>>>>>>>>>>>>>>> Using built-in functions seems to be too much for most of
> >>> the
> >>>>>>>>>>> common
> >>>>>>>>>>>>>>>>> scenarios.
> >>>>>>>>>>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
> >>>>>>>>>>>>>>>>> Actually, I think we can put the source/sink type info
> >> into
> >>>>>> the
> >>>>>>>>>>> table
> >>>>>>>>>>>>>>>>> properties, so we can use CREATE TABLE.
> >>>>>>>>>>>>>>>>> (3) View DDL with properties
> >>>>>>>>>>>>>>>>> We can remove the view properties section now for the MVP
> >>> and
> >>>>>>> add
> >>>>>>>>>>> it
> >>>>>>>>>>>>>>> back
> >>>>>>>>>>>>>>>>> later if needed.
> >>>>>>>>>>>>>>>>> (4) Type Definition
> >>>>>>>>>>>>>>>>> I agree we can put the type length or precision into
> >> future
> >>>>>>>>>>> versions.
> >>>>>>>>>>>>>> As
> >>>>>>>>>>>>>>>>> for the grammar difference, currently, I am using the
> >>> grammar
> >>>>>>> in
> >>>>>>>>>>>>>> Calcite
> >>>>>>>>>>>>>>>>> type DDL, but since we'll extend the parser in Flink, so
> >> we
> >>>>>> can
> >>>>>>>>>>>>>>> definitely
> >>>>>>>>>>>>>>>>> change if needed.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Shuyi
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <
> >> imjark@xxxxxxxxx>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>> Hi Shaoxuan,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for pointing that out. Yes, the source/sink tag
> >> on
> >>>>>>> create
> >>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> the another major difference.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Summarize the main differences again:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *(1) watermark definition
> >>>>>>>>>>>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
> >>>>>>>>>>>>>>>>>> (3) View DDL with properties
> >>>>>>>>>>>>>>>>>> (4) Type Definition
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <
> >>>>>>> wshaoxuan@xxxxxxxxx
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>> Hi Jark,
> >>>>>>>>>>>>>>>>>>> Thanks for the summary. Your plan for the 1st round
> >>>>>>>>>>> implementation
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>> looks good to me.
> >>>>>>>>>>>>>>>>>>> Have we reached the agreement on simplifying/unifying
> >>>>>> "create
> >>>>>>>>>>>>>>>>>> [source/sink]
> >>>>>>>>>>>>>>>>>>> table" to "create table"? "Watermark definition" and
> >>>>>> "create
> >>>>>>>>>>> table"
> >>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> major obstacles on the way to merge two design
> >> proposals
> >>>>>>> FMPOV.
> >>>>>>>>>>>>>>> @Shuyi,
> >>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>> would be great if you can spend time and respond to
> >> these
> >>>>>> two
> >>>>>>>>>>> parts
> >>>>>>>>>>>>>>>>>> first.
> >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>> Shaoxuan
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <
> >>> imjark@xxxxxxxxx>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>> Hi Shuyi,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> It seems that you have reviewed the DDL doc [1] that
> >> Lin
> >>>>>>> and I
> >>>>>>>>>>>>>>>>> drafted.
> >>>>>>>>>>>>>>>>>>>> This doc covers all the features running in Alibaba.
> >>>>>>>>>>>>>>>>>>>> But some of features might be not needed in the first
> >>>>>>> version
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>>>> SQL
> >>>>>>>>>>>>>>>>>>>> DDL.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> So my suggestion would be to focus on the MVP DDLs and
> >>>>>> reach
> >>>>>>>>>>>>>>>>> agreement
> >>>>>>>>>>>>>>>>>>> ASAP
> >>>>>>>>>>>>>>>>>>>> based on the DDL draft [1] and the DDL design [2]
> >> Shuyi
> >>>>>>>>>>> proposed.
> >>>>>>>>>>>>>>>>>>>> And we can discuss on the main differences one by one.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The following is the MVP DDLs should be included in
> >> the
> >>>>>>> first
> >>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>> opinion (feedbacks are welcome):
> >>>>>>>>>>>>>>>>>>>> (1) Table DDL:
> >>>>>>>>>>>>>>>>>>>>          (1.1) Type definition
> >>>>>>>>>>>>>>>>>>>>          (1.2) computed column definition
> >>>>>>>>>>>>>>>>>>>>          (1.3) watermark definition
> >>>>>>>>>>>>>>>>>>>>          (1.4) with properties
> >>>>>>>>>>>>>>>>>>>>          (1.5) table constraint (primary key/unique)
> >>>>>>>>>>>>>>>>>>>>          (1.6) column nullability (nice to have)
> >>>>>>>>>>>>>>>>>>>> (2) View DDL
> >>>>>>>>>>>>>>>>>>>> (3) Function DDL
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The main differences from two DDL docs (sth maybe
> >>> missed,
> >>>>>>>>>>> welcome
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> point
> >>>>>>>>>>>>>>>>>>>> out):
> >>>>>>>>>>>>>>>>>>>> *(1.3) watermark*: this is the main and the most
> >>> important
> >>>>>>>>>>>>>>>>> difference,
> >>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>> would be great if @Timo Walther <twalthr@xxxxxxxxxx>
> >>>>>>> @Fabian
> >>>>>>>>>>>>>> Hueske
> >>>>>>>>>>>>>>>>>>>> <fhueske@xxxxxxxxx>  give some feedbacks.
> >>>>>>>>>>>>>>>>>>>>       (1.1) Type definition:
> >>>>>>>>>>>>>>>>>>>>            (a) Should VARCHAR carry a length, e.g.
> >>>>>>> VARCHAR(128)
> >>>>>>>> ?
> >>>>>>>>>>>>>>>>>>>>                 In most cases, the varchar length is
> >> not
> >>>>>> used
> >>>>>>>>>>> because
> >>>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>> stored as String in Flink. But it can be used to
> >>> optimize
> >>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> future
> >>>>>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>>> we know the column is a fixed length VARCHAR.
> >>>>>>>>>>>>>>>>>>>>                 So IMO, we can support VARCHAR with
> >>> length
> >>>>>> in
> >>>>>>>> the
> >>>>>>>>>>>>>> future,
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> just VARCHAR in this version.
> >>>>>>>>>>>>>>>>>>>>            (b) Should DECIMAL support custom scale and
> >>>>>>>> precision,
> >>>>>>>>>>>> e.g.
> >>>>>>>>>>>>>>>>>>>> DECIMAL(12, 5)?
> >>>>>>>>>>>>>>>>>>>>                 If we clearly know the scale and
> >>> precision
> >>>>>> of
> >>>>>>>> the
> >>>>>>>>>>>>>>> Decimal,
> >>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> can have some optimization on
> >>>>>> serialization/deserialization.
> >>>>>>>>>>> IMO,
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>> support just support DECIMAL in this version,
> >>>>>>>>>>>>>>>>>>>>                 which means DECIMAL(38, 18) as
> default.
> >>> And
> >>>>>>>>>> support
> >>>>>>>>>>>>>>> custom
> >>>>>>>>>>>>>>>>>>> scale
> >>>>>>>>>>>>>>>>>>>> and precision in the future.
> >>>>>>>>>>>>>>>>>>>>       (2) View DDL: Do we need WITH properties in View
> >>> DDL
> >>>>>>>>>>> (proposed
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> doc[2])?
> >>>>>>>>>>>>>>>>>>>> What are the properties on the view used for?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The features could be supported and discussed in the
> >>>>>> future:
> >>>>>>>>>>>>>>>>>>>> (1) period definition on table
> >>>>>>>>>>>>>>>>>>>> (2) Type DDL
> >>>>>>>>>>>>>>>>>>>> (3) Index DDL
> >>>>>>>>>>>>>>>>>>>> (4) Library DDL
> >>>>>>>>>>>>>>>>>>>> (5) Drop statement
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> [1] Flink DDL draft by Lin and Jark:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>
> https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit#
> >>>>>>>>>>>>>>>>>>>> [2] Flink SQL DDL design by Shuyi:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>
> https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit#
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang <
> >>>>>>>>>>> wshaoxuan@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>> Sure Shuyu,
> >>>>>>>>>>>>>>>>>>>>> What I hope is that we can reach an agreement on DDL
> >>>>>> gramma
> >>>>>>>>>> as
> >>>>>>>>>>>>>> soon
> >>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>> possible. There are a few differences between your
> >>>>>> proposal
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>> ours.
> >>>>>>>>>>>>>>>>>>>> Once
> >>>>>>>>>>>>>>>>>>>>> Lin and Jark propose our design, we can quickly
> >> discuss
> >>>>>> on
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> those
> >>>>>>>>>>>>>>>>>>>>> differences, and see how far away towards a unified
> >>>>>> design.
> >>>>>>>>>>>>>>>>>>>>> WRT the external catalog, I think it is an orthogonal
> >>>>>>> topic,
> >>>>>>>>>> we
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>> design
> >>>>>>>>>>>>>>>>>>>>> it in parallel. I believe @Xuefu, @Bowen are already
> >>>>>>> working
> >>>>>>>>>>> on.
> >>>>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>>>>>> should/will definitely involve them to review the
> >> final
> >>>>>>>>>> design
> >>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>>>> implementation. I would suggest that we should give
> >> it
> >>> a
> >>>>>>>>>> higher
> >>>>>>>>>>>>>>>>>>> priority
> >>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>> the DDL implementation, as it is a crucial component
> >>> for
> >>>>>>> the
> >>>>>>>>>>> user
> >>>>>>>>>>>>>>>>>>>>> experience of SQL_CLI.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>> Shaoxuan
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen <
> >>>>>>>>>> suez1224@xxxxxxxxx
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>> Thanks a lot, Shaoxuan, Jack and Lin. We should
> >>>>>> definitely
> >>>>>>>>>>>>>>>>>>> collaborate
> >>>>>>>>>>>>>>>>>>>>>> here, we have also our own DDL implementation
> >> running
> >>> in
> >>>>>>>>>>>>>>>>> production
> >>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> almost 2 years at Uber. With the joint experience
> >> from
> >>>>>>> both
> >>>>>>>>>>>>>>>>>>> companies,
> >>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>> can definitely make the Flink SQL DDL better.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> As @shaoxuan suggest, Jark can come up with a doc
> >> that
> >>>>>>> talks
> >>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> current DDL design in Alibaba, and we can discuss
> >> and
> >>>>>>> merge
> >>>>>>>>>>> them
> >>>>>>>>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>>>>> one,
> >>>>>>>>>>>>>>>>>>>>>> make it as a FLIP, and plan the tasks for
> >>>>>> implementation.
> >>>>>>>>>>> Also,
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>> take into account the new external catalog effort in
> >>> the
> >>>>>>>>>>> design.
> >>>>>>>>>>>>>>>>>> What
> >>>>>>>>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>>>>>> you guys think?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Shuyi
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <
> >>>>>> imjark@xxxxxxxxx
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>> Hi Shaoxuan,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I think summarizing it into a google doc is a good
> >>>>>> idea.
> >>>>>>> We
> >>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>> prepare
> >>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>> in the next few days.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Shaoxuan Wang <wshaoxuan@xxxxxxxxx> 于2018年11月28日周三
> >>>>>>>>>> 下午9:17写道:
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Lin and Jark,
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for sharing those details. Can you please
> >>>>>>> consider
> >>>>>>>>>>>>>>>>>>>> summarizing
> >>>>>>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>> DDL design into a google doc.
> >>>>>>>>>>>>>>>>>>>>>>>> We can still continue the discussions on Shuyi's
> >>>>>>> proposal.
> >>>>>>>>>>>>>>>>> But
> >>>>>>>>>>>>>>>>>>>>> having a
> >>>>>>>>>>>>>>>>>>>>>>>> separate google doc will be easy for the DEV to
> >>>>>>>>>>>>>>>>>>>>>>> understand/comment/discuss
> >>>>>>>>>>>>>>>>>>>>>>>> on your proposed DDL implementation.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>> Shaoxuan
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <
> >>>>>>> imjark@xxxxxxxxx
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Shuyi,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for bringing up this discussion and the
> >>>>>> awesome
> >>>>>>>>>>>>>>>>> work!
> >>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>> left
> >>>>>>>>>>>>>>>>>>>>>>>>> some comments in the doc.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I want to share something more about the
> >> watermark
> >>>>>>>>>>>>>>>>> definition
> >>>>>>>>>>>>>>>>>>>>> learned
> >>>>>>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>>> Alibaba.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>         1.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>         Table should be able to accept multiple
> >>>>>> watermark
> >>>>>>>>>>>>>>>>>>> definition.
> >>>>>>>>>>>>>>>>>>>>>>>>>         Because a table may have more than one
> >>> rowtime
> >>>>>>>>>> field.
> >>>>>>>>>>>>>>>>> For
> >>>>>>>>>>>>>>>>>>>>> example,
> >>>>>>>>>>>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>>>>>>>>>>>>         rowtime field is from existing field but
> >>>>>> missing
> >>>>>>> in
> >>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>> records,
> >>>>>>>>>>>>>>>>>>>>>>>>> another
> >>>>>>>>>>>>>>>>>>>>>>>>>         is the ingestion timestamp in Kafka but
> >> not
> >>>>>> very
> >>>>>>>>>>>>>>>>> accurate.
> >>>>>>>>>>>>>>>>>>> In
> >>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>> case,
> >>>>>>>>>>>>>>>>>>>>>>>>>         user may define two rowtime fields with
> >>>>>>> watermarks
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> Table
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>> choose
> >>>>>>>>>>>>>>>>>>>>>>>>>         one in different situation.
> >>>>>>>>>>>>>>>>>>>>>>>>>         2.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>         Watermark stragety always work with
> >> rowtime
> >>>>>> field
> >>>>>>>>>>>>>>>>>> together.
> >>>>>>>>>>>>>>>>>>>>>>>>> Based on the two points metioned above, I think
> >> we
> >>>>>>> should
> >>>>>>>>>>>>>>>>>>> combine
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> watermark strategy and rowtime field selection
> >>> (i.e.
> >>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>>>>>> field
> >>>>>>>>>>>>>>>>>>>>>>>>> used to generate watermark) in one clause, so
> >> that
> >>> we
> >>>>>>> can
> >>>>>>>>>>>>>>>>>>> define
> >>>>>>>>>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>>>>>>> watermarks in one Table.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Here I will share the watermark syntax used in
> >>>>>> Alibaba
> >>>>>>>>>>>>>>>>>> (simply
> >>>>>>>>>>>>>>>>>>>>>>> modified):
> >>>>>>>>>>>>>>>>>>>>>>>>> watermarkDefinition:
> >>>>>>>>>>>>>>>>>>>>>>>>> WATERMARK [watermarkName] FOR <rowtime_field> AS
> >>>>>>>>>>>>>>>>> wm_strategy
> >>>>>>>>>>>>>>>>>>>>>>>>> wm_strategy:
> >>>>>>>>>>>>>>>>>>>>>>>>>        BOUNDED WITH OFFSET 'string' timeUnit
> >>>>>>>>>>>>>>>>>>>>>>>>> |
> >>>>>>>>>>>>>>>>>>>>>>>>>        ASCENDING
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> The “WATERMARK” keyword starts a watermark
> >>>>>> definition.
> >>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>> “FOR”
> >>>>>>>>>>>>>>>>>>>>>>> keyword
> >>>>>>>>>>>>>>>>>>>>>>>>> defines which existing field used to generate
> >>>>>>> watermark,
> >>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>> field
> >>>>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>>>> already exist in the schema (we can use
> >>>>>> computed-column
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> derive
> >>>>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>>> other fields). The “AS” keyword defines watermark
> >>>>>>>>>> strategy,
> >>>>>>>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>> BOUNDED
> >>>>>>>>>>>>>>>>>>>>>>>>> WITH OFFSET (covers almost all the requirements)
> >>> and
> >>>>>>>>>>>>>>>>>> ASCENDING.
> >>>>>>>>>>>>>>>>>>>>>>>>> When the expected rowtime field does not exist in
> >>> the
> >>>>>>>>>>>>>>>>> schema,
> >>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>>>> computed-column syntax to derive it from other
> >>>>>> existing
> >>>>>>>>>>>>>>>>>> fields
> >>>>>>>>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>>>>>>> built-in functions or user defined functions. So
> >>> the
> >>>>>>>>>>>>>>>>>>>>>> rowtime/watermark
> >>>>>>>>>>>>>>>>>>>>>>>>> definition doesn’t need to care about
> >>> “field-change”
> >>>>>>>>>>>>>>>>> strategy
> >>>>>>>>>>>>>>>>>>>>>>>>> (replace/add/from-field). And the proctime field
> >>>>>>>>>> definition
> >>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>> defined using computed-column. Such as pt as
> >>>>>> PROCTIME()
> >>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>> defines a
> >>>>>>>>>>>>>>>>>>>>>>>>> proctime field named “pt” in the schema.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to working with you guys!
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>> Jark Wu
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Lin Li <lincoln.86xy@xxxxxxxxx> 于2018年11月28日周三
> >>>>>>> 下午6:33写道:
> >>>>>>>>>>>>>>>>>>>>>>>>>> @Shuyi
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the proposal!  We have a simple DDL
> >>>>>>>>>>>>>>>>>> implementation
> >>>>>>>>>>>>>>>>>>>>>>> (extends
> >>>>>>>>>>>>>>>>>>>>>>>>>> Calcite's parser) which been running for almost
> >>> two
> >>>>>>>>>> years
> >>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>> production
> >>>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>> works well.
> >>>>>>>>>>>>>>>>>>>>>>>>>> I think the most valued things we'd learned is
> >>>>>> keeping
> >>>>>>>>>>>>>>>>>>>> simplicity
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>> standard compliance.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Here's the approximate grammar, FYI
> >>>>>>>>>>>>>>>>>>>>>>>>>> CREATE TABLE
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> CREATE TABLE tableName(
> >>>>>>>>>>>>>>>>>>>>>>>>>>              columnDefinition [,
> >> columnDefinition]*
> >>>>>>>>>>>>>>>>>>>>>>>>>>              [ computedColumnDefinition [,
> >>>>>>>>>>>>>>>>>>>> computedColumnDefinition]*
> >>>>>>>>>>>>>>>>>>>>> ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>              [ tableConstraint [,
> >>> tableConstraint]* ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>              [ tableIndex [, tableIndex]* ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>          [ PERIOD FOR SYSTEM_TIME ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>              [ WATERMARK watermarkName FOR
> >>>>>> rowTimeColumn
> >>>>>>>> AS
> >>>>>>>>>>>>>>>>>>>>>>>>>> withOffset(rowTimeColumn, offset) ]     ) [
> >> WITH (
> >>>>>>>>>>>>>>>>>>> tableOption
> >>>>>>>>>>>>>>>>>>>> [
> >>>>>>>>>>>>>>>>>>>>> ,
> >>>>>>>>>>>>>>>>>>>>>>>>>> tableOption]* ) ] [ ; ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> columnDefinition ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>              columnName dataType [ NOT NULL ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> dataType  ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>              {
> >>>>>>>>>>>>>>>>>>>>>>>>>>                [ VARCHAR ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ BOOLEAN ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ TINYINT ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ SMALLINT ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ INT ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ BIGINT ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ FLOAT ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ DECIMAL ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ DOUBLE ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ DATE ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ TIME ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ TIMESTAMP ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>                | [ VARBINARY ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>              }
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> computedColumnDefinition ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>              columnName AS
> >> computedColumnExpression
> >>>>>>>>>>>>>>>>>>>>>>>>>> tableConstraint ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>          { PRIMARY KEY | UNIQUE }
> >>>>>>>>>>>>>>>>>>>>>>>>>>              (columnName [, columnName]* )
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> tableIndex ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>              [ UNIQUE ] INDEX indexName
> >>>>>>>>>>>>>>>>>>>>>>>>>>               (columnName [, columnName]* )
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> rowTimeColumn ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>              columnName
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> tableOption ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>              property=value
> >>>>>>>>>>>>>>>>>>>>>>>>>>              offset ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>              positive integer (unit: ms)
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> CREATE VIEW
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> CREATE VIEW viewName
> >>>>>>>>>>>>>>>>>>>>>>>>>>        [
> >>>>>>>>>>>>>>>>>>>>>>>>>>              ( columnName [, columnName]* )
> >>>>>>>>>>>>>>>>>>>>>>>>>>        ]
> >>>>>>>>>>>>>>>>>>>>>>>>>>              AS queryStatement;
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> CREATE FUNCTION
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>       CREATE FUNCTION functionName
> >>>>>>>>>>>>>>>>>>>>>>>>>>        AS 'className';
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>       className ::=
> >>>>>>>>>>>>>>>>>>>>>>>>>>              fully qualified name
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Shuyi Chen <suez1224@xxxxxxxxx> 于2018年11月28日周三
> >>>>>>>>>> 上午3:28写道:
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot, Timo and Xuefu. Yes, I think we
> >> can
> >>>>>>>>>>>>>>>>>> finalize
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> design
> >>>>>>>>>>>>>>>>>>>>>>>>> doc
> >>>>>>>>>>>>>>>>>>>>>>>>>>> first and start implementation w/o the unified
> >>>>>>>>>>>>>>>>> connector
> >>>>>>>>>>>>>>>>>>> API
> >>>>>>>>>>>>>>>>>>>>>> ready
> >>>>>>>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>>>>>> skipping some featue.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Xuefu, I like the idea of making Flink specific
> >>>>>>>>>>>>>>>>>> properties
> >>>>>>>>>>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>>>>>>>> generic
> >>>>>>>>>>>>>>>>>>>>>>>>>>> key-value pairs, so that it will make
> >> integration
> >>>>>>> with
> >>>>>>>>>>>>>>>>>> Hive
> >>>>>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>>>>> (or
> >>>>>>>>>>>>>>>>>>>>>>>>>> others,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> e.g. Beam DDL) easier.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I'll run a final pass over the design doc and
> >>>>>>> finalize
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> design
> >>>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> next few days. And we can start creating tasks
> >>> and
> >>>>>>>>>>>>>>>>>>>> collaborate
> >>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> implementation. Thanks a lot for all the
> >> comments
> >>>>>> and
> >>>>>>>>>>>>>>>>>>> inputs.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers!
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Shuyi
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <
> >>>>>>>>>>>>>>>>>>>>>>>> xuefu.z@xxxxxxxxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Yeah! I agree with Timo that DDL can actually
> >>>>>>> proceed
> >>>>>>>>>>>>>>>>>> w/o
> >>>>>>>>>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>>>>>>>>>>>> blocked
> >>>>>>>>>>>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> connector API. We can leave the unknown out
> >>> while
> >>>>>>>>>>>>>>>>>>> defining
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> basic
> >>>>>>>>>>>>>>>>>>>>>>>>>>> syntax.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> @Shuyi
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> As commented in the doc, I think we can
> >> probably
> >>>>>>>>>>>>>>>>> stick
> >>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>> simple
> >>>>>>>>>>>>>>>>>>>>>>>>>> syntax
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> with general properties, without extending the
> >>>>>>> syntax
> >>>>>>>>>>>>>>>>>> too
> >>>>>>>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> mimics the descriptor API.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Part of our effort on Flink-Hive integration
> >> is
> >>>>>> also
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>>>>>>>> syntax
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> compatible with Hive's. The one in the current
> >>>>>>>>>>>>>>>>> proposal
> >>>>>>>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>>>>>>>> making
> >>>>>>>>>>>>>>>>>>>>>>>>>> our
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> effort more challenging.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> We can help and collaborate. At this moment, I
> >>>>>> think
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>> finalize
> >>>>>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> the proposal and then we can divide the tasks
> >>> for
> >>>>>>>>>>>>>>>>>> better
> >>>>>>>>>>>>>>>>>>>>>>>>> collaboration.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Please let me know if there are  any questions
> >>> or
> >>>>>>>>>>>>>>>>>>>>> suggestions.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Xuefu
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> ------------------------------------------------------------------
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Sender:Timo Walther <twalthr@xxxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Sent at:2018 Nov 27 (Tue) 16:21
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Recipient:dev <dev@xxxxxxxxxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Subject:Re: [DISCUSS] Flink SQL DDL Design
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for offering your help here, Xuefu. It
> >>>>>> would
> >>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> great
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> move
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> these efforts forward. I agree that the DDL is
> >>>>>>>>>>>>>>>>> somehow
> >>>>>>>>>>>>>>>>>>>>> releated
> >>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> unified connector API design but we can also
> >>> start
> >>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> basic
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> functionality now and evolve the DDL during
> >> this
> >>>>>>>>>>>>>>>>>> release
> >>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>>>>>>>>>>>>> releases.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, we could identify the MVP DDL
> >>> syntax
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> skips
> >>>>>>>>>>>>>>>>>>>>>>>> defining
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> key constraints and maybe even time
> >> attributes.
> >>>>>> This
> >>>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> for batch usecases, ETL, and materializing SQL
> >>>>>>>>>>>>>>>>> queries
> >>>>>>>>>>>>>>>>>>> (no
> >>>>>>>>>>>>>>>>>>>>> time
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> operations like windows).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> The unified connector API is high on our
> >>> priority
> >>>>>>>>>>>>>>>>> list
> >>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> 1.8
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> release. I will try to update the document
> >> until
> >>>>>> mid
> >>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>>>>>>>>> week.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Am 27.11.18 um 08:08 schrieb Shuyi Chen:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot, Xuefu. I was busy for some
> >> other
> >>>>>>>>>>>>>>>>> stuff
> >>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> last 2
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> weeks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> but we are definitely interested in moving
> >> this
> >>>>>>>>>>>>>>>>>>> forward.
> >>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>>> once
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> unified connector API design [1] is done, we
> >>> can
> >>>>>>>>>>>>>>>>>>> finalize
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>>>>>>>>>> design
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> well and start creating concrete subtasks to
> >>>>>>>>>>>>>>>>>>> collaborate
> >>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation with the community.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Shuyi
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu
> >> <
> >>>>>>>>>>>>>>>>>>>>>>>>>> xuefu.z@xxxxxxxxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Shuyi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm wondering if you folks still have the
> >>>>>>>>>>>>>>>>> bandwidth
> >>>>>>>>>>>>>>>>>>>>> working
> >>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We have some dedicated resource and like to
> >>> move
> >>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>> forward.
> >>>>>>>>>>>>>>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> collaborate.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xuefu
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>> ------------------------------------------------------------------
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 发件人:wenlong.lwl<wenlong88.lwl@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 日 期:2018年11月05日 11:15:35
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 收件人:<dev@xxxxxxxxxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 主 题:Re: [DISCUSS] Flink SQL DDL Design
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, Shuyi, thanks for the proposal.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have two concerns about the table ddl:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. how about remove the source/sink mark
> >> from
> >>>>>> the
> >>>>>>>>>>>>>>>>>> ddl,
> >>>>>>>>>>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> necessary, the framework determine the table
> >>>>>>>>>>>>>>>>>> referred
> >>>>>>>>>>>>>>>>>>>> is a
> >>>>>>>>>>>>>>>>>>>>>>>> source
> >>>>>>>>>>>>>>>>>>>>>>>>>> or a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> sink
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> according to the context of the query using
> >>> the
> >>>>>>>>>>>>>>>>>> table.
> >>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> convenient for use defining a table which
> >> can
> >>> be
> >>>>>>>>>>>>>>>>>> both
> >>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> source
> >>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>> sink,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and more convenient for catalog to
> >> persistent
> >>>>>> and
> >>>>>>>>>>>>>>>>>>> manage
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> meta
> >>>>>>>>>>>>>>>>>>>>>>>>>>> infos.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. how about just keeping one pure string
> >> map
> >>> as
> >>>>>>>>>>>>>>>>>>>>> parameters
> >>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>> table,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> create tabe Kafka10SourceTable (
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> intField INTEGER,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stringField VARCHAR(128),
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> longField BIGINT,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rowTimeField TIMESTAMP
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ) with (
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.type = ’kafka’,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.property-version = ’1’,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.version = ’0.10’,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.properties.topic =
> >>> ‘test-kafka-topic’,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.properties.startup-mode =
> >>>>>>>>>>>>>>>>> ‘latest-offset’,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.properties.specific-offset =
> >>> ‘offset’,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> format.type = 'json'
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> format.prperties.version=’1’,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> format.derive-schema = 'true'
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> );
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Because:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. in TableFactory, what user use is a
> >> string
> >>>>>> map
> >>>>>>>>>>>>>>>>>>>>>> properties,
> >>>>>>>>>>>>>>>>>>>>>>>>>> defining
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameters by string-map can be the closest
> >>> way
> >>>>>> to
> >>>>>>>>>>>>>>>>>>>> mapping
> >>>>>>>>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>>>>>>> user
> >>>>>>>>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameters.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. The table descriptor can be extended by
> >>> user,
> >>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> done
> >>>>>>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and Json, it means that the parameter keys
> >> in
> >>>>>>>>>>>>>>>>>>> connector
> >>>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>>>> format
> >>>>>>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> different in different implementation, we
> >> can
> >>>>>> not
> >>>>>>>>>>>>>>>>>>>> restrict
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> key
> >>>>>>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specified set, so we need a map in connector
> >>>>>> scope
> >>>>>>>>>>>>>>>>>>> and a
> >>>>>>>>>>>>>>>>>>>>> map
> >>>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.properties scope. why not just
> >> give
> >>>>>>>>>>>>>>>>> user a
> >>>>>>>>>>>>>>>>>>>>> single
> >>>>>>>>>>>>>>>>>>>>>>> map,
> >>>>>>>>>>>>>>>>>>>>>>>>> let
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> them
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> put parameters in a format they like, which
> >> is
> >>>>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> simplest
> >>>>>>>>>>>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implement DDL parser.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. whether we can define a format clause or
> >>> not,
> >>>>>>>>>>>>>>>>>>> depends
> >>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation of the connector, using
> >>> different
> >>>>>>>>>>>>>>>>>>> clause
> >>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>>>>>>> may
> >>>>>>>>>>>>>>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> misunderstanding that we can combine the
> >>>>>>>>>>>>>>>>> connectors
> >>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>> arbitrary
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> formats,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which may not work actually.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, 4 Nov 2018 at 18:25, Dominik
> >> Wosiński
> >>> <
> >>>>>>>>>>>>>>>>>>>>>>> wossyn@xxxxxxxxx
> >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> +1, Thanks for the proposal.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I guess this is a long-awaited change. This
> >>> can
> >>>>>>>>>>>>>>>>>>> vastly
> >>>>>>>>>>>>>>>>>>>>>>> increase
> >>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> functionalities of the SQL Client as it
> >> will
> >>> be
> >>>>>>>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>>>>>> complex
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> extensions like for example those provided
> >> by
> >>>>>>>>>>>>>>>>>> Apache
> >>>>>>>>>>>>>>>>>>>>>>> Bahir[1].
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dom.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/bahir-flink
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sob., 3 lis 2018 o 17:17 Rong Rong <
> >>>>>>>>>>>>>>>>>>>> walterddr@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>>>>> napisał(a):
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> +1. Thanks for putting the proposal
> >> together
> >>>>>>>>>>>>>>>>>> Shuyi.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DDL has been brought up in a couple of
> >> times
> >>>>>>>>>>>>>>>>>>>> previously
> >>>>>>>>>>>>>>>>>>>>>>> [1,2].
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Utilizing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DDL will definitely be a great extension
> >> to
> >>>>>> the
> >>>>>>>>>>>>>>>>>>>> current
> >>>>>>>>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>>>>>>>>>> SQL
> >>>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> systematically support some of the
> >>> previously
> >>>>>>>>>>>>>>>>>>> brought
> >>>>>>>>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>>>>>>>> features
> >>>>>>>>>>>>>>>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [3]. And it will also be beneficial to see
> >>> the
> >>>>>>>>>>>>>>>>>>>> document
> >>>>>>>>>>>>>>>>>>>>>>>> closely
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> aligned
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with the previous discussion for unified
> >> SQL
> >>>>>>>>>>>>>>>>>>> connector
> >>>>>>>>>>>>>>>>>>>>> API
> >>>>>>>>>>>>>>>>>>>>>>>> [4].
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also left a few comments on the doc.
> >>> Looking
> >>>>>>>>>>>>>>>>>>> forward
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> alignment
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with the other couple of efforts and
> >>>>>>>>>>>>>>>>> contributing
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> them!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Rong
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>
> http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [3]
> >>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-8003
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [4]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>
> http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3C6676cb66-6f31-23e1-eff5-2e9c19f88483@xxxxxxxxxx%3E
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <
> >>>>>>>>>>>>>>>>>>>>>>> bowenli86@xxxxxxxxx
> >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Shuyi!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I left some comments there. I think the
> >>>>>> design
> >>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>> SQL
> >>>>>>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink-Hive
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integration/External catalog enhancements
> >>>>>> will
> >>>>>>>>>>>>>>>>>> work
> >>>>>>>>>>>>>>>>>>>>>> closely
> >>>>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>> each
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> other. Hope we are well aligned on the
> >>>>>>>>>>>>>>>>> directions
> >>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>>>>>>>>>>>> designs,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> look forward to working with you guys on
> >>>>>> both!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bowen
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 1, 2018 at 10:57 PM Shuyi
> >> Chen
> >>> <
> >>>>>>>>>>>>>>>>>>>>>>>> suez1224@xxxxxxxxx
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> SQL DDL support has been a long-time ask
> >>>>>> from
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> community.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Current
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> SQL support only DML (e.g. SELECT and
> >>> INSERT
> >>>>>>>>>>>>>>>>>>>>>> statements).
> >>>>>>>>>>>>>>>>>>>>>>> In
> >>>>>>>>>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> current
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> form, Flink SQL users still need to
> >>>>>>>>>>>>>>>>>> define/create
> >>>>>>>>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>>>>>>> sources
> >>>>>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sinks
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> programmatically in Java/Scala. Also, in
> >>> SQL
> >>>>>>>>>>>>>>>>>>> Client,
> >>>>>>>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>>>>>>>> DDL
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> support,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the current implementation does not
> >> allow
> >>>>>>>>>>>>>>>>>>> dynamical
> >>>>>>>>>>>>>>>>>>>>>>> creation
> >>>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> type
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or functions with SQL, this adds
> >> friction
> >>>>>> for
> >>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>>> adoption.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I drafted a design doc [1] with a few
> >>> other
> >>>>>>>>>>>>>>>>>>>> community
> >>>>>>>>>>>>>>>>>>>>>>>> members
> >>>>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the design and implementation for adding
> >>> DDL
> >>>>>>>>>>>>>>>>>>> support
> >>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>> Flink.
> >>>>>>>>>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> initial
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> design considers DDL for table, view,
> >>> type,
> >>>>>>>>>>>>>>>>>>> library
> >>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>> function.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be great to get feedback on the design
> >>> from
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> community,
> >>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> align
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> latest effort in unified SQL connector
> >> API
> >>>>>> [2]
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>>>>>>>>> Hive
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integration
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [3].
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Any feedback is highly appreciated.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Shuyi Chen
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>
> https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [3]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>
> https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "So you have to trust that the dots will
> >>>>>>>>>>>>>>>>> somehow
> >>>>>>>>>>>>>>>>>>>>> connect
> >>>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> future."
> >>>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>>> "So you have to trust that the dots will
> >> somehow
> >>>>>>>>>>>>>>>>> connect
> >>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>>> future."
> >>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>> "So you have to trust that the dots will somehow
> >>> connect
> >>>>>>> in
> >>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>> future."
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> "So you have to trust that the dots will somehow connect
> >> in
> >>>>>>> your
> >>>>>>>>>>>>>>> future."
> >>>>>>>>>>>>>>>
> >>>>>>> --
> >>>>>>> "So you have to trust that the dots will somehow connect in your
> >>>> future."
> >>>>
> >>> --
> >>> "So you have to trust that the dots will somehow connect in your
> future."
> >>>
>
>