osdir.com



Re: [DISCUSS] Flink SQL DDL Design


Hi all,

I have been following this thread and it looks interesting. If I can be
of any help, please let me know.

Thanks,
Teja

On Wed, Dec 12, 2018, 4:31 AM Kurt Young <ykt836@xxxxxxxxx> wrote:

> Sounds great, thanks for the effort, Shuyi.
>
> Best,
> Kurt
>
>
> On Wed, Dec 12, 2018 at 5:14 PM Shuyi Chen <suez1224@xxxxxxxxx> wrote:
>
> > Hi all,
> >
> > I summarized the MVP based on the features that we agreed upon. Table
> > update mode, custom watermark strategies, and timestamp extractors are
> > still under discussion, so I decided to leave them out of the MVP.
> > For the row/map/array data types, I think we can add them as well if
> > everyone agrees.
> >
> >
> > 4) Event-Time Attributes and Watermarks
> > Cited from SQL Server 2017 document (
> >
> >
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/specify-computed-columns-in-a-table?view=sql-server-2017
> > ),
> > "A
> > computed column is a virtual column that is not physically stored in the
> > table, unless the column is marked PERSISTED. A computed column
> > expression can use data from other columns to calculate a value for the
> > column to which it belongs." I think we can also introduce the PERSISTED
> > keyword for computed columns to indicate that the field can be stored
> > back to the table, i.e. ts AS SYSTEMROWTIME() PERSISTED.
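> > To make this concrete, here is a minimal sketch of how a PERSISTED
> > computed column might look in the proposed DDL (illustrative only, not
> > final syntax; the ts_shifted column is a made-up example):
> >
> > ```
> > CREATE TABLE output_kafka_t1(
> >   id bigint,
> >   msg varchar,
> >   -- virtual: computed on read, never written to the sink
> >   ts_shifted AS ts + INTERVAL '1' SECOND,
> >   -- persisted: can also be stored back to the table
> >   ts AS SYSTEMROWTIME() PERSISTED
> > ) WITH (
> >   type=kafka
> >   ,...
> > );
> > ```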
> >
> > 3) SOURCE / SINK / BOTH
> > GRANT/REVOKE sounds like a more standard option than adding a property
> > to CREATE TABLE to manage the ACL/permissions. The ACL can be stored
> > somewhere in a database, and access to a dynamic table can be allowed or
> > disallowed depending on whether the statement is an "INSERT INTO" or a
> > "SELECT".
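> > For example, the corresponding DCL statements could look like the
> > following (a sketch using standard SQL GRANT/REVOKE semantics; table
> > and user names are made up):
> >
> > ```
> > -- allow reading (i.e. "SELECT") from the dynamic table
> > GRANT SELECT ON kafka_t1 TO user1;
> > -- allow writing (i.e. "INSERT INTO") to the dynamic table
> > GRANT INSERT ON kafka_t1 TO user1;
> > -- withdraw write access again
> > REVOKE INSERT ON kafka_t1 FROM user1;
> > ```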
> >
> > I can volunteer to put the discussion together as a FLIP. I can try to
> > summarize the current discussion and share edit permission with you to
> > collaborate on the document. After we have finalized the doc, we can
> > publish it as a FLIP.
> > What do you think?
> >
> > Shuyi
> >
> >
> >
> > On Tue, Dec 11, 2018 at 9:13 AM Timo Walther <twalthr@xxxxxxxxxx> wrote:
> >
> > > Hi all,
> > >
> > > thanks for summarizing the discussion @Shuyi. I think we need to
> > > include the "table update mode" problem as it might not be easy to
> > > change in the future. Regarding "support row/map/array data type", I
> > > don't see a reason why we should not support them now, as the data
> > > types are already included in the runtime. The "support custom
> > > timestamp extractor" is solved by the computed columns approach. The
> > > "custom watermark strategy" can be added by supplying a class name as
> > > a parameter, in my opinion.
> > >
> > > Regarding the comments of Lin and Jark:
> > >
> > > @Lin: Instantiating a TableSource/Sink should not cost much, but we
> > > should not mix catalog discussion and DDL at this point.
> > >
> > > 4) Event-Time Attributes and Watermarks
> > > 4.b) Regarding `ts AS SYSTEMROWTIME()` and Lin's comment about "will
> > > violate the rule": there is no explicit rule against doing so.
> > > Computed columns are also not standard compliant; if we can use
> > > information that is encoded in constraints, we should use it. Adding
> > > more and more top-level properties makes the interaction with
> > > connectors more difficult. An additional HEADER keyword sounds too
> > > connector-specific and also not SQL compliant to me.
> > >
> > > 3) SOURCE / SINK / BOTH
> > > GRANT/REVOKE mutate an existing table, right? In my opinion,
> > > independent of SQL databases but focusing on Flink user requirements,
> > > a CREATE TABLE statement should be an immutable definition of a
> > > connection to an external system.
> > >
> > > 7) Table Update Mode
> > > As far as I can see, the only thing missing for enabling all table
> modes
> > > is the declaration of a change flag. We could introduce a new keyword
> > > here similar to WATERMARK:
> > >
> > > CREATE TABLE output_kafka_t1(
> > >    id bigint,
> > >    msg varchar,
> > >    CHANGE_FLAG FOR isRetraction
> > > ) WITH (
> > >    type=kafka
> > >    ,...
> > > );
> > >
> > > CREATE TABLE output_kafka_t1(
> > >    CHANGE_FLAG FOR isUpsert
> > >    id bigint,
> > >    msg varchar,
> > >    PRIMARY_KEY(id)
> > > ) WITH (
> > >    type=kafka
> > >    ,...
> > > );
> > >
> > > What do you think?
> > >
> > > @Jark: We should definitely stage the discussions and mention the
> > > opinions and advantages/disadvantages that have been proposed already
> in
> > > the FLIP.
> > >
> > > Regards,
> > > Timo
> > >
> > > Am 10.12.18 um 08:10 schrieb Jark Wu:
> > > > Hi all,
> > > >
> > > > It's great to see we have an agreement on MVP.
> > > >
> > > > 4.b) Ingesting and writing timestamps to systems.
> > > > I would treat the field as a physical column, not a virtual column.
> > > > If we treat it as a computed column, it would be confusing that the
> > > > behavior differs depending on whether it is a source or a sink.
> > > > When it is a physical column, the behavior can be unified. Then the
> > > > problem is how to map the field to the Kafka message timestamp.
> > > > One option is what Lin proposed above, which is also used in
> > > > KSQL[1]. Another idea is introducing a HEADER keyword which strictly
> > > > maps by name to the fields in the message header.
> > > > For example,
> > > >
> > > > CREATE TABLE output_kafka_t1(
> > > >    id bigint,
> > > >    ts timestamp HEADER,
> > > >    msg varchar
> > > > ) WITH (
> > > >    type=kafka
> > > >    ,...
> > > > );
> > > >
> > > > This is used in Alibaba but not included in the DDL draft. It would
> > > > further extend the SQL syntax, which we should be cautious about.
> > > > What do you think about these two solutions?
> > > >
> > > > 4.d) Custom watermark strategies:
> > > > @Timo,  I don't have a strong opinion on this.
> > > >
> > > > 3) SOURCE/SINK/BOTH
> > > > Agree with Lin, GRANT/REVOKE [SELECT|UPDATE] ON TABLE is a clean and
> > > > standard way to manage permissions, which is also adopted by HIVE[2]
> > > > and many databases.
> > > >
> > > > [1]:
> > https://docs.confluent.io/current/ksql/docs/tutorials/examples.html
> > > > [2]:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=45876173#Hivedeprecatedauthorizationmode/LegacyMode-Grant/RevokePrivileges
> > > >
> > > > @Timo, it would be great if someone could conclude the discussion
> > > > and summarize it into a FLIP.
> > > > @Shuyi, Thanks a lot for putting it all together. The google doc
> looks
> > > good
> > > > to me, and I left some minor comments there.
> > > >
> > > > Regarding the FLIP, I have some suggestions:
> > > > 1. The FLIP can contain MILESTONE1 and FUTURE WORKS.
> > > > 2. MILESTONE1 is the MVP. It describes the MVP DDL syntax.
> > > > 3. Separate FUTURE WORKS into two parts: UNDER DISCUSSION and
> > > > ADOPTED. We can derive MILESTONE2 from this easily when it is ready.
> > > >
> > > > I summarized the Future Works based on Shuyi's work:
> > > >
> > > > Adopted: (Should be described in detail here...)
> > > > 1. support data type nullability and precision.
> > > > 2. comment on table and columns.
> > > >
> > > > Under Discussion: (Should briefly describe some options...)
> > > > 1. Ingesting and writing timestamps to systems.
> > > > 2. support custom watermark strategy.
> > > > 3. support table update mode
> > > > 4. support row/map/array data type
> > > > 5. support schema derivation
> > > > 6. support system versioned temporal table
> > > > 7. support table index
> > > >
> > > > We can continue the further discussion here, or separate it into
> > > > another DISCUSS topic if it is a sophisticated problem such as Table
> > > > Update Mode or Temporal Table.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Mon, 10 Dec 2018 at 11:54, Lin Li <lincoln.86xy@xxxxxxxxx> wrote:
> > > >
> > > >> hi all,
> > > >> Thanks for your valuable input!
> > > >>
> > > >> 4) Event-Time Attributes and Watermarks:
> > > >> 4.b) @Fabian As you mentioned, using a computed column `ts AS
> > > >> SYSTEMROWTIME()`
> > > >> for writing out to a Kafka table sink will violate the rule that
> > > >> computed fields are not emitted.
> > > >> Since the timestamp column in Kafka's header area is a specific
> > > >> materialization protocol,
> > > >> why don't we treat it as a connector property? For example:
> > > >> ```
> > > >> CREATE TABLE output_kafka_t1(
> > > >>    id bigint,
> > > >>    ts timestamp,
> > > >>    msg varchar
> > > >> ) WITH (
> > > >>    type=kafka,
> > > >>    header.timestamp=ts
> > > >>    ,...
> > > >> );
> > > >> ```
> > > >>
> > > >> 4d) For custom watermark strategies
> > > >> @Fabian Agree with you on opening another topic about this feature
> > > >> later.
> > > >>
> > > >> 3) SOURCE / SINK / BOTH
> > > >> I think permissions and availability are two separate things.
> > > >> Permissions can be managed well by using GRANT/REVOKE (you can call
> > > >> it DCL) solutions, which are commonly used in different DBs. The
> > > >> permission part can be a new topic for later discussion, what do
> > > >> you think?
> > > >>
> > > >> For availability, @Fabian @Timo I have another question:
> > > >> does instantiating a TableSource/Sink cost much or have some other
> > > >> downsides?
> > > >> IMO, creating a new source/sink object via the constructor does not
> > > >> seem costly.
> > > >> When receiving a DDL we should associate it with the catalog object
> > > >> (reusing an existing one or creating a new one).
> > > >> Have I missed something important?
> > > >>
> > > >> 5. Schema declaration:
> > > >> @Timo  yes, your concern about user convenience is very important,
> > > >> but I haven't seen a clear way to solve this so far.
> > > >> Shall we put it off and wait for more input from the community?
> > > >>
> > > >> Shuyi Chen <suez1224@xxxxxxxxx> 于2018年12月8日周六 下午4:27写道:
> > > >>
> > > >>> Hi all,
> > > >>>
> > > >>> Thanks a lot for the great discussion. I think we can continue the
> > > >>> discussion here while carving out an MVP that the community can
> > > >>> start working on. Based on the discussion so far, I have tried to
> > > >>> summarize what we will do for the MVP:
> > > >>>
> > > >>> MVP
> > > >>>
> > > >>>     1. support CREATE TABLE
> > > >>>     2. support existing data types in Flink SQL, ignoring
> > > >>>     nullability and precision
> > > >>>     3. support table comments and column comments
> > > >>>     4. support table constraint PRIMARY KEY and UNIQUE
> > > >>>     5. support table properties using key-value pairs
> > > >>>     6. support partitioned by
> > > >>>     7. support computed column
> > > >>>     8. support from-field and from-source timestamp extractors
> > > >>>     9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, FROM-SOURCE
> > > watermark
> > > >>>     strategies.
> > > >>>     10. support a table property to allow explicit enforcement of
> > > >>>     read/write(source/sink) permission of a table
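> > > >>>
> > > >>> As a sketch (not the final grammar), a CREATE TABLE statement
> > > >>> combining most of the MVP features above might look like the
> > > >>> following, assuming the WATERMARK syntax proposed by Jark in this
> > > >>> thread; names and the strategy keyword are illustrative:
> > > >>>
> > > >>> ```
> > > >>> CREATE TABLE my_table (
> > > >>>   a bigint COMMENT 'key column',
> > > >>>   b varchar,
> > > >>>   ts timestamp,
> > > >>>   c AS a + 1,    -- computed column
> > > >>>   PRIMARY KEY (a),
> > > >>>   WATERMARK FOR ts AS OFFSET '5' SECOND
> > > >>> ) COMMENT 'an example table'
> > > >>> PARTITIONED BY (b)
> > > >>> WITH (
> > > >>>   type=kafka
> > > >>>   ,...
> > > >>> );
> > > >>> ```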
> > > >>>
> > > >>> I try to put up the DDL grammar (
> > > >>>
> > > >>>
> > > >>
> > >
> >
> https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing
> > > >>> )
> > > >>> based on the MVP features above and the previous design docs.
> Please
> > > >> take a
> > > >>> look and comment on it.
> > > >>>
> > > >>>
> > > >>> Also, I summarize the future improvements on CREATE TABLE as
> > > >>> follows:
> > > >>>     1. support table update mode
> > > >>>     2. support data type nullability and precision
> > > >>>     3. support row/map/array data type
> > > >>>     4. support custom timestamp extractor and watermark strategy
> > > >>>     5. support schema derivation
> > > >>>     6. support system versioned temporal table
> > > >>>     7. support table index
> > > >>>
> > > >>> I suggest we first agree on the MVP feature list and the MVP
> grammar.
> > > And
> > > >>> then we can either continue the discussion of the future
> improvements
> > > >> here,
> > > >>> or create separate JIRAs for each item and discuss further in the
> > JIRA.
> > > >>> What do you guys think?
> > > >>>
> > > >>> Shuyi
> > > >>>
> > > >>> On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <twalthr@xxxxxxxxxx>
> > > wrote:
> > > >>>
> > > >>>> Hi all,
> > > >>>>
> > > >>>> I think we are making good progress. Thanks for all the feedback
> so
> > > >> far.
> > > >>>> 3. Sources/Sinks:
> > > >>>> It seems that I cannot find supporters for an explicit
> > > >>>> SOURCE/SINK declaration, so I'm fine with not using those
> > > >>>> keywords.
> > > >>>> @Fabian: Maybe we don't have to change the TableFactory interface
> > > >>>> but just provide some helper functions in the TableFactoryService.
> > > >>>> This would solve the availability problem, but the permission
> > > >>>> problem would still not be solved. If you are fine with it, could
> > > >>>> we introduce a property instead?
> > > >>>>
> > > >>>> 5. Schema declaration:
> > > >>>> @Lin: We should find an agreement on this as it requires changes
> to
> > > the
> > > >>>> TableFactory interface. We should minimize changes to this
> interface
> > > >>>> because it is user-facing. Especially, if format schema and table
> > > >> schema
> > > >>>> differ, the need for such a functionality is very important. Our
> > goal
> > > >> is
> > > >>>> to connect to existing infrastructure. For example, if we are
> using
> > > >> Avro
> > > >>>> and the existing Avro format has enums but Flink SQL does not
> > support
> > > >>>> enums, it would be helpful to let the Avro format derive a table
> > > >> schema.
> > > >>>> Otherwise you need to declare both schemas, which leads to CREATE
> > > >>>> TABLE statements of 400+ lines.
> > > >>>> I think the mentioned query:
> > > >>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> > > >>>> format.schema-file = "/my/avrofile.avsc")
> > > >>>> is fine and should only be valid if the schema contains no
> > > non-computed
> > > >>>> columns.
> > > >>>>
> > > >>>> 7. Table Update Mode:
> > > >>>> After thinking about it again, I agree. The mode of the sinks can
> be
> > > >>>> derived from the query and the existence of a PRIMARY KEY
> > declaration.
> > > >>>> But Fabian raised a very good point. How do we deal with sources?
> > > >>>> Shall we introduce a new keyword similar to WATERMARK such that
> > > >>>> an upsert/retract flag is not part of the visible schema?
> > > >>>>
> > > >>>> 4a. How to mark a field as attribute?
> > > >>>> @Jark: Thanks for the explanation of the WATERMARK clause
> semantics.
> > > >>>> This is a nice way of marking existing fields. This sounds good to
> > me.
> > > >>>>
> > > >>>> 4c) WATERMARK as constraint
> > > >>>> I'm fine with leaving the WATERMARK clause in the schema
> definition.
> > > >>>>
> > > >>>> 4d) Custom watermark strategies:
> > > >>>> I would already think about custom watermark strategies, as the
> > > >>>> current descriptor design already supports this. ScalarFunctions
> > > >>>> don't work, as a PeriodicWatermarkAssigner has different
> > > >>>> semantics. Why not simply enter a full class name here, as is
> > > >>>> done in the current design?
> > > >>>> 4.b) Ingesting and writing timestamps to systems (like Kafka)
> > > >>>> @Fabian: Yes, your suggestion sounds good to me. This behavior
> would
> > > be
> > > >>>> similar to our current `timestamps: from-source` design.
> > > >>>>
> > > >>>> Once our discussion has found a conclusion, I would like to
> > volunteer
> > > >>>> and summarize the outcome of this mailing thread. It nicely aligns
> > > with
> > > >>>> the update work on the connector improvements document (that I
> > wanted
> > > >> to
> > > >>>> do anyway) and the ongoing external catalog discussion.
> > Furthermore, I
> > > >>>> would also want to propose how to change existing interfaces by
> > > keeping
> > > >>>> the DDL, connector improvements, and external catalog support in
> > mind.
> > > >>>> Would that be ok for you?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Timo
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Am 07.12.18 um 14:48 schrieb Fabian Hueske:
> > > >>>>> Hi all,
> > > >>>>>
> > > >>>>> Thanks for the discussion.
> > > >>>>> I'd like to share my point of view as well.
> > > >>>>>
> > > >>>>> 4) Event-Time Attributes and Watermarks:
> > > >>>>> 4.a) I agree with Lin and Jark's proposal. Declaring a watermark
> on
> > > >> an
> > > >>>>> attribute declares it as an event-time attribute.
> > > >>>>> 4.b) Ingesting and writing timestamps to systems (like Kafka). We
> > > >> could
> > > >>>> use
> > > >>>>> a special function like (ts AS SYSTEMROWTIME()). This function
> will
> > > >>>>> indicate that we read the timestamp directly from the system (and
> > not
> > > >>> the
> > > >>>>> data). We can also write the field back to the system when
> emitting
> > > >> the
> > > >>>>> table (violating the rule that computed fields are not emitted).
> > > >>>>> 4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE
> KEY
> > > >>>>> constraint and therefore keep it in the schema definition.
> > > >>>>> 4d) For custom watermark strategies, simple expressions or
> > > >>>>> ScalarFunctions won't be sufficient. Sophisticated approaches
> > > >>>>> could collect histograms, etc. But I think we can leave that out
> > > >>>>> for later.
> > > >>>>>
> > > >>>>> 3) SOURCE / SINK / BOTH
> > > >>>>> As you said, there are two things to consider here: permission
> and
> > > >>>>> availability of a TableSource/TableSink.
> > > >>>>> I think that neither should be a reason to add a keyword at such
> a
> > > >>>>> sensitive position.
> > > >>>>> However, I also see Timo's point that it would be good to know
> > > >> up-front
> > > >>>> how
> > > >>>>> a table can be used without trying to instantiate a
> > TableSource/Sink
> > > >>> for
> > > >>>> a
> > > >>>>> query.
> > > >>>>> Maybe we can extend the TableFactory such that it provides
> > > >> information
> > > >>>>> about which sources/sinks it can provide.
> > > >>>>>
> > > >>>>> 7. Table Update Mode
> > > >>>>> Something that we definitely need to consider is how tables are
> > > >>>>> ingested, i.e., append, retract, or upsert.
> > > >>>>> Especially since upsert and retraction need a meta-data column
> > > >>>>> that indicates whether an event is an insert (or upsert) or a
> > > >>>>> delete change.
> > > >>>>> This column needs to be identified somehow, most likely as part
> > > >>>>> of the input format. Ideally, this column should not be part of
> > > >>>>> the table schema (as it would always be true).
> > > >>>>> Emitting tables is not so much of an issue, as the properties of
> > > >>>>> the table tell us what to do (append-only/update, unique key
> > > >>>>> y/n).
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Fabian
> > > >>>>>
> > > >>>>>
> > > >>>>> Am Fr., 7. Dez. 2018 um 10:39 Uhr schrieb Jark Wu <
> > imjark@xxxxxxxxx
> > > >>> :
> > > >>>>>> Hi Timo,
> > > >>>>>>
> > > >>>>>> Thanks for your quickly feedback! Here are some of my thoughts:
> > > >>>>>>
> > > >>>>>> Append, upsert, and retract modes on sinks are also a very
> > > >>>>>> complex problem. I think append/upsert/retract is an ability of
> > > >>>>>> a table; the user does not need to specify whether a table is
> > > >>>>>> used for append, retraction, or upsert. The query can choose
> > > >>>>>> which mode the sink uses. If an unbounded group-by is inserted
> > > >>>>>> into an append sink (a sink that only implements/supports
> > > >>>>>> append), an exception can be thrown. A more complex problem is:
> > > >>>>>> if we want to write retractions/upserts to Kafka, how do we
> > > >>>>>> encode the change flag (add or retract/delete) on the table?
> > > >>>>>> Maybe we should propose some protocol for the change flag
> > > >>>>>> encoding, but I don't have a clear idea about this right now.
> > > >>>>>>
> > > >>>>>> 3. Sources/Sinks: The source/sink tag is similar to the
> > > >>>>>> append/upsert/retract problem. Besides source/sink, we actually
> > > >>>>>> have stream source, stream sink, batch source, and batch sink,
> > > >>>>>> and the stream sink also includes the append/upsert/retract
> > > >>>>>> modes. Should we put all these tags on CREATE TABLE? IMO, a
> > > >>>>>> table's ability is defined by the table itself; the user does
> > > >>>>>> not need to specify it. If it is a read-only table, an
> > > >>>>>> exception can be thrown when writing to it. As the source/sink
> > > >>>>>> tag can be omitted in CREATE TABLE, could we skip it and only
> > > >>>>>> support CREATE TABLE in the first version, and add it back in
> > > >>>>>> the future when we really need it? That keeps the API
> > > >>>>>> compatible and makes sure the MVP is what we have considered
> > > >>>>>> carefully.
> > > >>>>>> 4a. How to mark a field as attribute?
> > > >>>>>> The watermark definition includes two parts: which field to use
> > > >>>>>> as the time attribute and which generation strategy to use.
> > > >>>>>> When we want to mark the `ts` field as an attribute:
> > > >>>>>> WATERMARK FOR `ts` AS OFFSET '5' SECOND.
> > > >>>>>> If we have a POJO{id, user, ts} field named "pojo", we can mark
> > > >>>>>> it like this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND
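> > > >>>>>>
> > > >>>>>> Put into a full statement, this would read as follows (a sketch
> > > >>>>>> only; the table name and OFFSET strategy keyword are not final):
> > > >>>>>>
> > > >>>>>> ```
> > > >>>>>> CREATE TABLE input_kafka_t1(
> > > >>>>>>   id bigint,
> > > >>>>>>   msg varchar,
> > > >>>>>>   ts timestamp,
> > > >>>>>>   WATERMARK FOR ts AS OFFSET '5' SECOND
> > > >>>>>> ) WITH (
> > > >>>>>>   type=kafka
> > > >>>>>>   ,...
> > > >>>>>> );
> > > >>>>>> ```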
> > > >>>>>>
> > > >>>>>> 4b. timestamp write to Kafka message header
> > > >>>>>> Even though we can define multiple time attributes on a table,
> > > >>>>>> only one time attribute can be active/used in a query (in a
> > > >>>>>> stream). When we enable `writeTimestamp`, the only attribute
> > > >>>>>> active in the stream will be written to the Kafka message
> > > >>>>>> header. By the timestamp in StreamRecord I mean the time
> > > >>>>>> attribute in the stream.
> > > >>>>>>
> > > >>>>>> 4c. Yes. We introduced the WATERMARK keyword similar to the
> INDEX,
> > > >>>> PRIMARY
> > > >>>>>> KEY keywords.
> > > >>>>>>
> > > >>>>>> @Timo, Do you have any other advice or questions on the
> watermark
> > > >>>> syntax ?
> > > >>>>>> For example, the builtin strategy name: "BOUNDED WITH OFFSET" VS
> > > >>>> "OFFSET"
> > > >>>>>> VS ...
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Cheers,
> > > >>>>>> Jark
> > > >>>>>>
> > > >>>>>> On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.86xy@xxxxxxxxx>
> > wrote:
> > > >>>>>>
> > > >>>>>>> Hi Timo,
> > > >>>>>>> Thanks for your feedback, here's some thoughts of mine:
> > > >>>>>>>
> > > >>>>>>> 3. Sources/Sinks:
> > > >>>>>>> "Let's assume an interactive CLI session, people should be able
> > to
> > > >>> list
> > > >>>>>> all
> > > >>>>>>> source table and sink tables to know upfront if they can use an
> > > >>> INSERT
> > > >>>>>> INTO
> > > >>>>>>> here or not."
> > > >>>>>>> This requirement can simply be resolved by a document that
> > > >>>>>>> lists all supported source/sink/both connectors, and the
> > > >>>>>>> sql-client can perform a quick check. It's only an
> > > >>>>>>> implementation choice, not necessary for the syntax.
> > > >>>>>>> For connector implementation, a connector may implement one,
> > > >>>>>>> some, or all of the [Stream|Batch]Source/[Stream|Batch]Sink
> > > >>>>>>> traits; we can derive the availability for any given query
> > > >>>>>>> without the SOURCE/SINK keywords or specific table properties
> > > >>>>>>> in the WITH clause.
> > > >>>>>>> Since there's still indeterminacy, shall we skip these two
> > > >>>>>>> keywords for the MVP DDL? We can discuss further after users'
> > > >>>>>>> feedback.
> > > >>>>>>>
> > > >>>>>>> 6. Partitioning and keys
> > > >>>>>>> Agree with you on raising the priority of table constraints
> > > >>>>>>> and partitioned table support for better connectivity to Hive
> > > >>>>>>> and Kafka. I'll add partitioned table syntax (compatible with
> > > >>>>>>> Hive) to the DDL draft doc later[1].
> > > >>>>>>>
> > > >>>>>>> 5. Schema declaration
> > > >>>>>>> "if users want to declare computed columns they have a "schema"
> > > >>>>>> constraints
> > > >>>>>>> but without columns
> > > >>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> > > >>>>>>> format.schema-file = "/my/avrofile.avsc") "
> > > >>>>>>>
> > > >>>>>>>   From my point of view, this DDL is invalid because the
> > > >>>>>>> primary key constraint references two columns whose types are
> > > >>>>>>> not declared.
> > > >>>>>>> And Xuefu pointed out an important matching problem, so let's
> > > >>>>>>> put schema derivation off as a follow-up extension?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Timo Walther <twalthr@xxxxxxxxxx> 于2018年12月6日周四 下午6:05写道:
> > > >>>>>>>
> > > >>>>>>>> Hi everyone,
> > > >>>>>>>>
> > > >>>>>>>> great to have such a lively discussion. My next batch of
> > feedback:
> > > >>>>>>>>
> > > >>>>>>>> @Jark: We don't need to align the descriptor approach with
> > > >>>>>>>> SQL. I'm open to different approaches as long as we can serve
> > > >>>>>>>> a broad set of use cases and systems. The descriptor approach
> > > >>>>>>>> was a first attempt to cover all aspects and connector/format
> > > >>>>>>>> characteristics. Just another example that is missing in the
> > > >>>>>>>> DDL design: How can a user decide whether append, retraction,
> > > >>>>>>>> or upserts should be used to sink data into the target
> > > >>>>>>>> system? Do we want to define all these important properties
> > > >>>>>>>> in the big WITH property map? If yes, we are already close to
> > > >>>>>>>> the descriptor approach. Regarding the "standard way", most
> > > >>>>>>>> DDL languages have very custom syntax, so there is no real
> > > >>>>>>>> "standard".
> > > >>>>>>>>
> > > >>>>>>>> 3. Sources/Sinks: @Lin: If a table has both read/write
> > > >>>>>>>> access it can be created using a regular CREATE TABLE
> > > >>>>>>>> (omitting a specific source/sink declaration). Regarding the
> > > >>>>>>>> transition from source/sink to both: yes, we would need to
> > > >>>>>>>> update the DDL and catalogs. But is this a problem? One also
> > > >>>>>>>> needs to add new queries that use the tables. @Xuefu: It is
> > > >>>>>>>> not only about security aspects. Especially for streaming use
> > > >>>>>>>> cases, not every connector can be used as a source easily.
> > > >>>>>>>> For example, a JDBC sink is easier than a JDBC source. Let's
> > > >>>>>>>> assume an interactive CLI session: people should be able to
> > > >>>>>>>> list all source tables and sink tables to know upfront
> > > >>>>>>>> whether they can use an INSERT INTO or not.
> > > >>>>>>>>
> > > >>>>>>>> 6. Partitioning and keys: @Lin: I would like to include this
> in
> > > >> the
> > > >>>>>>>> design given that Hive integration and Kafka key support are
> in
> > > >> the
> > > >>>>>>>> making/are on our roadmap for this release.
> > > >>>>>>>>
> > > >>>>>>>> 5. Schema declaration: @Lin: You are right, it is not
> > > >>>>>>>> conflicting. I just wanted to raise the point because if
> > > >>>>>>>> users want to declare computed columns they have "schema"
> > > >>>>>>>> constraints but no columns. Are we ok with a syntax like ...
> > > >>>>>>>> CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
> > > >>>>>>>> format.schema-file = "/my/avrofile.avsc") ?
> > > >>>>>>>> @Xuefu: Yes, you are right that an external schema might not
> > > >>>>>>>> exactly match, but this is true for both directions:
> > > >>>>>>>> the table schema "derives" the format schema and the format
> > > >>>>>>>> schema "derives" the table schema.
> > > >>>>>>>>
> > > >>>>>>>> 7. Hive compatibility: @Xuefu: I agree that Hive is popular,
> > > >>>>>>>> but we should not just adopt everything from Hive, as their
> > > >>>>>>>> syntax is very batch-specific. We should come up with a
> > > >>>>>>>> superset of historical and future requirements. Supporting
> > > >>>>>>>> Hive queries can be an intermediate layer on top of Flink's
> > > >>>>>>>> DDL.
> > > >>>>>>>>
> > > >>>>>>>> 4. Time attributes: @Lin: I'm fine with changing the
> > > >>>>>>>> TimestampExtractor interface, as this is also important for
> > > >>>>>>>> better separation of the connector and table modules [1].
> > > >>>>>>>> However, I'm wondering about watermark generation.
> > > >>>>>>>> 4a. timestamps are in the schema twice:
> > > >>>>>>>> @Jark: "existing field is Long/Timestamp, we can just use it
> > > >>>>>>>> as rowtime": yes, but we need to mark a field as such an
> > > >>>>>>>> attribute. What does the syntax for marking look like? Also
> > > >>>>>>>> in the case of timestamps that are nested in the schema?
> > > >>>>>>>>
> > > >>>>>>>> 4b. how can we write out a timestamp into the message header?
> > > >>>>>>>> I agree to simply ignore computed columns when writing out.
> > > >>>>>>>> This is like the 'field-change: add' that I mentioned in the
> > > >>>>>>>> improvements document.
> > > >>>>>>>> @Jark: "then the timestamp in StreamRecord will be written to
> > > >>>>>>>> the Kafka message header": Unfortunately, there is no
> > > >>>>>>>> timestamp in the stream record. Additionally, multiple time
> > > >>>>>>>> attributes can be in a schema. So we need a constraint that
> > > >>>>>>>> tells the sink which column to use (possibly computed as
> > > >>>>>>>> well)?
> > > >>>>>>>>
> > > >>>>>>>> 4c. separate all time attribute concerns into a special clause
> > > >> next
> > > >>> to
> > > >>>>>>>> the regular schema?
> > > >>>>>>>> @Jark: I don't have a strong opinion on this. I just have the
> > > >>> feeling
> > > >>>>>>>> that the "schema part" becomes quite messy because the actual
> > > >> schema
> > > >>>>>>>> with types and fields is accompanied by so much metadata about
> > > >>>>>>>> timestamps, watermarks, keys,... and we would need to
> introduce
> > a
> > > >>> new
> > > >>>>>>>> WATERMARK keyword within a schema that was close to standard
> up
> > to
> > > >>>> this
> > > >>>>>>>> point.
> > > >>>>>>>>
> > > >>>>>>>> Thanks everyone,
> > > >>>>>>>> Timo
> > > >>>>>>>>
> > > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-9461
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Am 06.12.18 um 07:08 schrieb Jark Wu:
> > > >>>>>>>>> Hi Timo,
> > > >>>>>>>>>
> > > >>>>>>>>> Thank you for the valuable feedbacks.
> > > >>>>>>>>>
> > > >>>>>>>>> First of all, I think we don't need to align the SQL
> > > >>>>>>>>> functionality to the Descriptor. Because SQL is a more
> > > >>>>>>>>> standard API, we should be as cautious as possible about
> > > >>>>>>>>> extending the SQL syntax. If something can be done in a
> > > >>>>>>>>> standard way, we shouldn't introduce something new.
> > > >>>>>>>>>
> > > >>>>>>>>> Here are some of my thoughts:
> > > >>>>>>>>>
> > > >>>>>>>>> 1. Scope: Agree.
> > > >>>>>>>>> 2. Constraints: Agree.
> > > >>>>>>>>> 4. Time attributes:
> > > >>>>>>>>>      4a. timestamps are in the schema twice.
> > > >>>>>>>>>       If an existing field is already a Long/Timestamp, we can
> > > >>>>>>>>> just use it as the rowtime, so it is not defined twice. If it is
> > > >>>>>>>>> not a Long/Timestamp, we use a computed column to derive an
> > > >>>>>>>>> expected timestamp column to be the rowtime. Is this what you
> > > >>>>>>>>> mean by defined twice? I don't think it is a problem, but rather
> > > >>>>>>>>> an advantage, because it is easy to use: the user does not need
> > > >>>>>>>>> to consider whether to "replace the existing column" or "add a
> > > >>>>>>>>> new column", and will not be confused about what the real schema
> > > >>>>>>>>> is or what the index of the rowtime field in the schema is.
> > > >>>>>>>>> Regarding the optimization, even if timestamps are in the schema
> > > >>>>>>>>> twice, when the original timestamp is never used in the query,
> > > >>>>>>>>> projection pushdown can prune this field as early as possible,
> > > >>>>>>>>> which is exactly the same as "replacing the existing column" at
> > > >>>>>>>>> runtime.
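> > > >>>>>>>>>
> > > >>>>>>>>> To make this concrete, a rough sketch of a computed-column
> > > >>>>>>>>> rowtime (the function name and the exact WATERMARK clause are
> > > >>>>>>>>> illustrative only; nothing here is finalized):
> > > >>>>>>>>>
> > > >>>>>>>>> ```
> > > >>>>>>>>> CREATE TABLE clicks (
> > > >>>>>>>>>   user_id VARCHAR,
> > > >>>>>>>>>   ts BIGINT,                    -- original timestamp field
> > > >>>>>>>>>   rowtime AS TO_TIMESTAMP(ts), -- computed column, so the
> > > >>>>>>>>>                                -- timestamp appears twice
> > > >>>>>>>>>   WATERMARK FOR rowtime AS BOUNDED WITH DELAY '5' SECOND
> > > >>>>>>>>> );
> > > >>>>>>>>> ```
> > > >>>>>>>>>
> > > >>>>>>>>> If the query never touches `ts`, projection pushdown can prune
> > > >>>>>>>>> it early, as described above.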
> > > >>>>>>>>>
> > > >>>>>>>>>       4b. how can we write out a timestamp into the message
> > > >>>>>>>>> header?
> > > >>>>>>>>>        That's a good point. I think a computed column is just a
> > > >>>>>>>>> virtual column on the table which is only relevant for reading.
> > > >>>>>>>>> If we want to write to a table with a computed column defined, we
> > > >>>>>>>>> only need to provide the columns except the computed columns (see
> > > >>>>>>>>> SQL Server [1]); the computed column is ignored in the insert
> > > >>>>>>>>> statement. Getting back to the question: how can we write out a
> > > >>>>>>>>> timestamp into the message header? IMO, we can provide a
> > > >>>>>>>>> configuration to support this, such as
> > > >>>>>>>>> `kafka.writeTimestamp=true`, so that the timestamp in the
> > > >>>>>>>>> StreamRecord will be written to the Kafka message header. What do
> > > >>>>>>>>> you think?
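> > > >>>>>>>>>
> > > >>>>>>>>> As a sketch, such a write-side option could live in the table
> > > >>>>>>>>> properties (the `kafka.writeTimestamp` key is only a proposal in
> > > >>>>>>>>> this thread, not an existing option):
> > > >>>>>>>>>
> > > >>>>>>>>> ```
> > > >>>>>>>>> CREATE TABLE output_topic (
> > > >>>>>>>>>   user_id VARCHAR,
> > > >>>>>>>>>   cnt BIGINT
> > > >>>>>>>>> ) WITH (
> > > >>>>>>>>>   connector.type = 'kafka',
> > > >>>>>>>>>   -- proposed flag: write the StreamRecord timestamp into the
> > > >>>>>>>>>   -- Kafka message header instead of a regular column
> > > >>>>>>>>>   kafka.writeTimestamp = 'true'
> > > >>>>>>>>> );
> > > >>>>>>>>> ```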
> > > >>>>>>>>>        4c. separate all time attribute concerns into a special
> > > >>>>>>>>> clause next to the regular schema?
> > > >>>>>>>>>        Separating the watermark into a special clause similar to
> > > >>>>>>>>> PARTITIONED BY is also a good idea. Conceptually, it is fine to
> > > >>>>>>>>> put the watermark inside or outside the schema part. But if we
> > > >>>>>>>>> want to support multiple watermark definitions, it might be
> > > >>>>>>>>> better to put them in the schema part. This is similar to index
> > > >>>>>>>>> definitions: we can define several indexes on a table in the
> > > >>>>>>>>> schema part.
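> > > >>>>>>>>>
> > > >>>>>>>>> For illustration, multiple watermark definitions inside the
> > > >>>>>>>>> schema part could look like this (purely a sketch of the idea,
> > > >>>>>>>>> not an agreed syntax):
> > > >>>>>>>>>
> > > >>>>>>>>> ```
> > > >>>>>>>>> CREATE TABLE shipments (
> > > >>>>>>>>>   order_time TIMESTAMP,
> > > >>>>>>>>>   ship_time TIMESTAMP,
> > > >>>>>>>>>   -- one watermark definition per time attribute, analogous to
> > > >>>>>>>>>   -- defining several indexes on one table
> > > >>>>>>>>>   WATERMARK wm1 FOR order_time AS BOUNDED WITH DELAY '5' SECOND,
> > > >>>>>>>>>   WATERMARK wm2 FOR ship_time AS BOUNDED WITH DELAY '1' MINUTE
> > > >>>>>>>>> );
> > > >>>>>>>>> ```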
> > > >>>>>>>>>
> > > >>>>>>>>>        4d. How can people come up with a custom watermark
> > > >>>>>>>>> strategy?
> > > >>>>>>>>>        In most cases, the built-in strategies work well. If we
> > > >>>>>>>>> need a custom one, we can use a scalar function that is
> > > >>>>>>>>> restricted to returning a nullable Long, and use it in SQL like:
> > > >>>>>>>>> WATERMARK FOR rowtime AS watermarkUdf(a, b, c). Here
> > > >>>>>>>>> `watermarkUdf` is a user-defined scalar function that accepts 3
> > > >>>>>>>>> parameters and returns a nullable Long, which can be used as a
> > > >>>>>>>>> punctuated watermark assigner. Another choice is implementing a
> > > >>>>>>>>> class extending
> > > >>>>>>>>> `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy`
> > > >>>>>>>>> and using it in SQL: WATERMARK FOR rowtime AS
> > > >>>>>>>>> 'com.my.MyWatermarkStrategy'. But if a scalar function can cover
> > > >>>>>>>>> the requirements here, I would prefer it, because it keeps the
> > > >>>>>>>>> syntax standard compliant. BTW, this feature is not in the MVP;
> > > >>>>>>>>> we can discuss it in more depth in the future when we need it.
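> > > >>>>>>>>>
> > > >>>>>>>>> Side by side, the two options sketched above (the UDF name and
> > > >>>>>>>>> the strategy class are hypothetical examples):
> > > >>>>>>>>>
> > > >>>>>>>>> ```
> > > >>>>>>>>> -- Option 1: a user-defined scalar function returning a nullable
> > > >>>>>>>>> -- Long, usable as a punctuated watermark assigner
> > > >>>>>>>>> WATERMARK FOR rowtime AS watermarkUdf(a, b, c)
> > > >>>>>>>>>
> > > >>>>>>>>> -- Option 2: a class extending WatermarkStrategy, referenced by
> > > >>>>>>>>> -- its fully qualified name
> > > >>>>>>>>> WATERMARK FOR rowtime AS 'com.my.MyWatermarkStrategy'
> > > >>>>>>>>> ```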
> > > >>>>>>>>>
> > > >>>>>>>>> 5. Schema declaration:
> > > >>>>>>>>> I like the proposal to omit the schema if we can derive it from
> > > >>>>>>>>> external storage or from a schema file. Actually, we have already
> > > >>>>>>>>> encountered this requirement in our company.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> +1 to @Xuefu that we should be as close as possible to Hive
> > > >>>>>>>>> syntax while keeping to the ANSI SQL standard. This will make it
> > > >>>>>>>>> more acceptable and reduce the learning cost for users.
> > > >>>>>>>>>
> > > >>>>>>>>> [1]:
> > > >>>>>>>>> https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Jark
> > > >>>>>>>>>
> > > >>>>>>>>> On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <
> > > >> xuefu.z@xxxxxxxxxxxxxxx
> > > >>>>>>>> wrote:
> > > >>>>>>>>>> Hi Timo/Shuyi/Lin,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for the discussions. It seems that we are converging
> to
> > > >>>>>>> something
> > > >>>>>>>>>> meaningful. Here are some of my thoughts:
> > > >>>>>>>>>>
> > > >>>>>>>>>> 1. +1 on MVP DDL
> > > >>>>>>>>>> 3. Markers for source or sink seem more about permissions on
> > > >>> tables
> > > >>>>>>> that
> > > >>>>>>>>>> belong to a security component. Unless the table is created
> > > >>>>>>> differently
> > > >>>>>>>>>> based on source, sink, or both, it doesn't seem necessary to
> > use
> > > >>>>>> these
> > > >>>>>>>>>> keywords to enforce permissions.
> > > >>>>>>>>>> 5. It might be okay if a schema declaration is always needed.
> > > >>>>>>>>>> While there might be some duplication sometimes, that is not
> > > >>>>>>>>>> always true. For example, the external schema may not exactly
> > > >>>>>>>>>> match the Flink schema (for instance, in data types). Even so, a
> > > >>>>>>>>>> perfect match is not required. For instance, the external schema
> > > >>>>>>>>>> file may evolve while the table schema in Flink stays unchanged.
> > > >>>>>>>>>> A responsible reader should be able to scan the file based on
> > > >>>>>>>>>> the file schema and return the data based on the table schema.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Other aspects:
> > > >>>>>>>>>>
> > > >>>>>>>>>> 7. Hive compatibility. Since Flink SQL will soon be able to
> > > >>>>>>>>>> operate on Hive metadata and data, it is an added benefit if we
> > > >>>>>>>>>> can be compatible with Hive syntax/semantics while following the
> > > >>>>>>>>>> ANSI standard. At least we should be as close as possible. The
> > > >>>>>>>>>> Hive DDL can be found at
> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
> > > >>>>>>>>>> Thanks,
> > > >>>>>>>>>> Xuefu
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >> ------------------------------------------------------------------
> > > >>>>>>>>>> Sender:Lin Li <lincoln.86xy@xxxxxxxxx>
> > > >>>>>>>>>> Sent at:2018 Dec 6 (Thu) 10:49
> > > >>>>>>>>>> Recipient:dev <dev@xxxxxxxxxxxxxxxx>
> > > >>>>>>>>>> Subject:Re: [DISCUSS] Flink SQL DDL Design
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hi Timo and Shuyi,
> > > >>>>>>>>>>      thanks for your feedback.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 1. Scope
> > > >>>>>>>>>> agree with you we should focus on the MVP DDL first.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 2. Constraints
> > > >>>>>>>>>> yes, this can be a follow-up issue.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 3. Sources/Sinks
> > > >>>>>>>>>> If a TABLE has both read and write access requirements, should
> > > >>>>>>>>>> we declare it using `CREATE [SOURCE_SINK|BOTH] TABLE tableName
> > > >>>>>>>>>> ...`? A further question: if a TABLE t1 is first declared as
> > > >>>>>>>>>> read-only (as a source table) and then, for some new
> > > >>>>>>>>>> requirement, needs to become a sink table, in this case we need
> > > >>>>>>>>>> to update both the DDL and the catalogs.
> > > >>>>>>>>>> Furthermore, if we think about BATCH queries, updating one table
> > > >>>>>>>>>> in place can be a common case.
> > > >>>>>>>>>> e.g.,
> > > >>>>>>>>>> ```
> > > >>>>>>>>>> CREATE TABLE t1 (
> > > >>>>>>>>>>      col1 varchar,
> > > >>>>>>>>>>      col2 int,
> > > >>>>>>>>>>      col3 varchar
> > > >>>>>>>>>>      ...
> > > >>>>>>>>>> );
> > > >>>>>>>>>>
> > > >>>>>>>>>> INSERT [OVERWRITE] TABLE t1
> > > >>>>>>>>>> AS
> > > >>>>>>>>>> SELECT
> > > >>>>>>>>>>      (some computing ...)
> > > >>>>>>>>>> FROM t1;
> > > >>>>>>>>>> ```
> > > >>>>>>>>>> So, let's drop the SOURCE/SINK keywords in the DDL. For
> > > >>>>>>>>>> validation purposes, we can find other ways.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 4. Time attributes
> > > >>>>>>>>>> As Shuyi mentioned before, there is an
> > > >>>>>>>>>> `org.apache.flink.table.sources.tsextractors.TimestampExtractor`
> > > >>>>>>>>>> for custom-defined time attributes, but this expression-based
> > > >>>>>>>>>> class is more friendly to the Table API than to SQL.
> > > >>>>>>>>>> ```
> > > >>>>>>>>>> /**
> > > >>>>>>>>>>  * Provides an expression to extract the timestamp for a rowtime
> > > >>>>>>>>>>  * attribute.
> > > >>>>>>>>>>  */
> > > >>>>>>>>>> abstract class TimestampExtractor extends FieldComputer[Long]
> > > >>>>>>>>>>     with Serializable {
> > > >>>>>>>>>>
> > > >>>>>>>>>>   /** Timestamp extractors compute the timestamp as Long. */
> > > >>>>>>>>>>   override def getReturnType: TypeInformation[Long] =
> > > >>>>>>>>>>     Types.LONG.asInstanceOf[TypeInformation[Long]]
> > > >>>>>>>>>> }
> > > >>>>>>>>>> ```
> > > >>>>>>>>>> BTW, I think both the scalar function and the
> > > >>>>>>>>>> TimestampExtractor express computing logic; the
> > > >>>>>>>>>> TimestampExtractor has no additional advantage in SQL scenarios.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> 6. Partitioning and keys
> > > >>>>>>>>>> Primary Key is included in Constraint part, and partitioned
> > > >> table
> > > >>>>>>>> support
> > > >>>>>>>>>> can be another topic later.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 5. Schema declaration
> > > >>>>>>>>>> Agree with you that we can do better schema derivation for user
> > > >>>>>>>>>> convenience, but this does not conflict with the syntax.
> > > >>>>>>>>>> Table properties can carry any useful information for both the
> > > >>>>>>>>>> users and the framework. I like your `contract name` proposal,
> > > >>>>>>>>>> e.g., `WITH (format.type = avro)`; the framework can recognize
> > > >>>>>>>>>> some `contract names` like `format.type`, `connector.type`, etc.
> > > >>>>>>>>>> Deriving the table schema from an existing schema file can also
> > > >>>>>>>>>> be handy, especially for tables with many columns.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Regards,
> > > >>>>>>>>>> Lin
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> Timo Walther <twalthr@xxxxxxxxxx> wrote on Wed, Dec 5, 2018, at 10:40 PM:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi Jark and Shuyi,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> thanks for pushing the DDL efforts forward. I agree that we
> > > >>> should
> > > >>>>>>> aim
> > > >>>>>>>>>>> to combine both Shuyi's design and your design.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Here are a couple of concerns that I think we should
> address
> > in
> > > >>> the
> > > >>>>>>>>>> design:
> > > >>>>>>>>>>> 1. Scope: Let's focuses on a MVP DDL for CREATE TABLE
> > > >> statements
> > > >>>>>>> first.
> > > >>>>>>>>>>> I think this topic has already enough potential for long
> > > >>>>>> discussions
> > > >>>>>>>> and
> > > >>>>>>>>>>> is very helpful for users. We can discuss CREATE VIEW and
> > > >> CREATE
> > > >>>>>>>>>>> FUNCTION afterwards as they are not related to each other.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 2. Constraints: I think we should consider things like
> > > >>>>>>>>>>> nullability, VARCHAR length, and decimal scale and precision in
> > > >>>>>>>>>>> the future as they allow for nice optimizations. However, since
> > > >>>>>>>>>>> neither the translation nor the runtime operators support those
> > > >>>>>>>>>>> features yet, I would not introduce an arbitrary default value
> > > >>>>>>>>>>> but omit those parameters for now. This can be a follow-up
> > > >>>>>>>>>>> issue once the basic DDL has been merged.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 3. Sources/Sinks: We had a discussion about CREATE TABLE vs
> > > >>>>>>>>>>> CREATE [SOURCE|SINK] TABLE before. In my opinion we should
> > > >>>>>>>>>>> allow these explicit declarations because in most production
> > > >>>>>>>>>>> scenarios, teams have strict read/write access requirements.
> > > >>>>>>>>>>> For example, a data science team should only consume from an
> > > >>>>>>>>>>> event Kafka topic but should not accidentally write back to the
> > > >>>>>>>>>>> single source of truth.
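> > > >>>>>>>>>>>
> > > >>>>>>>>>>> For illustration, the explicit variants under discussion could
> > > >>>>>>>>>>> look as follows (the keywords are not finalized):
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> ```
> > > >>>>>>>>>>> CREATE SOURCE TABLE clicks (user_id VARCHAR, ts BIGINT);  -- read-only
> > > >>>>>>>>>>> CREATE SINK TABLE results (user_id VARCHAR, cnt BIGINT);  -- write-only
> > > >>>>>>>>>>> CREATE TABLE scratch (user_id VARCHAR, cnt BIGINT);       -- both
> > > >>>>>>>>>>> ```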
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 4. Time attributes: In general, I like your computed columns
> > > >>>>>>>>>>> approach because it makes defining a rowtime attribute
> > > >>>>>>>>>>> transparent and simple. However, there are downsides that we
> > > >>>>>>>>>>> should discuss.
> > > >>>>>>>>>>> 4a. Jark's current design means that timestamps are in the
> > > >>>>>>>>>>> schema twice. The design that is mentioned in [1] makes this
> > > >>>>>>>>>>> more flexible as it either allows to replace an existing column
> > > >>>>>>>>>>> or to add a computed column.
> > > >>>>>>>>>>> 4b. We need to consider the zoo of storage systems that is out
> > > >>>>>>>>>>> there right now. Take Kafka as an example: how can we write out
> > > >>>>>>>>>>> a timestamp into the message header? We need to think of a
> > > >>>>>>>>>>> reverse operation to a computed column.
> > > >>>>>>>>>>> 4c. Does defining a watermark really fit into the schema part
> > > >>>>>>>>>>> of a table? Shouldn't we separate all time attribute concerns
> > > >>>>>>>>>>> into a special clause next to the regular schema, similar to
> > > >>>>>>>>>>> how PARTITIONED BY does it in Hive?
> > > >>>>>>>>>>> 4d. How can people come up with a custom watermark strategy? I
> > > >>>>>>>>>>> guess this cannot be implemented in a scalar function and would
> > > >>>>>>>>>>> require some new type of UDF?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 6. Partitioning and keys: Another question that the DDL
> > design
> > > >>>>>> should
> > > >>>>>>>>>>> answer is how do we express primary keys (for upserts),
> > > >>>>>> partitioning
> > > >>>>>>>>>>> keys (for Hive, Kafka message keys). All part of the table
> > > >>> schema?
> > > >>>>>>>>>>> 5. Schema declaration: I find it very annoying that we want to
> > > >>>>>>>>>>> force people to declare all columns and types again even though
> > > >>>>>>>>>>> this is usually already defined in some company-wide format. I
> > > >>>>>>>>>>> know that catalog support will greatly improve this. But if no
> > > >>>>>>>>>>> catalog is used, people need to manually define a schema with
> > > >>>>>>>>>>> 50+ fields in a Flink DDL. What I actually promoted was having
> > > >>>>>>>>>>> two ways of reading data:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 1. Either the format derives its schema from the table
> > schema.
> > > >>>>>>>>>>> CREATE TABLE (col INT) WITH (format.type = avro)
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 2. Or the table schema can be omitted and the format schema
> > > >>> defines
> > > >>>>>>> the
> > > >>>>>>>>>>> table schema (+ time attributes).
> > > >>>>>>>>>>> CREATE TABLE WITH (format.type = avro, format.schema-file =
> > > >>>>>>>>>>> "/my/avrofile.avsc")
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Please let me know what you think about each item. I will
> try
> > > >> to
> > > >>>>>>>>>>> incorporate your feedback in [1] this week.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Regards,
> > > >>>>>>>>>>> Timo
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> [1]
> > > >>>>>>>>>>> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf
> > > >>>>>>>>>>> On 05.12.18 at 13:01, Jark Wu wrote:
> > > >>>>>>>>>>>> Hi Shuyi,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> It's exciting to see we can make such a great progress
> here.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Regarding to the watermark:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Watermarks can be defined on any column (including computed
> > > >>>>>>>>>>>> columns) in the table schema. A computed column can be
> > > >>>>>>>>>>>> computed from existing columns using built-in functions and
> > > >>>>>>>>>>>> *UserDefinedFunctions* (ScalarFunction). So IMO, it can cover
> > > >>>>>>>>>>>> almost all scenarios, not only the common ones.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I don't think using a `TimestampExtractor` to support custom
> > > >>>>>>>>>>>> timestamp extraction in SQL is a good idea, because
> > > >>>>>>>>>>>> `TimestampExtractor` is not a SQL standard function. If we
> > > >>>>>>>>>>>> supported `TimestampExtractor` in SQL, would we also need to
> > > >>>>>>>>>>>> support CREATE FUNCTION for `TimestampExtractor`? I think
> > > >>>>>>>>>>>> `ScalarFunction` can do the same thing as
> > > >>>>>>>>>>>> `TimestampExtractor` but is more powerful and standard.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> The core idea of the watermark definition syntax is that the
> > > >>>>>>>>>>>> schema part defines all the columns of the table; it is
> > > >>>>>>>>>>>> exactly what the query sees.
> > > >>>>>>>>>>>> The watermark part is something like a primary key definition
> > > >>>>>>>>>>>> or constraint on a SQL table: it has no side effect on the
> > > >>>>>>>>>>>> schema, and only defines what the watermark strategy is and
> > > >>>>>>>>>>>> which field is the rowtime attribute.
> > > >>>>>>>>>>>> If the rowtime field is not among the existing fields, we can
> > > >>>>>>>>>>>> use a computed column to generate it from other existing
> > > >>>>>>>>>>>> fields. The Descriptor pattern API [1] is very useful when
> > > >>>>>>>>>>>> writing a Table API job, but it does not contradict the
> > > >>>>>>>>>>>> watermark DDL from my perspective.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> [1]:
> > > >>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best,
> > > >>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <
> suez1224@xxxxxxxxx
> > >
> > > >>>>>> wrote:
> > > >>>>>>>>>>>>> Hi Jark and Shaoxuan,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks a lot for the summary. I think we are making great
> > > >>>>>> progress
> > > >>>>>>>>>> here.
> > > >>>>>>>>>>>>> Below are my thoughts.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> *(1) watermark definition
> > > >>>>>>>>>>>>> IMO, it's better to keep it consistent with the rowtime
> > > >>>>>> extractors
> > > >>>>>>>> and
> > > >>>>>>>>>>>>> watermark strategies defined in
> > > >>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
> > > >>>>>>>>>>>>> Using built-in functions seems to be too much for most of
> > the
> > > >>>>>>> common
> > > >>>>>>>>>>>>> scenarios.
> > > >>>>>>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
> > > >>>>>>>>>>>>> Actually, I think we can put the source/sink type info
> into
> > > >> the
> > > >>>>>>> table
> > > >>>>>>>>>>>>> properties, so we can use CREATE TABLE.
> > > >>>>>>>>>>>>> (3) View DDL with properties
> > > >>>>>>>>>>>>> We can remove the view properties section now for the MVP
> > and
> > > >>> add
> > > >>>>>>> it
> > > >>>>>>>>>>> back
> > > >>>>>>>>>>>>> later if needed.
> > > >>>>>>>>>>>>> (4) Type Definition
> > > >>>>>>>>>>>>> I agree we can defer type length and precision to future
> > > >>>>>>>>>>>>> versions. As for the grammar difference: currently I am using
> > > >>>>>>>>>>>>> the grammar of the Calcite type DDL, but since we'll extend
> > > >>>>>>>>>>>>> the parser in Flink, we can definitely change it if needed.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Shuyi
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <
> imjark@xxxxxxxxx>
> > > >>>>>> wrote:
> > > >>>>>>>>>>>>>> Hi Shaoxuan,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks for pointing that out. Yes, the source/sink tag
> on
> > > >>> create
> > > >>>>>>>>>> table
> > > >>>>>>>>>>> is
> > > >>>>>>>>>>>>>> the another major difference.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Summarize the main differences again:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> *(1) watermark definition
> > > >>>>>>>>>>>>>> *(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
> > > >>>>>>>>>>>>>> (3) View DDL with properties
> > > >>>>>>>>>>>>>> (4) Type Definition
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <
> > > >>> wshaoxuan@xxxxxxxxx
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>> Hi Jark,
> > > >>>>>>>>>>>>>>> Thanks for the summary. Your plan for the 1st round
> > > >>>>>>> implementation
> > > >>>>>>>>>> of
> > > >>>>>>>>>>>>> DDL
> > > >>>>>>>>>>>>>>> looks good to me.
> > > >>>>>>>>>>>>>>> Have we reached agreement on simplifying/unifying "create
> > > >>>>>>>>>>>>>>> [source/sink] table" to "create table"? "Watermark
> > > >>>>>>>>>>>>>>> definition" and "create table" are the major obstacles on
> > > >>>>>>>>>>>>>>> the way to merging the two design proposals, FMPOV.
> > > >>>>>>>>>>>>>>> @Shuyi, it would be great if you could spend some time and
> > > >>>>>>>>>>>>>>> respond to these two parts first.
> > > >>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>> Shaoxuan
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <
> > imjark@xxxxxxxxx>
> > > >>>>>>> wrote:
> > > >>>>>>>>>>>>>>>> Hi Shuyi,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> It seems that you have reviewed the DDL doc [1] that
> Lin
> > > >>> and I
> > > >>>>>>>>>>>>> drafted.
> > > >>>>>>>>>>>>>>>> This doc covers all the features running in Alibaba.
> > > >>>>>>>>>>>>>>>> But some of the features might not be needed in the first
> > > >>>>>>>>>>>>>>>> version of Flink SQL DDL.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> So my suggestion would be to focus on the MVP DDLs and
> > > >> reach
> > > >>>>>>>>>>>>> agreement
> > > >>>>>>>>>>>>>>> ASAP
> > > >>>>>>>>>>>>>>>> based on the DDL draft [1] and the DDL design [2]
> Shuyi
> > > >>>>>>> proposed.
> > > >>>>>>>>>>>>>>>> And we can discuss the main differences one by one.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> The following are the MVP DDLs that should be included in
> > > >>>>>>>>>>>>>>>> the first version, in my opinion (feedback is welcome):
> > > >>>>>>>>>>>>>>>> (1) Table DDL:
> > > >>>>>>>>>>>>>>>>         (1.1) Type definition
> > > >>>>>>>>>>>>>>>>         (1.2) computed column definition
> > > >>>>>>>>>>>>>>>>         (1.3) watermark definition
> > > >>>>>>>>>>>>>>>>         (1.4) with properties
> > > >>>>>>>>>>>>>>>>         (1.5) table constraint (primary key/unique)
> > > >>>>>>>>>>>>>>>>         (1.6) column nullability (nice to have)
> > > >>>>>>>>>>>>>>>> (2) View DDL
> > > >>>>>>>>>>>>>>>> (3) Function DDL
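> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Putting the MVP pieces together, a table DDL could look
> > > >>>>>>>>>>>>>>>> roughly like this (all syntax details, in particular the
> > > >>>>>>>>>>>>>>>> watermark clause, are still up for discussion):
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> ```
> > > >>>>>>>>>>>>>>>> CREATE TABLE orders (
> > > >>>>>>>>>>>>>>>>   order_id BIGINT,              -- (1.1) type definition
> > > >>>>>>>>>>>>>>>>   price DECIMAL,
> > > >>>>>>>>>>>>>>>>   ts BIGINT,
> > > >>>>>>>>>>>>>>>>   rowtime AS TO_TIMESTAMP(ts),  -- (1.2) computed column
> > > >>>>>>>>>>>>>>>>   -- (1.3) watermark definition
> > > >>>>>>>>>>>>>>>>   WATERMARK FOR rowtime AS BOUNDED WITH DELAY '5' SECOND,
> > > >>>>>>>>>>>>>>>>   PRIMARY KEY (order_id)        -- (1.5) table constraint
> > > >>>>>>>>>>>>>>>> ) WITH (                        -- (1.4) with properties
> > > >>>>>>>>>>>>>>>>   connector.type = 'kafka'
> > > >>>>>>>>>>>>>>>> );
> > > >>>>>>>>>>>>>>>> ```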
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> The main differences between the two DDL docs (something
> > > >>>>>>>>>>>>>>>> may be missed; feel free to point it out):
> > > >>>>>>>>>>>>>>>> *(1.3) watermark*: this is the main and most important
> > > >>>>>>>>>>>>>>>> difference; it would be great if @Timo Walther
> > > >>>>>>>>>>>>>>>> <twalthr@xxxxxxxxxx> and @Fabian Hueske
> > > >>>>>>>>>>>>>>>> <fhueske@xxxxxxxxx> could give some feedback.
> > > >>>>>>>>>>>>>>>>      (1.1) Type definition:
> > > >>>>>>>>>>>>>>>>           (a) Should VARCHAR carry a length, e.g.
> > > >>> VARCHAR(128)
> > > >>>> ?
> > > >>>>>>>>>>>>>>>>                In most cases, the varchar length is not
> > > >>>>>>>>>>>>>>>> used because values are stored as String in Flink. But it
> > > >>>>>>>>>>>>>>>> can be used for optimization in the future if we know the
> > > >>>>>>>>>>>>>>>> column is a fixed-length VARCHAR. So IMO, we can support
> > > >>>>>>>>>>>>>>>> VARCHAR with a length in the future, and just VARCHAR in
> > > >>>>>>>>>>>>>>>> this version.
> > > >>>>>>>>>>>>>>>>           (b) Should DECIMAL support custom scale and
> > > >>>> precision,
> > > >>>>>>>> e.g.
> > > >>>>>>>>>>>>>>>> DECIMAL(12, 5)?
> > > >>>>>>>>>>>>>>>>                If we clearly know the scale and precision
> > > >>>>>>>>>>>>>>>> of the Decimal, we can apply some optimizations to
> > > >>>>>>>>>>>>>>>> serialization/deserialization. IMO, we can
> > > >