
Re: [DISCUSS] Flink SQL DDL Design


Hi all,

I think we should discuss what we consider an MVP DDL. For me, an MVP DDL was to just focus on a CREATE TABLE statement. It would be great to come up with a solution that finally solves the issue of connecting different kinds of systems. One reason why we postponed DDL statements for quite some time is that we cannot change them easily once released.

However, the current state of the discussion can be summarized by the following functionality:

1. Only support append source tables (because the distinction between update/retract tables is not clear).
2. Only support append and update sink tables (because a change flag is missing).
3. Don't support outputting to Kafka with time attributes (because we cannot set a timestamp).

Personally, I would like to have more use cases enabled by solving the header timestamps and change flag discussion. And I don't see a reason why we have to rush here.

8). Support row/map/array data type
How do we want to support object arrays vs. primitive arrays? Currently, we need to make this clear distinction between external systems and Java [1] (e.g. byte[] arrays vs. object arrays), and users can choose between Types.PRIMITIVE_ARRAY and Types.OBJECT_ARRAY. Otherwise, we need to support NULL/NOT NULL for array elements.

4) Event-Time Attributes and Watermarks
I completely agree with Rong here. `ts AS SYSTEMROWTIME()` indicates that the system takes care of this column, and for unification this would apply to both sources and sinks. It is still a computed column but gives hints to connectors. Connector implementations can choose whether they want to use this hint or not. The Flink Kafka connector would make use of it. @Jark: I think a PERSISTED keyword would confuse users (as shown by your Stack Overflow question) and would only make sense for SYSTEMROWTIME and no other computed column.

3) SOURCE / SINK / BOTH
@Jark: My initial suggestion was to make the SOURCE/SINK keywords optional such that users can just use CREATE TABLE, depending on the use case. But as I said before, since I cannot find support for this, we can drop the keywords.

7) Table Update Mode
@Jark: The questions that you posted are exactly the ones that we should find an answer for, because a DDL should just be the front end to the characteristics of an engine. After thinking about it again, a change flag is actually more similar to a PARTITION BY clause, because it defines a field that is not in the table's schema but in the schema of the physical format. However, the columns defined by a PARTITION BY are shown when describing/projecting a table, whereas a change flag column must not be shown.

If a table source supports append, upserts, and retractions, we need a way to express how we want to connect to the system.

!hasPrimaryKey() && !hasChangeFlag() -> append mode
hasPrimaryKey() && hasChangeFlag() -> upsert mode
!hasPrimaryKey() && hasChangeFlag() -> retract mode

Are we fine with this?
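A minimal Python sketch of the mapping above, to make the rules easy to check. The function and enum names are hypothetical, not Flink API. The rules do not list every combination, so the sketch assumes that without a change flag every message is simply appended.

```python
from enum import Enum


class UpdateMode(Enum):
    APPEND = "append"
    UPSERT = "upsert"
    RETRACT = "retract"


def derive_update_mode(has_primary_key: bool, has_change_flag: bool) -> UpdateMode:
    """Hypothetical helper: derive how to connect to a system from the DDL declarations."""
    if not has_change_flag:
        # Assumption: without a change flag every message is an insert.
        return UpdateMode.APPEND
    # With a change flag, a primary key means messages upsert/delete by key;
    # without a key, each message adds or retracts a complete row.
    return UpdateMode.UPSERT if has_primary_key else UpdateMode.RETRACT
```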

Regarding reading `topic`, `partition`, `offset`, or custom properties from message headers: I already discussed this in my unified connector document. We don't need built-in functions for all these properties. They depend on the connector and format, and it is their responsibility to extend the table schema in order to expose those properties (e.g. by providing a Map<String, String> for all these kinds of properties).

Example:

```
CREATE TABLE myTopic (
    col1 INT,
    col2 VARCHAR,
    col3 MAP<VARCHAR, VARCHAR>,
    col4 AS SYSTEMROWTIME()
)
PARTITION BY (col0 LONG)
WITH (
  connector.type = kafka,
  format.type = key-value-metadata,
  format.key-format.type = avro,
  format.value-format.type = json
)
```

The format specifies a KeyedDeserializationSchema that extends the schema with a metadata column (col3). The PARTITION BY clause declares the columns of Kafka's key in Avro format. col1 and col2 are the columns of Kafka's JSON value.
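To make the column mapping concrete, here is a hedged Python sketch of what a `key-value-metadata` format could produce for one Kafka record of myTopic. The function name, the metadata keys (`topic`, `partition`, `offset`), and the assumption that the Avro key has already been decoded into a dict are all illustrative; this is not an existing Flink format.

```python
import json
from typing import Any, Dict, Optional


def deserialize_key_value_metadata(
    key: Dict[str, Any],          # Kafka key, assumed already decoded from Avro
    value: bytes,                 # Kafka value, JSON-encoded
    topic: str,
    partition: int,
    offset: int,
    timestamp_ms: Optional[int],  # timestamp carried by the Kafka record itself
) -> Dict[str, Any]:
    """Hypothetical sketch: build one row of myTopic from a Kafka record."""
    payload = json.loads(value.decode("utf-8"))
    return {
        "col0": key["col0"],        # PARTITION BY column, taken from the key
        "col1": payload.get("col1"),  # value columns, taken from the JSON payload
        "col2": payload.get("col2"),
        "col3": {                   # metadata map with string values
            "topic": topic,
            "partition": str(partition),
            "offset": str(offset),
        },
        "col4": timestamp_ms,       # SYSTEMROWTIME(): from the record, not the payload
    }
```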

Thanks for your feedback,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings


On 13.12.18 at 09:50, Jark Wu wrote:
Hi all,

Here are a bunch of my thoughts:

8). support row/map/array data type
That's fine with me if we want to support them in the MVP. In my mind, we
can have the field type syntax like this:

```
fieldType ::=
             {
                 simpleType
              | MAP<simpleType, fieldType>
              | ARRAY<fieldType>
              | ROW<columnDefinition [, columnDefinition]*>
             }
```
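As a sanity check of the grammar above, a small recursive-descent parser can be sketched in Python. It assumes that `columnDefinition` is `name fieldType` and accepts any other identifier as a `simpleType`; both are simplifications, and the tuple representation of parsed types is hypothetical.

```python
import re
from typing import List, Optional, Tuple

TOKEN = re.compile(r"\s*([<>,]|[A-Za-z_][A-Za-z0-9_]*)")
PUNCTUATION = {"<", ">", ","}


def tokenize(text: str) -> List[str]:
    tokens: List[str] = []
    pos, text = 0, text.strip()
    while pos < len(text):
        match = TOKEN.match(text, pos)
        if match is None:
            raise SyntaxError(f"unexpected character {text[pos]!r} at {pos}")
        tokens.append(match.group(1))
        pos = match.end()
    return tokens


class FieldTypeParser:
    """fieldType ::= simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType>
                   | ROW<columnDefinition [, columnDefinition]*>"""

    def __init__(self, text: str) -> None:
        self.tokens = tokenize(text)
        self.pos = 0

    def peek(self) -> Optional[str]:
        return self.tokens[self.pos] if self.pos < len(self.tokens) else None

    def next(self, expected: Optional[str] = None) -> str:
        token = self.peek()
        if token is None or (expected is not None and token != expected):
            raise SyntaxError(f"expected {expected or 'a token'}, got {token}")
        self.pos += 1
        return token

    def field_type(self) -> tuple:
        token = self.next()
        if token in PUNCTUATION:
            raise SyntaxError(f"expected a type, got {token!r}")
        keyword = token.upper()
        if keyword == "MAP":
            self.next("<")
            key = self.field_type()
            if key[0] != "SIMPLE":
                raise SyntaxError("MAP keys must be a simpleType")
            self.next(",")
            value = self.field_type()
            self.next(">")
            return ("MAP", key, value)
        if keyword == "ARRAY":
            self.next("<")
            element = self.field_type()
            self.next(">")
            return ("ARRAY", element)
        if keyword == "ROW":
            self.next("<")
            columns = [self.column_definition()]
            while self.peek() == ",":
                self.next(",")
                columns.append(self.column_definition())
            self.next(">")
            return ("ROW", columns)
        return ("SIMPLE", keyword)

    def column_definition(self) -> Tuple[str, tuple]:
        # Assumption: columnDefinition ::= name fieldType
        name = self.next()
        if name in PUNCTUATION:
            raise SyntaxError(f"expected a column name, got {name!r}")
        return (name, self.field_type())


def parse_field_type(text: str) -> tuple:
    parser = FieldTypeParser(text)
    result = parser.field_type()
    if parser.peek() is not None:
        raise SyntaxError(f"trailing input: {parser.tokens[parser.pos:]}")
    return result
```

For example, `parse_field_type("MAP<VARCHAR, ARRAY<INT>>")` yields a nested tuple, while `MAP<ARRAY<INT>, INT>` is rejected because map keys must be simple types.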

I have included this in @Shuyi's summary doc [1]. Please leave feedback there!

[1]
https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit

3) SOURCE / SINK / BOTH
@Timo, a CREATE TABLE statement registers a virtual table in the session or catalog. I don't think it is immutable, as we might also want to support CREATE INDEX statements in the future. On the other hand, ACL is not part of the table definition; it belongs to the permission system, which is usually stored somewhere else. So GRANT/REVOKE sounds like a more standard option.

7) Table Update Mode
I agree with @Shuyi that table update mode can be left out of the MVP because, IMO, the update mode will not break the current MVP design. It should be something we can add later, like the CHANGE_FLAG you proposed. We can continue this discussion when we finalize the MVP.

Meanwhile, the update mode is a big topic which may take several weeks of discussion. For example: (a) do we support CHANGE_FLAG when the table supports upsert (or when the table defines a primary key)? (b) the CHANGE_FLAG should support both writing and reading. (c) currently, we only support true (add) and false (retract) flag types; are they enough? (d) how do we connect to an external storage which also supports insert/delete flags, like the MySQL binlog?
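Question (c) is easier to discuss with the semantics spelled out. A minimal Python sketch, assuming changes arrive as `(flag, row)` pairs where `True` adds a row and `False` retracts one earlier copy of it (the shape of Flink's retract streams of `Tuple2<Boolean, Row>`); the function name is hypothetical.

```python
from collections import Counter
from typing import Iterable, Tuple


def apply_retract_stream(changes: Iterable[Tuple[bool, tuple]]) -> Counter:
    """Materialize a retract stream: True adds a row, False retracts one earlier copy."""
    table: Counter = Counter()
    for is_add, row in changes:
        if is_add:
            table[row] += 1
            continue
        if table[row] == 0:
            raise ValueError(f"retraction of a row that was never added: {row}")
        table[row] -= 1
        if table[row] == 0:
            del table[row]  # keep only rows that are currently in the table
    return table
```

A binlog with separate insert/update/delete events carries more information than this boolean, which is one way to see why (c) and (d) are open questions.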

Regarding the CHANGE_FLAG @Timo proposed, I think this is a good direction. But should isRetraction be a physical field, with CHANGE_FLAG acting as a constraint on it? If yes, then what is the type of isRetraction?

4.b) Ingesting and writing timestamps to systems.
@Shuyi, PERSISTED can solve the problem that the field is not physically stored. However, it doesn't solve the problem of how to write a field back to a computed column, because "A computed column cannot be the target of an INSERT or UPDATE statement", even if the computed column is persisted. If we want to write a rowtime back to the external system, the DML should look like this: "INSERT INTO sink SELECT a, rowtime FROM source". The point is that `rowtime` must be specified in the INSERT statement; that's why I hope the `rowtime` field in the table is not a computed column. See more information about PERSISTED [2] [3].
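The SQL Server rule quoted above can be illustrated with a hypothetical Python sketch: if computed columns are never written, the columns an INSERT has to provide are exactly the physical ones, so `rowtime` can only be written back when it is declared as a physical column. The `Column` class and helper names are illustrative, not Flink types.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class Column:
    name: str
    data_type: str
    computed_expr: Optional[str] = None  # e.g. "SYSTEMROWTIME()" for a computed column

    @property
    def is_computed(self) -> bool:
        return self.computed_expr is not None


def insert_target_columns(schema: List[Column]) -> List[str]:
    """Columns an INSERT must provide when computed columns are never written."""
    return [c.name for c in schema if not c.is_computed]


def validate_insert(schema: List[Column], provided: Sequence[str]) -> None:
    expected = insert_target_columns(schema)
    if list(provided) != expected:
        raise ValueError(f"INSERT must provide {expected}, got {list(provided)}")
```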

Another point to consider is that SYSTEMROWTIME() only solves reading timestamps from message headers. There are many similar requirements, such as reading `topic`, `partition`, `offset`, or custom properties from message headers. Do we plan to support a bunch of built-in functions like SYSTEMROWTIME()? Is there a clean and easy way to do this?

[2]:
https://docs.microsoft.com/en-us/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-2017
[3]:
https://stackoverflow.com/questions/51390531/sql-server-persisted-computed-columns-versus-actual-normal-column

Looking forward to collaborating with you guys!

Best,
Jark


On Thu, 13 Dec 2018 at 01:38, Rong Rong <walterddr@xxxxxxxxx> wrote:

Thanks for the summary effort, @shuyi. Sorry for jumping into the discussion so late.

As for the scope of the MVP, I think we might want to consider adding the "table update mode" problem to it. I agree with @timo that it might not be easily changed in the future if the flags have to be part of the schema/column definition.

Regarding the components under discussion.
4) Event-Time Attributes and Watermarks
b, c) I actually like the special indicator way @fabian suggested to hint Flink to read time attributes directly from the system, not the data `(ts AS SYSTEMROWTIME())`. It should also address the "computed field not emitted" problem by carrying over the "virtual column" concept like @shuyi suggested. However, if I understand correctly, this also needs to be defined as part of the schema/column definition.

3) SOURCE / SINK / BOTH
+1 on not adding properties to `CREATE TABLE` to manage ACL/permission.

On a higher level, one question I have is whether we can definitively agree that the features under discussion (and potential solutions) can be cleanly adjusted/added on top of what we provide in the MVP (e.g. the schema/column definition might be hard to change later, but if we all agree that ACL/permission should not be part of `CREATE TABLE`, that decision can be made later). @shuyi I can also help draft the FLIP doc by summarizing the features under discussion and the concerns about whether to include them in the MVP, so that we can carry on the discussions alongside the MVP implementation effort. I think each of these features deserves a dedicated subsection.

Many thanks,
Rong


On Wed, Dec 12, 2018 at 1:14 AM Shuyi Chen <suez1224@xxxxxxxxx> wrote:

Hi all,

I have summarized the MVP based on the features that we agreed upon. For table update mode, custom watermark strategy, and ts extractor, I found there are still some ongoing discussions, so I decided to leave them out of the MVP. For row/map/array data types, I think we can add them as well if everyone agrees.


4) Event-Time Attributes and Watermarks
Cited from the SQL Server 2017 documentation (https://docs.microsoft.com/en-us/sql/relational-databases/tables/specify-computed-columns-in-a-table?view=sql-server-2017): "A computed column is a virtual column that is not physically stored in the table, unless the column is marked PERSISTED. A computed column expression can use data from other columns to calculate a value for the column to which it belongs." I think we can also introduce the PERSISTED keyword for computed columns to indicate that the field can be stored back to the table, i.e. ts AS SYSTEMROWTIME() PERSISTED.

3) SOURCE / SINK / BOTH
GRANT/REVOKE sounds like a more standard option than adding a property to CREATE TABLE to manage the ACL/permission. The ACL can be stored somewhere in a database and allow/disallow access to a dynamic table depending on whether it's an "INSERT INTO" or a "SELECT".
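A toy Python sketch of such an ACL, stored outside the table definition and checked per statement kind. The class, the privilege names, and the mapping of INSERT INTO to an INSERT privilege are assumptions for illustration only.

```python
from collections import defaultdict
from typing import DefaultDict, Set, Tuple


class AccessControl:
    """Toy ACL store kept outside of any CREATE TABLE definition."""

    # Privilege required by each statement kind (assumed mapping).
    REQUIRED_PRIVILEGE = {"SELECT": "SELECT", "INSERT INTO": "INSERT"}

    def __init__(self) -> None:
        self._grants: DefaultDict[Tuple[str, str], Set[str]] = defaultdict(set)

    def grant(self, privilege: str, table: str, user: str) -> None:
        self._grants[(user, table)].add(privilege)

    def revoke(self, privilege: str, table: str, user: str) -> None:
        self._grants[(user, table)].discard(privilege)

    def is_allowed(self, statement: str, table: str, user: str) -> bool:
        required = self.REQUIRED_PRIVILEGE[statement]
        return required in self._grants.get((user, table), set())
```

The table definition itself stays unchanged when privileges are granted or revoked, which is the separation argued for above.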

I can volunteer to write up the discussion as a FLIP. I can try to summarize the current discussion and share edit permission with you to collaborate on the documents. After we finalize the doc, we can publish it as a FLIP. What do you think?

Shuyi



On Tue, Dec 11, 2018 at 9:13 AM Timo Walther <twalthr@xxxxxxxxxx> wrote:

Hi all,

thanks for summarizing the discussion, @Shuyi. I think we need to include the "table update mode" problem, as it might not be changed easily in the future. Regarding "support row/map/array data type", I don't see a reason why we should not support them now, as the data types are already included in the runtime. The "support custom timestamp extractor" item is solved by the computed columns approach. The "custom watermark strategy" can be added by supplying a class name as a parameter, in my opinion.

Regarding the comments of Lin and Jark:

@Lin: Instantiating a TableSource/Sink should not cost much, but we
should not mix catalog discussion and DDL at this point.

4) Event-Time Attributes and Watermarks
4.b) Regarding `ts AS SYSTEMROWTIME()` and Lin's comment about "will violate the rule": there is no explicit rule of doing so. Computed columns are also not standard compliant; if we can use information that is encoded in constraints, we should use it. Adding more and more top-level properties makes the interaction with connectors more difficult. An additional HEADER keyword sounds too connector-specific and also not SQL compliant to me.

3) SOURCE / SINK / BOTH
GRANT/REVOKE are mutating an existing table, right? In my opinion, independent of SQL databases but focusing on Flink user requirements, a CREATE TABLE statement should be an immutable definition of a connection to an external system.

7) Table Update Mode
As far as I can see, the only thing missing for enabling all table modes is the declaration of a change flag. We could introduce a new keyword here, similar to WATERMARK:

```
CREATE TABLE output_kafka_t1(
    id bigint,
    msg varchar,
    CHANGE_FLAG FOR isRetraction
) WITH (
    type=kafka
    ,...
);

CREATE TABLE output_kafka_t1(
    CHANGE_FLAG FOR isUpsert,
    id bigint,
    msg varchar,
    PRIMARY_KEY(id)
) WITH (
    type=kafka
    ,...
);
```

What do you think?

@Jark: We should definitely stage the discussions and mention the opinions and advantages/disadvantages that have been proposed already in the FLIP.

Regards,
Timo

On 10.12.18 at 08:10, Jark Wu wrote:
Hi all,

It's great to see we have an agreement on MVP.

4.b) Ingesting and writing timestamps to systems.
I would treat the field as a physical column, not a virtual column. If we treat it as a computed column, it will be confusing that the behavior is different when it is a source or a sink. When it is a physical column, the behavior could be unified. Then the problem is how to map the field to the Kafka message timestamp. One option is what Lin proposed above, which is also used in KSQL [1]. Another idea is introducing a HEADER column which strictly maps by name to the fields in the message header.
For example,

```
CREATE TABLE output_kafka_t1(
    id bigint,
    ts timestamp HEADER,
    msg varchar
) WITH (
    type=kafka
    ,...
);
```

This is used at Alibaba but not included in the DDL draft. It would further extend the SQL syntax, which we should be cautious about. What do you think about these two solutions?

4.d) Custom watermark strategies:
@Timo,  I don't have a strong opinion on this.

3) SOURCE/SINK/BOTH
Agree with Lin, GRANT/REVOKE [SELECT|UPDATE] ON TABLE is a clean and standard way to manage permissions, which is also adopted by HIVE [2] and many databases.

[1]:
https://docs.confluent.io/current/ksql/docs/tutorials/examples.html
[2]:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=45876173#Hivedeprecatedauthorizationmode/LegacyMode-Grant/RevokePrivileges

@Timo, it would be great if someone could conclude the discussion and summarize it into a FLIP.
@Shuyi, thanks a lot for putting it all together. The Google doc looks good to me, and I left some minor comments there.

Regarding the FLIP, I have some suggestions:
1. The FLIP can contain MILESTONE1 and FUTURE WORKS.
2. MILESTONE1 is the MVP. It describes the MVP DDL syntax.
3. Separate FUTURE WORKS into two parts: UNDER DISCUSSION and ADOPTED. We can easily derive MILESTONE2 from this when it is ready.

I summarized the Future Works based on Shuyi's work:

Adopted: (should be described in detail here...)
1. support data type nullability and precision.
2. comment on table and columns.

Under Discussion: (should briefly describe some options...)
1. Ingesting and writing timestamps to systems.
2. support custom watermark strategy.
3. support table update mode
4. support row/map/array data type
5. support schema derivation
6. support system versioned temporal table
7. support table index

We can continue the further discussion here, or split it into a separate DISCUSS thread if it is a sophisticated problem, such as Table Update Mode or Temporal Table.

Best,
Jark

On Mon, 10 Dec 2018 at 11:54, Lin Li <lincoln.86xy@xxxxxxxxx> wrote:

hi all,
Thanks for your valuable input!

4) Event-Time Attributes and Watermarks:
4.b) @Fabian As you mentioned, using a computed column `ts AS SYSTEMROWTIME()` for writing out to a Kafka table sink will violate the rule that computed fields are not emitted. Since the timestamp column in Kafka's header area is a specific materialization protocol, why don't we treat it as a connector property? For example:
```
CREATE TABLE output_kafka_t1(
    id bigint,
    ts timestamp,
    msg varchar
) WITH (
    type=kafka,
    header.timestamp=ts
    ,...
);
```
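A hedged Python sketch of how a sink could interpret the proposed `header.timestamp` property: the referenced column becomes the Kafka record timestamp. Whether that column should also stay in the message value is not specified in the proposal; the sketch assumes it is removed. The function name is hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def to_kafka_record(
    row: Dict[str, Any], properties: Dict[str, str]
) -> Tuple[Optional[int], Dict[str, Any]]:
    """Split a row into (record timestamp, value fields) based on 'header.timestamp'."""
    ts_column = properties.get("header.timestamp")
    if ts_column is None:
        # No mapping declared: leave the timestamp to Kafka and write all fields.
        return None, dict(row)
    if ts_column not in row:
        raise KeyError(f"header.timestamp refers to unknown column {ts_column!r}")
    # Assumption: the timestamp column is not duplicated in the message value.
    value = {name: v for name, v in row.items() if name != ts_column}
    return row[ts_column], value
```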

4d) For custom watermark strategies
@Fabian I agree with you that we should open another topic about this feature later.
3) SOURCE / SINK / BOTH
I think permissions and availability are two separate things. Permissions can be managed well by using GRANT/REVOKE (you can call it DCL) solutions, which are commonly used in different DBs. The permission part can be a new topic for later discussion, what do you think?

For the availability, @Fabian @Timo I have another question: does instantiating a TableSource/Sink cost much or have some other downsides? IMO, creating a new source/sink object via the constructor does not seem costly. When receiving a DDL, we should associate it with the catalog object (reusing an existing one or creating a new one). Am I missing something important?

5. Schema declaration:
@Timo yes, your concern about user convenience is very important, but I haven't seen a clear way to solve this so far. Shall we postpone it and wait for more input from the community?

Shuyi Chen <suez1224@xxxxxxxxx> wrote on Sat, Dec 8, 2018 at 4:27 PM:

Hi all,

Thanks a lot for the great discussion. I think we can continue the discussion here while carving out an MVP so that the community can start working on it. Based on the discussion so far, I tried to summarize what we will do for the MVP:

MVP

     1. support CREATE TABLE
     2. support existing data types in Flink SQL, ignoring nullability and precision
     3. support table comments and column comments
     4. support table constraints PRIMARY KEY and UNIQUE
     5. support table properties using key-value pairs
     6. support PARTITIONED BY
     7. support computed columns
     8. support from-field and from-source timestamp extractors
     9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, and FROM-SOURCE watermark strategies
     10. support a table property to allow explicit enforcement of the read/write (source/sink) permission of a table
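For reference, a Python sketch of the two periodic strategies in item 9, assuming they keep the semantics of Flink 1.7's table descriptors as I understand them (`watermarksPeriodicAscending()` emits the maximum observed timestamp minus 1 ms, `watermarksPeriodicBounded(delay)` emits the maximum observed timestamp minus the delay). FROM-SOURCE is omitted because it delegates to the source. The class name is hypothetical.

```python
from typing import Optional


class PeriodicWatermarkTracker:
    """Hypothetical tracker for the PERIODIC-ASCENDING and PERIODIC-BOUNDED strategies."""

    def __init__(self, strategy: str, delay_ms: int = 0) -> None:
        if strategy not in ("PERIODIC-ASCENDING", "PERIODIC-BOUNDED"):
            raise ValueError(f"unsupported strategy: {strategy}")
        self.strategy = strategy
        self.delay_ms = delay_ms
        self.max_timestamp: Optional[int] = None

    def on_event(self, timestamp_ms: int) -> None:
        # Only the largest timestamp seen so far matters for both strategies.
        if self.max_timestamp is None or timestamp_ms > self.max_timestamp:
            self.max_timestamp = timestamp_ms

    def current_watermark(self) -> Optional[int]:
        """Called periodically; returns None until the first event has been seen."""
        if self.max_timestamp is None:
            return None
        if self.strategy == "PERIODIC-ASCENDING":
            # Minus 1 ms so that rows equal to the max timestamp are not late.
            return self.max_timestamp - 1
        return self.max_timestamp - self.delay_ms
```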

I tried to put up the DDL grammar (https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing) based on the MVP features above and the previous design docs. Please take a look and comment on it.


Also, I summarized the future improvements on CREATE TABLE as follows:
     1. support table update mode
     2. support data type nullability and precision
     3. support row/map/array data type
     4. support custom timestamp extractor and watermark strategy
     5. support schema derivation
     6. support system versioned temporal table
     7. support table index

I suggest we first agree on the MVP feature list and the MVP grammar. Then we can either continue the discussion of the future improvements here, or create separate JIRAs for each item and discuss further in the JIRA. What do you guys think?

Shuyi

On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <twalthr@xxxxxxxxxx> wrote:
Hi all,

I think we are making good progress. Thanks for all the feedback so far.
3. Sources/Sinks:
It seems that I cannot find supporters for an explicit SOURCE/SINK declaration, so I'm fine with not using those keywords.
@Fabian: Maybe we don't have to change the TableFactory interface but just provide some helper functions in the TableFactoryService. This would solve the availability problem, but the permission problem would still not be solved. If you are fine with it, we could introduce a property instead?

5. Schema declaration:
@Lin: We should find an agreement on this, as it requires changes to the TableFactory interface. We should minimize changes to this interface because it is user-facing. Especially if format schema and table schema differ, such functionality is very important. Our goal is to connect to existing infrastructure. For example, if we are using Avro and the existing Avro format has enums but Flink SQL does not support enums, it would be helpful to let the Avro format derive a table schema. Otherwise, you need to declare both schemas, which leads to CREATE TABLE statements of 400+ lines.
I think the mentioned query:
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")
is fine and should only be valid if the schema contains no non-computed columns.
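A rough Python sketch of deriving a table schema from an .avsc file, as one way to picture "let the Avro format derive a table schema". The type mapping, including turning enums into VARCHAR and ignoring nullability (as in the MVP), is an assumption and not the behavior of Flink's Avro format; helper names are hypothetical.

```python
import json
from typing import Any, List, Tuple

# Assumed mapping from Avro primitive types to SQL type names.
PRIMITIVES = {
    "boolean": "BOOLEAN",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "string": "VARCHAR",
}


def to_sql_type(avro_type: Any) -> str:
    if isinstance(avro_type, str):
        return PRIMITIVES[avro_type]
    if isinstance(avro_type, list):
        # Union such as ["null", "string"]: nullability is ignored, as in the MVP.
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError(f"unsupported union: {avro_type}")
        return to_sql_type(non_null[0])
    kind = avro_type["type"]
    if kind == "enum":
        return "VARCHAR"  # SQL has no enum type; expose the symbol names as strings
    if kind == "array":
        return f"ARRAY<{to_sql_type(avro_type['items'])}>"
    if kind == "map":
        return f"MAP<VARCHAR, {to_sql_type(avro_type['values'])}>"  # Avro map keys are strings
    if kind == "record":
        fields = ", ".join(
            f"{field['name']} {to_sql_type(field['type'])}" for field in avro_type["fields"]
        )
        return f"ROW<{fields}>"
    return to_sql_type(kind)  # e.g. {"type": "long"}


def derive_table_schema(avsc: str) -> List[Tuple[str, str]]:
    record = json.loads(avsc)
    return [(field["name"], to_sql_type(field["type"])) for field in record["fields"]]
```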

7. Table Update Mode:
After thinking about it again, I agree. The mode of the sinks can be derived from the query and the existence of a PRIMARY KEY declaration. But Fabian raised a very good point: how do we deal with sources? Shall we introduce a new keyword similar to WATERMARK such that an upsert/retract flag is not part of the visible schema?

4a. How to mark a field as attribute?
@Jark: Thanks for the explanation of the WATERMARK clause semantics. This is a nice way of marking existing fields. This sounds good to me.
4c) WATERMARK as constraint
I'm fine with leaving the WATERMARK clause in the schema definition.
4d) Custom watermark strategies:
I would already think about custom watermark strategies, as the current descriptor design already supports this. ScalarFunctions don't work, as a PeriodicWatermarkAssigner has different semantics. Why not simply enter a full class name here, as it is done in the current design?
4.b) Ingesting and writing timestamps to systems (like Kafka)
@Fabian: Yes, your suggestion sounds good to me. This behavior would be similar to our current `timestamps: from-source` design.

Once our discussion has reached a conclusion, I would like to volunteer and summarize the outcome of this mailing thread. It nicely aligns with the update work on the connector improvements document (that I wanted to do anyway) and the ongoing external catalog discussion. Furthermore, I would also like to propose how to change existing interfaces, keeping the DDL, connector improvements, and external catalog support in mind. Would that be ok for you?

Thanks,
Timo



On 07.12.18 at 14:48, Fabian Hueske wrote:
Hi all,

Thanks for the discussion.
I'd like to share my point of view as well.

4) Event-Time Attributes and Watermarks:
4.a) I agree with Lin and Jark's proposal. Declaring a watermark on an attribute declares it as an event-time attribute.
4.b) Ingesting and writing timestamps to systems (like Kafka). We could use a special function like (ts AS SYSTEMROWTIME()). This function will indicate that we read the timestamp directly from the system (and not the data). We can also write the field back to the system when emitting the table (violating the rule that computed fields are not emitted).
4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY constraint and therefore keep it in the schema definition.
4d) For custom watermark strategies, simple expressions or ScalarFunctions won't be sufficient. Sophisticated approaches could collect histograms, etc. But I think we can leave that out for later.

3) SOURCE / SINK / BOTH
As you said, there are two things to consider here: permission and availability of a TableSource/TableSink. I think that neither should be a reason to add a keyword at such a sensitive position. However, I also see Timo's point that it would be good to know up-front how a table can be used without trying to instantiate a TableSource/Sink for a query. Maybe we can extend the TableFactory such that it provides information about which sources/sinks it can provide.

7. Table Update Mode
Something that we definitely need to consider is how tables are ingested, i.e., append, retract, or upsert. Especially since upsert and retraction need a meta-data column that indicates whether an event is an insert (or upsert) or a delete change. This column needs to be identified somehow, most likely as part of the input format. Ideally, this column should not be part of the table schema (as it would always be true). Emitting tables is not so much of an issue, as the properties of the table tell us what to do (append-only/update, unique key y/n).
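To illustrate why the flag can stay out of the table schema, a minimal Python sketch of materializing an upsert stream: the flag is consumed while applying each change and never stored in the rows. The key columns, the `True` = upsert / `False` = delete convention, and the function name are assumptions.

```python
from typing import Any, Dict, Iterable, Tuple


def apply_upsert_stream(
    changes: Iterable[Tuple[bool, Dict[str, Any]]], key: Tuple[str, ...]
) -> Dict[Tuple[Any, ...], Dict[str, Any]]:
    """Materialize an upsert stream: True inserts/overwrites the row for its key, False deletes it."""
    table: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for is_upsert, row in changes:
        row_key = tuple(row[column] for column in key)
        if is_upsert:
            table[row_key] = row
        else:
            table.pop(row_key, None)  # deleting an absent key is a no-op here
    return table
```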

Best,
Fabian


On Fri, 7 Dec 2018 at 10:39, Jark Wu <imjark@xxxxxxxxx> wrote:
Hi Timo,

Thanks for your quick feedback! Here are some of my thoughts:

Append, upsert, and retract modes on sinks are also a very complex problem. I think append/upsert/retract is an ability of a table; users do not need to specify whether a table is used for append, retraction, or upsert. The query can choose which mode the sink uses. If an unbounded GROUP BY is inserted into an append sink (the sink only implements/supports append), an exception can be thrown. A more complex problem is: if we want to write retractions/upserts to Kafka, how do we encode the change flag (add or retract/delete) on the table? Maybe we should propose some protocol for the change flag encoding, but I don't have a clear idea about this right now.

3. Sources/Sinks: The source/sink tag is similar to the append/upsert/retract problem. Besides source/sink, we actually have stream source, stream sink, batch source, and batch sink, and the stream sink also includes three modes: append/upsert/retract. Should we put all these tags on CREATE TABLE? IMO, the table's ability is defined by the table itself; users do not need to specify it. If it is a read-only table, an exception can be thrown when writing to it. As the source/sink tag can be omitted in CREATE TABLE, could we skip it and only support CREATE TABLE in the first version, and add it back in the future when we really need it? This keeps the API compatible and makes sure the MVP only contains what we have clearly agreed on.
4a. How to mark a field as attribute?
The watermark definition includes two parts: which field to use as the time attribute and which generation strategy to use.
When we want to mark the `ts` field as an attribute: WATERMARK FOR `ts` AS OFFSET '5' SECOND.
If we have a POJO{id, user, ts} field named "pojo", we can mark it like this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND
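A hedged Python sketch of the two parts described above: parsing the proposed clause, resolving a possibly nested field path such as `pojo.ts`, and computing the watermark as the maximum timestamp minus the offset. Only the `OFFSET '<n>' SECOND` form is handled; the regular expression and helper names are illustrative, not a parser for the final syntax.

```python
import re
from typing import Any, Dict, Iterable, List, Tuple

WATERMARK_CLAUSE = re.compile(
    r"WATERMARK\s+FOR\s+`?([\w.]+)`?\s+AS\s+OFFSET\s+'(\d+)'\s+SECOND",
    re.IGNORECASE,
)


def parse_watermark(clause: str) -> Tuple[List[str], int]:
    """Return the field path and the offset in milliseconds."""
    match = WATERMARK_CLAUSE.fullmatch(clause.strip())
    if match is None:
        raise SyntaxError(f"unsupported watermark clause: {clause!r}")
    return match.group(1).split("."), int(match.group(2)) * 1000


def resolve(row: Dict[str, Any], path: List[str]) -> Any:
    value: Any = row
    for name in path:
        value = value[name]  # descend into the nested row/POJO by field name
    return value


def watermark_for(rows: Iterable[Dict[str, Any]], clause: str) -> int:
    path, offset_ms = parse_watermark(clause)
    return max(resolve(row, path) for row in rows) - offset_ms
```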

4b. timestamp write to Kafka message header
Even though we can define multiple time attributes on a table, only one time attribute can be active/used in a query (in a stream). When we enable `writeTimestamp`, the only attribute active in the stream will be written to the Kafka message header. What I mean by the timestamp in StreamRecord is the time attribute in the stream.

4c. Yes. We introduced the WATERMARK keyword similar to the INDEX and PRIMARY KEY keywords.

@Timo, do you have any other advice or questions on the watermark syntax? For example, the built-in strategy name: "BOUNDED WITH OFFSET" vs. "OFFSET" vs. ...


Cheers,
Jark

On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.86xy@xxxxxxxxx> wrote:
Hi Timo,
Thanks for your feedback, here are some thoughts of mine:

3. Sources/Sinks:
"Let's assume an interactive CLI session, people should be able to list all source table and sink tables to know upfront if they can use an INSERT INTO here or not."
This requirement can simply be resolved by a document that lists all supported source/sink/both connectors, and the sql-client can perform a quick check. It's only an implementation choice, not necessary for the syntax. As for the connector implementation, a connector may implement one, some, or all of the [Stream|Batch]Source/[Stream|Batch]Sink traits, so we can derive the availability for any given query without the SOURCE/SINK keywords or specific table properties in the WITH clause. Since there's still indeterminacy, shall we skip these two keywords for the MVP DDL? We can discuss further after users' feedback.
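The idea that availability can be derived from the implemented traits can be sketched in Python with marker classes. The class names mirror the traits mentioned above but are hypothetical stand-ins, not Flink's actual interfaces; the JDBC example follows Timo's remark elsewhere in the thread that a JDBC sink is easier than a JDBC source.

```python
class StreamSource:
    """Marker: the connector can be read as a stream."""


class StreamSink:
    """Marker: the connector can be written as a stream."""


class BatchSource:
    """Marker: the connector can be read as a bounded input."""


class BatchSink:
    """Marker: the connector can be written as a bounded output."""


class KafkaConnector(StreamSource, StreamSink):
    """Hypothetical connector supporting streaming reads and writes."""


class JdbcConnector(BatchSource, BatchSink, StreamSink):
    """Hypothetical connector without a streaming source."""


REQUIRED_TRAIT = {
    ("SELECT", True): StreamSource,
    ("SELECT", False): BatchSource,
    ("INSERT INTO", True): StreamSink,
    ("INSERT INTO", False): BatchSink,
}


def check_availability(connector: object, statement: str, streaming: bool) -> None:
    """Raise if the connector does not implement the trait the statement needs."""
    required = REQUIRED_TRAIT[(statement, streaming)]
    if not isinstance(connector, required):
        mode = "streaming" if streaming else "batch"
        raise TypeError(f"{type(connector).__name__} cannot be used for {statement} in {mode} mode")
```

A CLI could run the same check up-front to list which tables accept INSERT INTO, without any SOURCE/SINK keyword in the DDL.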

6. Partitioning and keys
I agree with you on raising the priority of table constraints and partitioned table support for better connectivity to Hive and Kafka. I'll add the partitioned table syntax (compatible with Hive) to the DDL draft doc later [1].

5. Schema declaration
"if users want to declare computed columns they have a "schema" constraints but without columns
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc") "

From my point of view, this DDL is invalid because the primary key constraint already references two columns whose types are unseen. And Xuefu pointed out an important matching problem, so shall we put schema derivation as a follow-up extension?




Timo Walther <twalthr@xxxxxxxxxx> wrote on Thu, Dec 6, 2018 at 6:05 PM:

Hi everyone,

great to have such a lively discussion. My next batch of feedback:
@Jark: We don't need to align the descriptor approach with SQL. I'm open to different approaches as long as we can serve a broad set of use cases and systems. The descriptor approach was a first attempt to cover all aspects and connector/format characteristics. Just another example that is missing in the DDL design: how can a user decide whether append, retraction, or upserts should be used to sink data into the target system? Do we want to define all these important properties in the big WITH property map? If yes, we are already close to the descriptor approach. Regarding the "standard way", most DDL languages have very custom syntax, so there is not a real "standard".

3. Sources/Sinks: @Lin: If a table has both read/write access, it can be created using a regular CREATE TABLE (omitting a specific source/sink) declaration. Regarding the transition from source/sink to both, yes, we would need to update the DDL and catalogs. But is this a problem? One also needs to add new queries that use the tables. @Xuefu: It is not only about security aspects. Especially for streaming use cases, not every connector can be used as a source easily. For example, a JDBC sink is easier than a JDBC source. Let's assume an interactive CLI session, people should be able to list all source tables and sink tables to know upfront if they can use an INSERT INTO here or not.

6. Partitioning and keys: @Lin: I would like to include this in the design, given that Hive integration and Kafka key support are in the making/on our roadmap for this release.

5. Schema declaration: @Lin: You are right, it is not conflicting. I just wanted to raise the point because if users want to declare computed columns, they have "schema" constraints but without columns. Are we ok with a syntax like ...
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc") ?
@Xuefu: Yes, you are right that an external schema might not exactly match, but this is true for both directions: table schema "derives" format schema and format schema "derives" table schema.

7. Hive compatibility: @Xuefu: I agree that Hive is popular, but we should not just adopt everything from Hive, as its syntax is very batch-specific. We should come up with a superset of historical and future requirements. Supporting Hive queries can be an intermediate layer on top of Flink's DDL.

4. Time attributes: @Lin: I'm fine with changing the TimestampExtractor interface, as this is also important for better separation of the connector and table modules [1]. However, I'm wondering about watermark generation.
4a. timestamps are in the schema twice:
@Jark: "existing field is Long/Timestamp, we can just use it as rowtime": yes, but we need to mark a field as such an attribute. What does the syntax for marking look like? Also in case of timestamps that are nested in the schema?

4b. how can we write out a timestamp into the message header?:
I agree to simply ignore computed columns when writing out. This is like 'field-change: add' that I mentioned in the improvements document.
@Jark: "then the timestamp in StreamRecord will be written to Kafka message header": Unfortunately, there is no timestamp in the stream record. Additionally, multiple time attributes can be in a schema. So we need a constraint that tells the sink which column to use (possibly computed as well)?

4c. separate all time attribute concerns into a special clause next to the regular schema?
@Jark: I don't have a strong opinion on this. I just have the feeling that the "schema part" becomes quite messy because the actual schema with types and fields is accompanied by so much metadata about timestamps, watermarks, keys,... and we would need to introduce a new WATERMARK keyword within a schema that was close to standard up to this point.

Thanks everyone,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-9461



On 06.12.18 at 07:08, Jark Wu wrote:
Hi Timo,

Thank you for the valuable feedback.

First of all, I don't think we need to align the SQL functionality with the Descriptor API. Because SQL is a more standard API, we should be as cautious as possible when extending the SQL syntax. If something can be done in a standard way, we shouldn't introduce something new.

Here are some of my thoughts:

1. Scope: Agree.
2. Constraints: Agree.
4. Time attributes:
      4a. timestamps are in the schema twice.
       If an existing field is a Long/Timestamp, we can just use it as the rowtime; nothing is defined twice. If it is not a Long/Timestamp, we use a computed column to derive the expected timestamp column as the rowtime. Is this what you mean by "defined twice"? I don't think it is a problem but rather an advantage, because it is easy to use: users do not need to consider whether to "replace the existing column" or "add a new column", and they will not be confused about what the real schema is or what the index of the rowtime in the schema is. Regarding optimization, even if timestamps are in the schema twice, when the original timestamp is never used in the query, projection pushdown can prune this field as early as possible, which is exactly the same as "replacing the existing column" at runtime.

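To illustrate the projection pushdown argument, here is a small Python sketch. The table, the `COMPUTED` mapping and the seconds-to-milliseconds expression are hypothetical, and a real optimizer works on plans rather than on individual rows.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

# Hypothetical table: `log_ts` is a physical column holding epoch seconds, and
# `rowtime` is a computed column derived from it, in the spirit of
# `rowtime AS <expression over log_ts>`.
COMPUTED: Dict[str, Callable[[Row], Any]] = {
    "rowtime": lambda r: r["log_ts"] * 1000,  # seconds -> milliseconds
}


def project(row: Row, query_columns: List[str]) -> Row:
    """Evaluate computed columns, then forward only the referenced columns.

    If the query never references `log_ts`, it is dropped right after
    `rowtime` is computed, so at runtime the table behaves as if `rowtime`
    had replaced `log_ts`.
    """
    full = dict(row)
    for name, expr in COMPUTED.items():
        full[name] = expr(full)
    return {c: full[c] for c in query_columns}
```

For example, `project({"user_id": 7, "page": "home", "log_ts": 5}, ["user_id", "rowtime"])` yields only `user_id` and `rowtime`, so the "duplicate" timestamp never travels further downstream.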
       4b. how can we write out a timestamp into the message header?
        That's a good point. I think a computed column is just a virtual column on the table which is only relevant for reading. If we want to write to a table with computed columns defined, we only need to provide the columns except the computed columns (see SQL Server [1]); the computed columns are ignored in the insert statement. Getting back to the question, how can we write out a timestamp into the message header? IMO, we can provide a configuration to support this, such as `kafka.writeTimestamp=true`; then the timestamp in StreamRecord will be written to the Kafka message header. What do you think?
        4c. separate all time attribute concerns into a special clause next to the regular schema?
        Separating the watermark into a special clause similar to PARTITIONED BY is also a good idea. Conceptually, it's fine to put the watermark inside or outside the schema part. But if we want to support multiple watermark definitions, maybe it would be better to put it in the schema part. This is similar to index definitions, where we can define several indexes on a table in the schema part.

        4d. How can people come up with a custom watermark strategy?
        In most cases, the built-in strategies work well. If we need a custom one, we can use a scalar function that is restricted to return only a nullable Long, and use it in SQL like: WATERMARK FOR rowtime AS watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function that accepts 3 parameters and returns a nullable Long, which can be used as a punctuated watermark assigner. Another choice is implementing a class extending `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it in SQL: WATERMARK FOR rowtime AS 'com.my.MyWatermarkStrategy'. But if a scalar function can cover the requirements, I would prefer it, because it keeps us standard compliant. BTW, this feature is not in the MVP; we can discuss it in more depth in the future when we need it.
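The punctuated-assigner idea can be sketched in Python as follows, assuming a hypothetical `watermark_udf` whose `None` result plays the role of the nullable Long. Nothing here is a Flink API; it only shows how such a function could drive watermark emission.

```python
from typing import Callable, Iterable, List, Optional, Tuple

Row = Tuple[int, int, int]  # hypothetical (a, b, c) columns


def watermark_udf(a: int, b: int, c: int) -> Optional[int]:
    """Hypothetical `watermarkUdf(a, b, c)`: returns a watermark or None.

    Here a row whose `c` flag is 1 marks "all events up to timestamp a are in";
    `b` is accepted only to match the three-argument call and is unused.
    """
    return a if c == 1 else None


def punctuated_watermarks(rows: Iterable[Row],
                          udf: Callable[[int, int, int], Optional[int]]) -> List[int]:
    """Emit a watermark whenever the UDF returns a value that advances time.

    Watermarks must be monotonically increasing, so a non-null result that
    does not exceed the current watermark is dropped.
    """
    emitted: List[int] = []
    current: Optional[int] = None
    for a, b, c in rows:
        wm = udf(a, b, c)
        if wm is not None and (current is None or wm > current):
            current = wm
            emitted.append(wm)
    return emitted
```

Rows `(100, 0, 0), (200, 0, 1), (150, 0, 1), (300, 0, 1)` produce watermarks 200 and 300: the first row returns null and the third would move time backwards.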

5. Schema declaration:
I like the proposal to omit the schema if we can get the schema from external storage or a schema file. Actually, we have already encountered this requirement in our company.

+1 to @Xuefu that we should be as close as possible to Hive syntax while keeping to the ANSI SQL standard. This will make it more acceptable and reduce the learning cost for users.

[1] https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
Best,
Jark

On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx> wrote:
Hi Timo/Shuyi/Lin,

Thanks for the discussions. It seems that we are converging on something meaningful. Here are some of my thoughts:

1. +1 on MVP DDL
3. Markers for source or sink seem to be more about permissions on tables, which belong to a security component. Unless the table is created differently based on source, sink, or both, it doesn't seem necessary to use these keywords to enforce permissions.
5. It might be okay if a schema declaration is always needed. While there might be some duplication sometimes, that's not always the case. For example, the external schema may not exactly match the Flink schema, for instance in its data types. Even then, a perfect match is not required. For instance, the external schema file may evolve while the table schema in Flink stays unchanged. A responsible reader should be able to scan the file based on the file schema and return the data based on the table schema.
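A rough Python sketch of such a "responsible reader", assuming the table schema is a list of (column, converter) pairs. The names and the conversion rules are illustrative assumptions only, not how any Flink format is implemented.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical table schema: ordered (column name, converter to the table type).
TableSchema = List[Tuple[str, Callable[[Any], Any]]]


def read_with_table_schema(file_record: Dict[str, Any],
                           table_schema: TableSchema) -> Dict[str, Any]:
    """Return a record shaped by the table schema, not by the file schema.

    - fields that exist only in the (possibly evolved) file schema are dropped,
    - table columns missing from the file become None,
    - values are converted when the file type differs from the table type.
    """
    out: Dict[str, Any] = {}
    for name, convert in table_schema:
        raw = file_record.get(name)
        out[name] = None if raw is None else convert(raw)
    return out
```

A file record `{"id": "42", "name": "x", "new_field": True}` read with the table schema `[("id", int), ("name", str), ("score", float)]` becomes `{"id": 42, "name": "x", "score": None}`.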

Other aspects:

7. Hive compatibility. Since Flink SQL will soon be able to operate on Hive metadata and data, it's an added benefit if we can be compatible with Hive syntax/semantics while following the ANSI standard. At the very least, we should be as close as possible. The Hive DDL is documented at https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
Thanks,
Xuefu




------------------------------------------------------------------
Sender:Lin Li <lincoln.86xy@xxxxxxxxx>
Sent at:2018 Dec 6 (Thu) 10:49
Recipient:dev <dev@xxxxxxxxxxxxxxxx>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi Timo and Shuyi,

Thanks for your feedback.

1. Scope
I agree with you that we should focus on the MVP DDL first.

2. Constraints
Yes, this can be a follow-up issue.

3. Sources/Sinks
If a TABLE has both read and write access requirements, should we declare it using `CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a TABLE t1 is first declared as read-only (as a source table) and, because of some new requirements, t1 later changes to a sink table, then we need to update both the DDL and the catalogs.
Furthermore, let's think about BATCH queries: updating one table in place can be a common case, e.g.,
```
CREATE TABLE t1 (
      col1 varchar,
      col2 int,
      col3 varchar
      ...
);

INSERT [OVERWRITE] TABLE t1
SELECT
      (some computing ...)
FROM t1;
```
So, let's forget these SOURCE/SINK keywords in DDL. For validation purposes, we can find other ways.

4. Time attributes
As Shuyi mentioned before, there exists an `org.apache.flink.table.sources.tsextractors.TimestampExtractor` for custom-defined time attributes, but this expression-based class is friendlier to the Table API than to SQL.
```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.sources.FieldComputer

/**
  * Provides an expression to extract the timestamp for a rowtime attribute.
  */
abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {

  /** Timestamp extractors compute the timestamp as Long. */
  override def getReturnType: TypeInformation[Long] =
    Types.LONG.asInstanceOf[TypeInformation[Long]]
}
```
BTW, I think both the scalar function and the TimestampExtractor express computing logic; the TimestampExtractor has no further advantage in SQL scenarios.

6. Partitioning and keys
The primary key is included in the constraint part, and partitioned table support can be another topic later.

5. Schema declaration
I agree with you that we can do better schema derivation for user convenience, but this does not conflict with the syntax. Table properties can carry any useful information for both the users and the framework. I like your `contract name` proposal, e.g., `WITH (format.type = avro)`: the framework can recognize some `contract name`s like `format.type`, `connector.type`, etc. Also, deriving the table schema from an existing schema file can be handy, especially for a table with many columns.
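As an illustration of `contract name` recognition, the sketch below parses the body of a `WITH (...)` clause into a flat map and separates the keys the framework would interpret itself from those passed through to connectors and formats. The parser and the `CONTRACT_KEYS` set are hypothetical, not part of Flink.

```python
import re
from typing import Dict, Tuple

# Hypothetical "contract names" that the framework interprets itself; all
# other keys are passed through untouched to the connector or format.
CONTRACT_KEYS = {"connector.type", "format.type", "format.schema-file"}

# key = value, where value is 'single-quoted', "double-quoted" or a bare word,
# followed by a comma or the end of the input.
_PAIR = re.compile(
    r"""\s*([\w.\-]+)\s*=\s*(?:'([^']*)'|"([^"]*)"|([^,\s]+))\s*(?:,|$)"""
)


def parse_with_clause(body: str) -> Dict[str, str]:
    """Parse the text inside WITH ( ... ) into a flat key -> value map."""
    props: Dict[str, str] = {}
    body = body.strip()
    pos = 0
    while pos < len(body):
        m = _PAIR.match(body, pos)
        if m is None:
            raise ValueError(f"cannot parse properties near: {body[pos:]!r}")
        value = next(g for g in m.group(2, 3, 4) if g is not None)
        props[m.group(1)] = value
        pos = m.end()
    return props


def split_contract(props: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Separate framework-level contract keys from pass-through keys."""
    contract = {k: v for k, v in props.items() if k in CONTRACT_KEYS}
    passthrough = {k: v for k, v in props.items() if k not in CONTRACT_KEYS}
    return contract, passthrough
```

The design choice here is that unknown keys are never rejected; only the recognized contract keys get framework-level meaning.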

Regards,
Lin


On Wed, Dec 5, 2018 at 10:40 PM, Timo Walther <twalthr@xxxxxxxxxx> wrote:

Hi Jark and Shuyi,

Thanks for pushing the DDL efforts forward. I agree that we should aim to combine both Shuyi's design and your design.

Here are a couple of concerns that I think we should address in the design:
1. Scope: Let's focus on an MVP DDL for CREATE TABLE statements first. I think this topic already has enough potential for long discussions and is very helpful for users. We can discuss CREATE VIEW and CREATE FUNCTION afterwards, as they are not related to each other.

2. Constraints: I think we should consider things like nullability, VARCHAR length, and decimal scale and precision in the future, as they allow for nice optimizations. However, since neither the translation nor the runtime operators support those features, I would not introduce an arbitrary default value but omit those parameters for now. This can be a follow-up issue once the basic DDL has been merged.

3. Sources/Sinks: We had a discussion about CREATE TABLE vs. CREATE [SOURCE|SINK|] TABLE before. In my opinion, we should allow for these explicit declarations because in most production scenarios, teams have strict read/write access requirements. For example, a data science team should only consume from an event Kafka topic but should not accidentally write back to the single source of truth.

4. Time attributes: In general, I like your computed columns approach because it makes defining rowtime attributes transparent and simple. However, there are downsides that we should discuss.
4a. Jark's current design means that timestamps are in the schema twice. The design that is mentioned in [1] makes this more flexible, as it allows either replacing an existing column or adding a computed column.
4b. We need to consider the zoo of storage systems that are out there right now. Take Kafka as an example: how can we write out a timestamp into the message header? We need to think of a reverse operation to a computed column.
4c. Does defining a watermark really fit into the schema part of a table? Shouldn't we separate all time attribute concerns into a special clause next to the regular schema, similar to how PARTITIONED BY does it in Hive?
4d. How can people come up with a custom watermark strategy? I guess this cannot be implemented in a scalar function and would require some new type of UDF?

6. Partitioning and keys: Another question that the DDL design should answer is how we express primary keys (for upserts) and partitioning keys (for Hive, Kafka message keys). Are they all part of the table schema?
5. Schema declaration: I find it very annoying that we want to force people to declare all columns and types again, even though this is usually already defined in some company-wide format. I know that catalog support will greatly improve this. But if no catalog is used, people need to manually define a schema with 50+ fields in a Flink DDL. What I actually promoted is having two ways of reading data:

1. Either the format derives its schema from the table schema.
CREATE TABLE (col INT) WITH (format.type = avro)

2. Or the table schema can be omitted and the format schema defines the table schema (+ time attributes).
CREATE TABLE WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")

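Option 2 could look roughly like this Python sketch, which derives columns from an Avro record schema (.avsc). The type mapping is an assumption that covers only primitive types and `["null", T]` unions; logical types, nested records and time attributes are left out.

```python
import json
from typing import List, Tuple

# Assumed mapping from Avro primitive types to the SQL type names used in this
# thread; logical types, records, arrays and maps are not handled.
AVRO_TO_SQL = {
    "boolean": "BOOLEAN",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "bytes": "VARBINARY",
    "string": "VARCHAR",
}


def derive_table_schema(avsc_text: str) -> List[Tuple[str, str]]:
    """Derive (column, SQL type) pairs from an Avro record schema."""
    schema = json.loads(avsc_text)
    if schema.get("type") != "record":
        raise ValueError("top-level Avro schema must be a record")
    columns: List[Tuple[str, str]] = []
    for field in schema["fields"]:
        avro_type = field["type"]
        nullable = False
        # A ["null", T] union encodes an optional field of type T.
        if isinstance(avro_type, list):
            non_null = [t for t in avro_type if t != "null"]
            if len(non_null) != 1:
                raise ValueError(f"unsupported union for {field['name']}")
            nullable = len(non_null) < len(avro_type)
            avro_type = non_null[0]
        if not isinstance(avro_type, str) or avro_type not in AVRO_TO_SQL:
            raise ValueError(f"unsupported type for {field['name']}: {avro_type}")
        sql_type = AVRO_TO_SQL[avro_type] + ("" if nullable else " NOT NULL")
        columns.append((field["name"], sql_type))
    return columns
```

Since plain Avro types are non-nullable, they map to `NOT NULL` columns, which matches the `columnName dataType [ NOT NULL ]` form used elsewhere in this thread.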
Please let me know what you think about each item. I will try to incorporate your feedback in [1] this week.

Regards,
Timo

[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf
On 05.12.18 at 13:01, Jark Wu wrote:
Hi Shuyi,

It's exciting to see that we can make such great progress here. Regarding the watermark:

Watermarks can be defined on any column (including computed columns) in the table schema. The computed column can be computed from existing columns using built-in functions and *UserDefinedFunctions* (ScalarFunction). So IMO, it can work for almost all scenarios, not only common ones.

I don't think using a `TimestampExtractor` to support custom timestamp extraction in SQL is a good idea, because `TimestampExtractor` is not a SQL standard function. If we support `TimestampExtractor` in SQL, do we need to support CREATE FUNCTION for `TimestampExtractor`? I think `ScalarFunction` can do the same thing as `TimestampExtractor`, but it is more powerful and standard.

The core idea of the watermark definition syntax is that the schema part defines all the columns of the table; it is exactly what the query sees. The watermark part is something like a primary key definition or constraint on a SQL table: it has no side effect on the schema and only defines what the watermark strategy is and which field is the rowtime attribute field. If the rowtime field is not among the existing fields, we can use a computed column to generate it from other existing fields. The Descriptor Pattern API [1] is very useful when writing a Table API job, but from my perspective it does not contradict the watermark DDL.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes

Best,
Jark

On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <suez1224@xxxxxxxxx> wrote:
Hi Jark and Shaoxuan,

Thanks a lot for the summary. I think we are making great progress here. Below are my thoughts.

*(1) watermark definition
IMO, it's better to keep it consistent with the rowtime extractors and watermark strategies defined in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
Using built-in functions seems to be too much for most of the common scenarios.
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
Actually, I think we can put the source/sink type info into the table properties, so we can use CREATE TABLE.
(3) View DDL with properties
We can remove the view properties section now for the MVP and add it back later if needed.
(4) Type Definition
I agree we can put the type length or precision into future versions. As for the grammar difference, I am currently using the grammar in Calcite type DDL, but since we'll extend the parser in Flink, we can definitely change it if needed.

Shuyi

On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <imjark@xxxxxxxxx> wrote:
Hi Shaoxuan,

Thanks for pointing that out. Yes, the source/sink tag on CREATE TABLE is another major difference.

To summarize the main differences again:

*(1) watermark definition
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
(3) View DDL with properties
(4) Type Definition

Best,
Jark

On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <wshaoxuan@xxxxxxxxx> wrote:
Hi Jark,
Thanks for the summary. Your plan for the first-round implementation of DDL looks good to me.
Have we reached agreement on simplifying/unifying "create [source/sink] table" to "create table"? From my point of view, "watermark definition" and "create table" are the major obstacles on the way to merging the two design proposals. @Shuyi, it would be great if you could spend time and respond to these two parts first.
Regards,
Shaoxuan


On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <imjark@xxxxxxxxx> wrote:
Hi Shuyi,

It seems that you have reviewed the DDL doc [1] that Lin and I drafted. This doc covers all the features running in Alibaba, but some of the features might not be needed in the first version of Flink SQL DDL.

So my suggestion would be to focus on the MVP DDLs and reach agreement ASAP based on the DDL draft [1] and the DDL design [2] Shuyi proposed. Then we can discuss the main differences one by one.

In my opinion, the following MVP DDLs should be included in the first version (feedback is welcome):
(1) Table DDL:
         (1.1) Type definition
         (1.2) computed column definition
         (1.3) watermark definition
         (1.4) with properties
         (1.5) table constraint (primary key/unique)
         (1.6) column nullability (nice to have)
(2) View DDL
(3) Function DDL

The main differences between the two DDL docs (I may have missed something; feel free to point it out):
*(1.3) watermark*: this is the main and most important difference; it would be great if @Timo Walther <twalthr@xxxxxxxxxx> and @Fabian Hueske <fhueske@xxxxxxxxx> could give some feedback.
     (1.1) Type definition:
          (a) Should VARCHAR carry a length, e.g. VARCHAR(128)?
               In most cases, the VARCHAR length is not used, because values are stored as String in Flink. But it can be used for optimization in the future if we know the column is a fixed-length VARCHAR.
               So IMO, we can support VARCHAR with a length in the future, and just VARCHAR in this version.
          (b) Should DECIMAL support custom scale and precision, e.g. DECIMAL(12, 5)?
               If we clearly know the scale and precision of the Decimal, we can do some optimization on serialization/deserialization. IMO, we can support just DECIMAL in this version, which means DECIMAL(38, 18) by default, and support custom scale and precision in the future.
     (2) View DDL: Do we need WITH properties in View DDL (proposed in doc [2])? What are the properties on the view used for?
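To make the serialization argument concrete, here is a Python sketch assuming a simple rule: when the declared precision is at most 18, the unscaled value fits in a signed 64-bit integer and can be stored in fixed width; otherwise a variable-length form is needed. The encoding labels are hypothetical and not Flink's actual format.

```python
from decimal import Decimal, localcontext
from typing import Tuple, Union

# 10**18 - 1 < 2**63 - 1, so any unscaled value with at most 18 digits fits
# in a signed 64-bit integer.
MAX_COMPACT_PRECISION = 18


def encode_decimal(value: Decimal, precision: int, scale: int) -> Tuple[str, Union[int, str]]:
    """Encode a value declared as DECIMAL(precision, scale).

    Returns ("long", unscaled_int) when the precision allows a fixed-width
    64-bit encoding, else ("string", text). Raises ValueError if the value
    does not fit the declared type.
    """
    with localcontext() as ctx:
        ctx.prec = 100  # avoid rounding while shifting the decimal point
        unscaled = value.scaleb(scale)
        if unscaled != unscaled.to_integral_value():
            raise ValueError(f"{value} has more than {scale} fractional digits")
    unscaled_int = int(unscaled)
    if len(str(abs(unscaled_int))) > precision:
        raise ValueError(f"{value} does not fit DECIMAL({precision}, {scale})")
    if precision <= MAX_COMPACT_PRECISION:
        return ("long", unscaled_int)
    return ("string", str(value))
```

Under this rule, `123.45` declared as DECIMAL(12, 5) is stored as the 64-bit integer 12345000, while the same kind of value under the DECIMAL(38, 18) default falls back to the variable-length form.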


The following features could be supported and discussed in the future:
(1) period definition on table
(2) Type DDL
(3) Index DDL
(4) Library DDL
(5) Drop statement

[1] Flink DDL draft by Lin and Jark:
https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit#
[2] Flink SQL DDL design by Shuyi:
https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit#
Cheers,
Jark

On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang <wshaoxuan@xxxxxxxxx> wrote:
Sure Shuyi,
What I hope is that we can reach an agreement on the DDL grammar as soon as possible. There are a few differences between your proposal and ours. Once Lin and Jark propose our design, we can quickly discuss those differences and see how far we are from a unified design.
WRT the external catalog, I think it is an orthogonal topic and we can design it in parallel. I believe @Xuefu and @Bowen are already working on it. We should/will definitely involve them in reviewing the final design of the DDL implementation. I would suggest that we give the DDL implementation a higher priority, as it is a crucial component for the user experience of the SQL_CLI.

Regards,
Shaoxuan



On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen <suez1224@xxxxxxxxx> wrote:
Thanks a lot, Shaoxuan, Jark and Lin. We should definitely collaborate here; we also have our own DDL implementation, which has been running in production at Uber for almost 2 years. With the joint experience from both companies, we can definitely make the Flink SQL DDL better.

As @shaoxuan suggested, Jark can come up with a doc that describes the current DDL design in Alibaba, and we can discuss and merge the two into one, make it a FLIP, and plan the tasks for implementation. Also, we should take the new external catalog effort into account in the design. What do you guys think?

Shuyi

On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <imjark@xxxxxxxxx> wrote:
Hi Shaoxuan,

I think summarizing it into a Google doc is a good idea. We will prepare it in the next few days.

Thanks,
Jark

On Wed, Nov 28, 2018 at 9:17 PM, Shaoxuan Wang <wshaoxuan@xxxxxxxxx> wrote:
Hi Lin and Jark,
Thanks for sharing those details. Can you please consider summarizing your DDL design in a Google doc?
We can still continue the discussions on Shuyi's proposal, but having a separate Google doc will make it easy for the devs to understand/comment on/discuss your proposed DDL implementation.

Regards,
Shaoxuan


On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <imjark@xxxxxxxxx> wrote:
Hi Shuyi,

Thanks for bringing up this discussion and for the awesome work! I have left some comments in the doc.

I want to share a bit more about what we learned about the watermark definition at Alibaba.

        1. A table should be able to accept multiple watermark definitions, because a table may have more than one rowtime field. For example, one rowtime field comes from an existing field but is missing in some records, while another is the ingestion timestamp in Kafka, which is not very accurate. In this case, users may define two rowtime fields with watermarks in the table and choose one depending on the situation.
        2. A watermark strategy always works together with a rowtime field.

Based on the two points mentioned above, I think we should combine the watermark strategy and the rowtime field selection (i.e. which existing field is used to generate the watermark) in one clause, so that we can define multiple watermarks in one table.

Here I will share the watermark syntax used in Alibaba (slightly modified):

watermarkDefinition:
    WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy

wm_strategy:
      BOUNDED WITH OFFSET 'string' timeUnit
    | ASCENDING

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword defines which existing field is used to generate the watermark; this field should already exist in the schema (we can use a computed column to derive it from other fields). The “AS” keyword defines the watermark strategy, such as BOUNDED WITH OFFSET (which covers almost all requirements) and ASCENDING.
When the expected rowtime field does not exist in the schema, we can use the computed-column syntax to derive it from other existing fields using built-in functions or user-defined functions. So the rowtime/watermark definition doesn't need to care about the “field-change” strategy (replace/add/from-field). The proctime field can also be defined using a computed column, such as pt AS PROCTIME(), which defines a proctime field named “pt” in the schema.
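A Python sketch of the two strategies, assuming periodic semantics in which the watermark trails the largest timestamp seen so far; treating `ASCENDING` as an offset of 1 ms is an assumption for illustration. The class and function names are hypothetical.

```python
from typing import Iterable, List, Optional


class BoundedWithOffset:
    """Sketch of `BOUNDED WITH OFFSET`: watermark = max timestamp seen - offset."""

    def __init__(self, offset_ms: int) -> None:
        self.offset_ms = offset_ms
        self.max_ts: Optional[int] = None

    def on_event(self, ts_ms: int) -> None:
        self.max_ts = ts_ms if self.max_ts is None else max(self.max_ts, ts_ms)

    def current_watermark(self) -> Optional[int]:
        return None if self.max_ts is None else self.max_ts - self.offset_ms


class Ascending(BoundedWithOffset):
    """Sketch of `ASCENDING`: timestamps never decrease, so only 1 ms is held back."""

    def __init__(self) -> None:
        super().__init__(offset_ms=1)


def watermarks_after_each_event(strategy: BoundedWithOffset,
                                timestamps: Iterable[int]) -> List[Optional[int]]:
    """Feed timestamps to a strategy and record the watermark after each one."""
    out: List[Optional[int]] = []
    for ts in timestamps:
        strategy.on_event(ts)
        out.append(strategy.current_watermark())
    return out
```

With a 5 s offset, the timestamps 10000, 8000, 16000 give watermarks 5000, 5000, 11000: the late event does not move the watermark back. Because each instance keeps its own state, a table with several rowtime fields could hold one named strategy per field, which is the multiple-watermark case described above.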

Looking forward to working with you guys!

Best,
Jark Wu


On Wed, Nov 28, 2018 at 6:33 PM, Lin Li <lincoln.86xy@xxxxxxxxx> wrote:
@Shuyi
Thanks for the proposal! We have a simple DDL implementation (extending Calcite's parser) which has been running in production for almost two years and works well.
I think the most valuable things we've learned are keeping it simple and standard compliant.
Here's the approximate grammar, FYI:
CREATE TABLE

CREATE TABLE tableName (
    columnDefinition [, columnDefinition]*
    [ computedColumnDefinition [, computedColumnDefinition]* ]
    [ tableConstraint [, tableConstraint]* ]
    [ tableIndex [, tableIndex]* ]
    [ PERIOD FOR SYSTEM_TIME ]
    [ WATERMARK watermarkName FOR rowTimeColumn AS withOffset(rowTimeColumn, offset) ]
) [ WITH ( tableOption [, tableOption]* ) ] [ ; ]

columnDefinition ::=
    columnName dataType [ NOT NULL ]

dataType ::=
    {
      [ VARCHAR ]
      | [ BOOLEAN ]
      | [ TINYINT ]
      | [ SMALLINT ]
      | [ INT ]
      | [ BIGINT ]
      | [ FLOAT ]
      | [ DECIMAL ]
      | [ DOUBLE ]
      | [ DATE ]
      | [ TIME ]
      | [ TIMESTAMP ]
      | [ VARBINARY ]
    }

computedColumnDefinition ::=
    columnName AS computedColumnExpression

tableConstraint ::=
    { PRIMARY KEY | UNIQUE } (columnName [, columnName]* )

tableIndex ::=
    [ UNIQUE ] INDEX indexName (columnName [, columnName]* )

rowTimeColumn ::=
    columnName

tableOption ::=
    property=value

offset ::=
    positive integer (unit: ms)

CREATE VIEW

CREATE VIEW viewName
    [ ( columnName [, columnName]* ) ]
    AS queryStatement;

CREATE FUNCTION

CREATE FUNCTION functionName
    AS 'className';

className ::=
    fully qualified name


On Wed, Nov 28, 2018 at 3:28 AM, Shuyi Chen <suez1224@xxxxxxxxx> wrote:
Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc first and start the implementation without the unified connector API being ready, by skipping some features.

Xuefu, I like the idea of turning Flink-specific properties into generic key-value pairs, so that integration with Hive DDL (or others, e.g. Beam DDL) will be easier.

I'll run a final pass over the design doc and finalize the design in the next few days. Then we can start creating tasks and collaborate on the implementation. Thanks a lot for all the comments and inputs.
Cheers!
Shuyi

On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx> wrote:

Yeah! I agree with Timo that the DDL can actually proceed without being blocked by the connector API. We can leave the unknowns out while defining the basic syntax.
@Shuyi

As commented in the doc, I think we can probably stick with a simple syntax with general properties, without extending the syntax so much that it mimics the descriptor API.

Part of our effort on Flink-Hive integration is also to make the DDL syntax compatible with Hive's. The one in the current proposal seems to make our effort more challenging.

We can help and collaborate. At this moment, I think we can finalize the proposal and then divide the tasks for better collaboration.
Please let me know if there are any questions or suggestions.
Thanks,
Xuefu





------------------------------------------------------------------
Sender:Timo Walther <twalthr@xxxxxxxxxx>
Sent at:2018 Nov 27 (Tue) 16:21
Recipient:dev <dev@xxxxxxxxxxxxxxxx>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Thanks for offering your help here, Xuefu. It would be great to move these efforts forward. I agree that the DDL is somewhat related to the unified connector API design, but we can also start with the basic functionality now and evolve the DDL during this release and the next releases.
For example, we could identify an MVP DDL syntax that skips defining key constraints and maybe even time attributes. This DDL could be used for batch use cases, ETL, and materializing SQL queries (no time operations like windows).

The unified connector API is high on our priority list for the 1.8 release. I will try to update the document by the middle of next week.
Regards,

Timo


On 27.11.18 at 08:08, Shuyi Chen wrote:
Thanks a lot, Xuefu. I was busy with some other stuff for the last 2 weeks, but we are definitely interested in moving this forward. I think once the unified connector API design [1] is done, we can finalize the DDL design as well and start creating concrete subtasks to collaborate on the implementation with the community.

Shuyi

[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx> wrote:

Hi Shuyi,

I'm wondering if you folks still have the bandwidth to work on this. We have some dedicated resources and would like to move this forward. We can collaborate.

Thanks,

Xuefu



------------------------------------------------------------------
Sender:wenlong.lwl <wenlong88.lwl@xxxxxxxxx>
Sent at:2018 Nov 05 11:15:35
Recipient:<dev@xxxxxxxxxxxxxxxx>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi Shuyi, thanks for the proposal.

I have two concerns about the table DDL:

1. How about removing the source/sink mark from the DDL? It is not necessary, because the framework can determine whether the referenced table is a source or a sink according to the context of the query using the table. It will be more convenient for users to define a table which can be both a source and a sink, and more convenient for the catalog to persist and manage the meta info.
2. How about just keeping one pure string map as the parameters for a table, like
create table Kafka10SourceTable (
  intField INTEGER,
  stringField VARCHAR(128),
  longField BIGINT,
  rowTimeField TIMESTAMP
) with (
  connector.type = 'kafka',
  connector.property-version = '1',
  connector.version = '0.10',
  connector.properties.topic = 'test-kafka-topic',
  connector.properties.startup-mode = 'latest-offset',
  connector.properties.specific-offset = 'offset',
  format.type = 'json',
  format.properties.version = '1',
  format.derive-schema = 'true'
);
Because:
1. In TableFactory, what users work with is a string map of properties, so defining parameters as a string map is the closest way to mirror how users use the parameters.
2. The table descriptor can be extended by users, as is done for Kafka and JSON. This means that the parameter keys of a connector or format can differ between implementations, and we cannot restrict the keys to a specified set, so we would need a map in the connector scope and a map in the connector.properties scope. Why not just give users a single map and let them put parameters in whatever format they like? This is also the simplest way to implement the DDL parser.
3. Whether we can define a format clause or not depends on the implementation of the connector. Using different clauses in the DDL may create the misunderstanding that we can combine connectors with arbitrary formats, which may not actually work.
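Point 2 can be sketched in Python: a single flat string map is split into scopes by its first key segment, without validating keys against a fixed set. `group_by_scope` is a hypothetical helper for illustration, not TableFactory code.

```python
from collections import defaultdict
from typing import Dict


def group_by_scope(props: Dict[str, str]) -> Dict[str, Dict[str, str]]:
    """Split a single flat property map into scopes by the first key segment.

    'connector.properties.topic' -> scope 'connector', key 'properties.topic'.
    Keys are not checked against a fixed set, so connector- or format-specific
    keys pass through unchanged, as the single-map proposal intends.
    """
    scopes: Dict[str, Dict[str, str]] = defaultdict(dict)
    for key, value in props.items():
        scope, sep, rest = key.partition(".")
        if not sep or not rest:
            raise ValueError(f"property key needs a scope prefix: {key!r}")
        scopes[scope][rest] = value
    return dict(scopes)
```

Applied to the properties of the `Kafka10SourceTable` example, this yields one `connector` map and one `format` map, which is all a factory would need to look up its own keys.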

On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <wossyn@xxxxxxxxx> wrote:
+1, thanks for the proposal.

I guess this is a long-awaited change. This can vastly increase the functionality of the SQL Client, as it will be possible to use complex extensions like, for example, those provided by Apache Bahir [1].
Best Regards,
Dom.

[1] https://github.com/apache/bahir-flink

On Sat, 3 Nov 2018 at 17:17, Rong Rong <walterddr@xxxxxxxxx> wrote:
+1. Thanks for putting the proposal together, Shuyi.
DDL has been brought up a couple of times previously [1,2]. Utilizing DDL will definitely be a great extension to the current Flink SQL to systematically support some of the previously brought-up features such as [3]. It will also be beneficial to see the document closely aligned with the previous discussion on the unified SQL connector API [4].
I also left a few comments on the doc. Looking forward to the alignment with the other couple of efforts and to contributing to them!
Best,
Rong

[1] http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
[2] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
[3] https://issues.apache.org/jira/browse/FLINK-8003
[4] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3C6676cb66-6f31-23e1-eff5-2e9c19f88483@xxxxxxxxxx%3E
On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <bowenli86@xxxxxxxxx> wrote:
Thanks Shuyi!

I left some comments there. I think the design of the SQL DDL and the Flink-Hive integration/external catalog enhancements will work closely with each other. I hope we are well aligned on the directions of the two designs, and I look forward to working with you guys on both!
Bowen


On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen <suez1224@xxxxxxxxx> wrote:
Hi everyone,

SQL DDL support has been a long-time ask from the community. Flink SQL currently supports only DML (e.g. SELECT and INSERT statements). In its current form, Flink SQL users still need to define/create table sources and sinks programmatically in Java/Scala. Also, in the SQL Client, without DDL support, the current implementation does not allow dynamic creation of tables, types or functions with SQL; this adds friction to its adoption.

I drafted a design doc [1] with a few other community members that proposes the design and implementation for adding DDL support in Flink. The initial design considers DDL for table, view, type, library and function. It would be great to get feedback on the design from the community, and to align with the latest efforts on the unified SQL connector API [2] and Flink Hive integration [3].

Any feedback is highly appreciated.

Thanks
Shuyi Chen

[1] https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
[2] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
[3] https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
--
"So you have to trust that the dots will somehow connect in your future."