osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Flink SQL DDL Design


Hi all,

I think we are making good progress. Thanks for all the feedback so far.

3. Sources/Sinks:
It seems that I can not find supporters for explicit SOURCE/SINK declaration so I'm fine with not using those keywords. @Fabian: Maybe we don't haven have to change the TableFactory interface but just provide some helper functions in the TableFactoryService. This would solve the availability problem, but the permission problem would still not be solved. If you are fine with it, we could introduce a property instead?

5. Schema declaration:
@Lin: We should find an agreement on this as it requires changes to the TableFactory interface. We should minimize changes to this interface because it is user-facing. Especially, if format schema and table schema differ, the need for such a functionality is very important. Our goal is to connect to existing infrastructure. For example, if we are using Avro and the existing Avro format has enums but Flink SQL does not support enums, it would be helpful to let the Avro format derive a table schema. Otherwise your need to declare both schemas which leads to CREATE TABLE statements of 400 lines+.
I think the mentioned query:
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc") is fine and should only be valid if the schema contains no non-computed columns.

7. Table Update Mode:
After thinking about it again, I agree. The mode of the sinks can be derived from the query and the existence of a PRIMARY KEY declaration. But Fabian raised a very good point. How do we deal with sources? Shall we introduce a new keywords similar to WATERMARKS such that a upsert/retract flag is not part of the visible schema?

4a. How to mark a field as attribute?
@Jark: Thanks for the explanation of the WATERMARK clause semantics. This is a nice way of marking existing fields. This sounds good to me.

4c) WATERMARK as constraint
I'm fine with leaving the WATERMARK clause in the schema definition.

4d) Custom watermark strategies:
I would already think about custom watermark strategies as the current descriptor design already supports this. ScalarFunction's don't work as a PeriodicWatermarkAssigner has different semantics. Why not simply entering the a full class name here as it is done in the current design?

4.b) Ingesting and writing timestamps to systems (like Kafka)
@Fabian: Yes, your suggestion sounds good to me. This behavior would be similar to our current `timestamps: from-source` design.

Once our discussion has found a conclusion, I would like to volunteer and summarize the outcome of this mailing thread. It nicely aligns with the update work on the connector improvements document (that I wanted to do anyway) and the ongoing external catalog discussion. Furthermore, I would also want to propose how to change existing interfaces by keeping the DDL, connector improvements, and external catalog support in mind. Would that be ok for you?

Thanks,
Timo



Am 07.12.18 um 14:48 schrieb Fabian Hueske:
Hi all,

Thanks for the discussion.
I'd like to share my point of view as well.

4) Event-Time Attributes and Watermarks:
4.a) I agree with Lin and Jark's proposal. Declaring a watermark on an
attribute declares it as an event-time attribute.
4.b) Ingesting and writing timestamps to systems (like Kafka). We could use
a special function like (ts AS SYSTEMROWTIME()). This function will
indicate that we read the timestamp directly from the system (and not the
data). We can also write the field back to the system when emitting the
table (violating the rule that computed fields are not emitted).
4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY
constraint and therefore keep it in the schema definition.
4d) For custom watermark strategies, a simple expressions or
ScalarFunctions won't be sufficient. Sophisticated approaches could collect
histograms, etc. But I think we can leave that out for later.

3) SOURCE / SINK / BOTH
As you said, there are two things to consider here: permission and
availability of a TableSource/TableSink.
I think that neither should be a reason to add a keyword at such a
sensitive position.
However, I also see Timo's point that it would be good to know up-front how
a table can be used without trying to instantiate a TableSource/Sink for a
query.
Maybe we can extend the TableFactory such that it provides information
about which sources/sinks it can provide.

7. Table Update Mode
Something that we definitely need to consider is how tables are ingested,
i.e., append, retract or upsert.
Especially, since upsert and retraction need a meta-data column that
indicates whether an event is an insert (or upsert) or a delete change.
This column needs to be identified somehow, most likely as part of the
input format. Ideally, this column should not be part of the table schema
(as it would be always true).
Emitting tables is not so much of an issue as the properties of the table
tell use what to do (append-only/update, unique key y/n).

Best,
Fabian


Am Fr., 7. Dez. 2018 um 10:39 Uhr schrieb Jark Wu <imjark@xxxxxxxxx>:

Hi Timo,

Thanks for your quickly feedback! Here are some of my thoughts:

Append, upserts, retract mode on sinks is also a very complex problem. I
think append/upserts/retract is the ability of a table, user do not need to
specify a table is used for append or retraction or upsert. The query can
choose which mode the sink is. If an unbounded groupby is inserted into an
append sink (the sink only implements/supports append), an exception can be
thrown. A more complex problem is, if we want to write retractions/upserts
to Kafka, how to encode the change flag (add or retract/delete) on the
table? Maybe we should propose some protocal for the change flag encoding,
but I don't have a clear idea about this right now.

3. Sources/Sinks: The source/sink tag is similar to the
append/upsert/retract problem. Besides source/sink, actully we have stream
source, stream sink, batch source, batch sink, and the stream sink also
include append/upsert/retract three modes. Should we put all the tags on
the CREATE TABLE? IMO, the table's ability is defined by the table itself,
user do not need to specify it. If it is only a readable table, an
exception can be thrown when write to it. As the source/sink tag can be
omitted in CREATE TABLE, could we skip it and only support CREATE TABLE in
the first version, and add it back in the future when we really need it? It
keeps API compatible and make sure the MVP is what we consider clearly.

4a. How to mark a field as attribute?
The watermark definition includes two parts: use which field as time
attribute and use what generate strategy.
When we want to mark `ts` field as attribute: WATERMARK FOR `ts` AS OFFSET
'5' SECOND.
If we have a POJO{id, user, ts} field named "pojo", we can mark it like
this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND

4b. timestamp write to Kafka message header
Even though we can define multiple time attribute on a table, only one time
attribute can be actived/used in a query (in a stream). When we enable
`writeTiemstamp`, the only attribute actived in the stream will be write to
Kafka message header. What I mean the timestmap in StreamRecord is the time
attribute in the stream.

4c. Yes. We introduced the WATERMARK keyword similar to the INDEX, PRIMARY
KEY keywords.

@Timo, Do you have any other advice or questions on the watermark syntax ?
For example, the builtin strategy name: "BOUNDED WITH OFFSET" VS "OFFSET"
VS ...


Cheers,
Jark

On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.86xy@xxxxxxxxx> wrote:

Hi Timo,
Thanks for your feedback, here's some thoughts of mine:

3. Sources/Sinks:
"Let's assume an interactive CLI session, people should be able to list
all
source table and sink tables to know upfront if they can use an INSERT
INTO
here or not."
This requirement can be simply resolved by a document that list all
supported source/sink/both connectors and the sql-client can perform a
quick check. It's only an implementation choice, not necessary for the
syntax.
For connector implementation, a connector may implement one or some or
all
of the [Stream|Batch]Source/[Stream|Batch]Sink traits, we can derive the
availability for any give query without the SOURCE/SINk keywords or
specific table properties in WITH clause.
Since there's still indeterminacy, shall we skip these two keywords for
the
MVP DDL? We can make further discussion after users' feedback.

6. Partitioning and keys
Agree with you that raise the priority of table constraint and
partitioned
table support for better connectivity to Hive and Kafka. I'll add
partitioned table syntax(compatible to hive) into the DDL Draft doc
later[1].

5. Schema declaration
"if users want to declare computed columns they have a "schema"
constraints
but without columns
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
format.schema-file = "/my/avrofile.avsc") "

 From the point of my view, this ddl is invalid because the primary key
constraint already references two columns but types unseen.
And Xuefu pointed a important matching problem, so let's put schema
derivation as a follow-up extension ?




Timo Walther <twalthr@xxxxxxxxxx> 于2018年12月6日周四 下午6:05写道:

Hi everyone,

great to have such a lively discussion. My next batch of feedback:

@Jark: We don't need to align the descriptor approach with SQL. I'm
open
for different approaches as long as we can serve a broad set of use
cases and systems. The descriptor approach was a first attempt to cover
all aspects and connector/format characteristics. Just another example,
that is missing in the DDL design: How can a user decide if append,
retraction, or upserts should be used to sink data into the target
system? Do we want to define all these improtant properties in the big
WITH property map? If yes, we are already close to the descriptor
approach. Regarding the "standard way", most DDL languages have very
custom syntax so there is not a real "standard".

3. Sources/Sinks: @Lin: If a table has both read/write access it can be
created using a regular CREATE TABLE (omitting a specific source/sink)
declaration. Regarding the transition from source/sink to both, yes we
would need to update the a DDL and catalogs. But is this a problem? One
also needs to add new queries that use the tables. @Xuefu: It is not
only about security aspects. Especially for streaming use cases, not
every connector can be used as a source easily. For example, a JDBC
sink
is easier than a JDBC source. Let's assume an interactive CLI session,
people should be able to list all source table and sink tables to know
upfront if they can use an INSERT INTO here or not.

6. Partitioning and keys: @Lin: I would like to include this in the
design given that Hive integration and Kafka key support are in the
making/are on our roadmap for this release.

5. Schema declaration: @Lin: You are right it is not conflicting. I
just
wanted to raise the point because if users want to declare computed
columns they have a "schema" constraints but without columns. Are we ok
with a syntax like ...
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
format.schema-file = "/my/avrofile.avsc") ?
@Xuefu: Yes, you are right that an external schema might not excatly
match but this is true for both directions:
table schema "derives" format schema and format schema "derives" table
schema.

7. Hive compatibility: @Xuefu: I agree that Hive is popular but we
should not just adopt everything from Hive as there syntax is very
batch-specific. We should come up with a superset of historical and
future requirements. Supporting Hive queries can be an intermediate
layer on top of Flink's DDL.

4. Time attributes: @Lin: I'm fine with changing the TimestampExtractor
interface as this is also important for better separation of connector
and table module [1]. However, I'm wondering about watermark
generation.
4a. timestamps are in the schema twice:
@Jark: "existing field is Long/Timestamp, we can just use it as
rowtime": yes, but we need to mark a field as such an attribute. How
does the syntax for marking look like? Also in case of timestamps that
are nested in the schema?

4b. how can we write out a timestamp into the message header?:
I agree to simply ignore computed columns when writing out. This is
like
'field-change: add' that I mentioned in the improvements document.
@Jark: "then the timestmap in StreamRecord will be write to Kafka
message header": Unfortunately, there is no timestamp in the stream
record. Additionally, multiple time attributes can be in a schema. So
we
need a constraint that tells the sink which column to use (possibly
computed as well)?

4c. separate all time attribute concerns into a special clause next to
the regular schema?
@Jark: I don't have a strong opinion on this. I just have the feeling
that the "schema part" becomes quite messy because the actual schema
with types and fields is accompanied by so much metadata about
timestamps, watermarks, keys,... and we would need to introduce a new
WATERMARK keyword within a schema that was close to standard up to this
point.

Thanks everyone,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-9461



Am 06.12.18 um 07:08 schrieb Jark Wu:
Hi Timo,

Thank you for the valuable feedbacks.

First of all, I think we don't need to align the SQL functionality to
Descriptor. Because SQL is a more standard API, we should be as
cautious
as
possible to extend the SQL syntax. If something can be done in a
standard
way, we shouldn't introduce something new.

Here are some of my thoughts:

1. Scope: Agree.
2. Constraints: Agree.
4. Time attributes:
    4a. timestamps are in the schema twice.
     If an existing field is Long/Timestamp, we can just use it as
rowtime,
no twice defined. If it is not a Long/Timestamp, we use computed
column
to
get an expected timestamp column to be rowtime, is this what you mean
defined twice?  But I don't think it is a problem, but an advantages,
because it is easy to use, user do not need to consider whether to
"replace
the existing column" or "add a new column", he will not be confused
what's
the real schema is, what's the index of rowtime in the schema?
Regarding
to
the optimization, even if timestamps are in schema twice, when the
original
timestamp is never used in query, then the projection pushdown
optimization
can cut this field as early as possible, which is exactly the same as
"replacing the existing column" in runtime.

     4b. how can we write out a timestamp into the message header?
      That's a good point. I think computed column is just a virtual
column
on table which is only relative to reading. If we want to write to a
table
with computed column defined, we only need to provide the columns
except
computed columns (see SQL Server [1]). The computed column is ignored
in
the insert statement. Get back to the question, how can we write out
a
timestamp into the message header? IMO, we can provide a
configuration
to
support this, such as `kafka.writeTimestamp=true`, then the timestmap
in
StreamRecord will be write to Kafka message header. What do you
think?
      4c. separate all time attribute concerns into a special clause
next
to
the regular schema?
      Separating watermark into a special clause similar to
PARTITIONED
BY is
also a good idea. Conceptually, it's fine to put watermark in schema
part
or out schema part. But if we want to support multiple watermark
definition, maybe it would be better to put it in schema part. It is
similar to Index Definition that we can define several indexes on a
table
in schema part.

      4d. How can people come up with a custom watermark strategy?
      In most cases, the built-in strategy can works good. If we need
a
custom one, we can use a scalar function which restrict to only
return
a
nullable Long, and use it in SQL like: WATERMARK for rowtime AS
watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar
function
accepts 3 parameters and return a nullable Long which can be used as
punctuated watermark assigner. Another choice is implementing a class
extending the
`org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and
use
it
in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if
scalar function can cover the requirements here, I would prefer it
here,
because it keeps standard compliant. BTW, this feature is not in MVP,
we
can discuss it more depth in the future when we need it.

5. Schema declaration:
I like the proposal to omit the schema if we can get the schema from
external storage or something schema file. Actually, we have already
encountered this requirement in out company.


+1 to @Xuefu that we should be as close as possible to Hive syntax
while
keeping SQL ANSI standard. This will make it more acceptable and
reduce
the
learning cost for user.

[1]:

https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017
Best,
Jark

On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx>
wrote:
Hi Timo/Shuyi/Lin,

Thanks for the discussions. It seems that we are converging to
something
meaningful. Here are some of my thoughts:

1. +1 on MVP DDL
3. Markers for source or sink seem more about permissions on tables
that
belong to a security component. Unless the table is created
differently
based on source, sink, or both, it doesn't seem necessary to use
these
keywords to enforce permissions.
5. It might be okay if schema declaration is always needed. While
there
might be some duplication sometimes, it's not always true. For
example,
external schema may not be exactly matching Flink schema. For
instance,
data types. Even if so, perfect match is not required. For instance,
the
external schema file may evolve while table schema in Flink may stay
unchanged. A responsible reader should be able to scan the file
based
on
file schema and return the data based on table schema.

Other aspects:

7. Hive compatibility. Since Flink SQL will soon be able to operate
on
Hive metadata and data, it's an add-on benefit if we can be
compatible
with
Hive syntax/semantics while following ANSI standard. At least we
should
be
as close as possible. Hive DDL can found at
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

Thanks,
Xuefu



------------------------------------------------------------------
Sender:Lin Li <lincoln.86xy@xxxxxxxxx>
Sent at:2018 Dec 6 (Thu) 10:49
Recipient:dev <dev@xxxxxxxxxxxxxxxx>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi Timo and Shuyi,
    thanks for your feedback.

1. Scope
agree with you we should focus on the MVP DDL first.

2. Constraints
yes, this can be a follow-up issue.

3. Sources/Sinks
If a TABLE has both read/write access requirements, should we
declare
it
using
`CREATE [SOURCE_SINK|BOTH] TABLE tableName ...` ? A further
question,
if a
TABLE
t1 firstly declared as read only (as a source table), then for some
new
requirements
t1 will change to a sink table,  in this case we need updating both
the
DDL
and catalogs.
Further more, let's think about the BATCH query, update one table
in-place
can be a common case.
e.g.,
```
CREATE TABLE t1 (
    col1 varchar,
    col2 int,
    col3 varchar
    ...
);

INSERT [OVERWRITE] TABLE t1
AS
SELECT
    (some computing ...)
FROM t1;
```
So, let's forget these SOURCE/SINK keywords in DDL. For the
validation
purpose, we can find out other ways.

4. Time attributes
As Shuyi mentioned before, there exists an
`org.apache.flink.table.sources.tsextractors.TimestampExtractor` for
custom
defined time attributes usage, but this expression based class is
more
friendly for table api not the SQL.
```
/**
    * Provides the an expression to extract the timestamp for a
rowtime
attribute.
    */
abstract class TimestampExtractor extends FieldComputer[Long] with
Serializable {

    /** Timestamp extractors compute the timestamp as Long. */
    override def getReturnType: TypeInformation[Long] =
Types.LONG.asInstanceOf[TypeInformation[Long]]
}
```
BTW, I think both the Scalar function and the TimestampExtractor are
expressing computing logic, the TimestampExtractor has no more
advantage in
SQL scenarios.


6. Partitioning and keys
Primary Key is included in Constraint part, and partitioned table
support
can be another topic later.

5. Schema declaration
Agree with you that we can do better schema derivation for user
convenience, but this is not conflict with the syntax.
Table properties can carry any useful informations both for the
users
and
the framework, I like your `contract name` proposal,
e.g., `WITH (format.type = avro)`, the framework can recognize some
`contract name` like `format.type`, `connector.type` and etc.
And also derive the table schema from an existing schema file can be
handy
especially one with too many table columns.

Regards,
Lin


Timo Walther <twalthr@xxxxxxxxxx> 于2018年12月5日周三 下午10:40写道:

Hi Jark and Shuyi,

thanks for pushing the DDL efforts forward. I agree that we should
aim
to combine both Shuyi's design and your design.

Here are a couple of concerns that I think we should address in the
design:
1. Scope: Let's focuses on a MVP DDL for CREATE TABLE statements
first.
I think this topic has already enough potential for long
discussions
and
is very helpful for users. We can discuss CREATE VIEW and CREATE
FUNCTION afterwards as they are not related to each other.

2. Constraints: I think we should consider things like nullability,
VARCHAR length, and decimal scale and precision in the future as
they
allow for nice optimizations. However, since both the translation
and
runtime operators do not support those features. I would not
introduce
a
arbitrary default value but omit those parameters for now. This can
be
a
follow-up issue once the basic DDL has been merged.

3. Sources/Sinks: We had a discussion about CREATE TABLE vs CREATE
[SOURCE|SINK|] TABLE before. In my opinion we should allow for
these
explicit declaration because in most production scenarios, teams
have
strict read/write access requirements. For example, a data science
team
should only consume from a event Kafka topic but should not
accidently
write back to the single source of truth.

4. Time attributes: In general, I like your computed columns
approach
because it makes defining a rowtime attributes transparent and
simple.
However, there are downsides that we should discuss.
4a. Jarks current design means that timestamps are in the schema
twice.
The design that is mentioned in [1] makes this more flexible as it
either allows to replace an existing column or add a computed
column.
4b. We need to consider the zoo of storage systems that is out
there
right now. Take Kafka as an example, how can we write out a
timestamp
into the message header? We need to think of a reverse operation
to a
computed column.
4c. Does defining a watermark really fit into the schema part of a
table? Shouldn't we separate all time attribute concerns into a
special
clause next to the regular schema, similar how PARTITIONED BY does
it
in
Hive?
4d. How can people come up with a custom watermark strategy? I
guess
this can not be implemented in a scalar function and would require
some
new type of UDF?

6. Partitioning and keys: Another question that the DDL design
should
answer is how do we express primary keys (for upserts),
partitioning
keys (for Hive, Kafka message keys). All part of the table schema?

5. Schema declaration: I find it very annoying that we want to
force
people to declare all columns and types again even though this is
usually already defined in some company-wide format. I know that
catalog
support will greatly improve this. But if no catalog is used,
people
need to manually define a schema with 50+ fields in a Flink DDL.
What I
actually promoted having two ways of reading data:

1. Either the format derives its schema from the table schema.
CREATE TABLE (col INT) WITH (format.type = avro)

2. Or the table schema can be omitted and the format schema defines
the
table schema (+ time attributes).
CREATE TABLE WITH (format.type = avro, format.schema-file =
"/my/avrofile.avsc")

Please let me know what you think about each item. I will try to
incorporate your feedback in [1] this week.

Regards,
Timo

[1]


https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf
Am 05.12.18 um 13:01 schrieb Jark Wu:
Hi Shuyi,

It's exciting to see we can make such a great progress here.

Regarding to the watermark:

Watermarks can be defined on any columns (including
computed-column)
in
table schema.
The computed column can be computed from existing columns using
builtin
functions and *UserDefinedFunctions* (ScalarFunction).
So IMO, it can work out for almost all the scenarios not only
common
scenarios.

I don't think using a `TimestampExtractor` to support custom
timestamp
extractor in SQL is a good idea. Because `TimestampExtractor`
is not a SQL standard function. If we support `TimestampExtractor`
in
SQL,
do we need to support CREATE FUNCTION for `TimestampExtractor`?
I think `ScalarFunction` can do the same thing with
`TimestampExtractor`
but more powerful and standard.

The core idea of the watermark definition syntax is that the
schema
part
defines all the columns of the table, it is exactly what the query
sees.
The watermark part is something like a primary key definition or
constraint
on SQL Table, it has no side effect on the schema, only defines
what
watermark strategy is and makes which field as the rowtime
attribute
field.
If the rowtime field is not in the existing fields, we can use
computed
column
to generate it from other existing fields. The Descriptor Pattern
API
[1]
is very useful when writing a Table API job, but is not
contradictory
to
the
Watermark DDL from my perspective.

[1]:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
.

Best,
Jark

On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <suez1224@xxxxxxxxx>
wrote:
Hi Jark and Shaoxuan,

Thanks a lot for the summary. I think we are making great
progress
here.
Below are my thoughts.

*(1) watermark definition
IMO, it's better to keep it consistent with the rowtime
extractors
and
watermark strategies defined in


https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
.
Using built-in functions seems to be too much for most of the
common
scenarios.
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
Actually, I think we can put the source/sink type info into the
table
properties, so we can use CREATE TABLE.
(3) View DDL with properties
We can remove the view properties section now for the MVP and add
it
back
later if needed.
(4) Type Definition
I agree we can put the type length or precision into future
versions.
As
for the grammar difference, currently, I am using the grammar in
Calcite
type DDL, but since we'll extend the parser in Flink, so we can
definitely
change if needed.

Shuyi

On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <imjark@xxxxxxxxx>
wrote:
Hi Shaoxuan,

Thanks for pointing that out. Yes, the source/sink tag on create
table
is
the another major difference.

Summarize the main differences again:

*(1) watermark definition
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
(3) View DDL with properties
(4) Type Definition

Best,
Jark

On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <wshaoxuan@xxxxxxxxx
wrote:
Hi Jark,
Thanks for the summary. Your plan for the 1st round
implementation
of
DDL
looks good to me.
Have we reached the agreement on simplifying/unifying "create
[source/sink]
table" to "create table"? "Watermark definition" and "create
table"
are
the
major obstacles on the way to merge two design proposals FMPOV.
@Shuyi,
It
would be great if you can spend time and respond to these two
parts
first.
Regards,
Shaoxuan


On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <imjark@xxxxxxxxx>
wrote:
Hi Shuyi,

It seems that you have reviewed the DDL doc [1] that Lin and I
drafted.
This doc covers all the features running in Alibaba.
But some of features might be not needed in the first version
of
Flink
SQL
DDL.

So my suggestion would be to focus on the MVP DDLs and reach
agreement
ASAP
based on the DDL draft [1] and the DDL design [2] Shuyi
proposed.
And we can discuss on the main differences one by one.

The following is the MVP DDLs should be included in the first
version
in
my
opinion (feedbacks are welcome):
(1) Table DDL:
       (1.1) Type definition
       (1.2) computed column definition
       (1.3) watermark definition
       (1.4) with properties
       (1.5) table constraint (primary key/unique)
       (1.6) column nullability (nice to have)
(2) View DDL
(3) Function DDL

The main differences from two DDL docs (sth maybe missed,
welcome
to
point
out):
*(1.3) watermark*: this is the main and the most important
difference,
it
would be great if @Timo Walther <twalthr@xxxxxxxxxx>  @Fabian
Hueske
<fhueske@xxxxxxxxx>  give some feedbacks.
    (1.1) Type definition:
         (a) Should VARCHAR carry a length, e.g. VARCHAR(128) ?
              In most cases, the varchar length is not used
because
they
are
stored as String in Flink. But it can be used to optimize in
the
future
if
we know the column is a fixed length VARCHAR.
              So IMO, we can support VARCHAR with length in the
future,
and
just VARCHAR in this version.
         (b) Should DECIMAL support custom scale and precision,
e.g.
DECIMAL(12, 5)?
              If we clearly know the scale and precision of the
Decimal,
we
can have some optimization on serialization/deserialization.
IMO,
we
can
support just support DECIMAL in this version,
              which means DECIMAL(38, 18) as default. And
support
custom
scale
and precision in the future.
    (2) View DDL: Do we need WITH properties in View DDL
(proposed
in
doc[2])?
What are the properties on the view used for?


The features could be supported and discussed in the future:
(1) period definition on table
(2) Type DDL
(3) Index DDL
(4) Library DDL
(5) Drop statement

[1] Flink DDL draft by Lin and Jark:


https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit#
[2] Flink SQL DDL design by Shuyi:


https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit#
Cheers,
Jark

On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang <
wshaoxuan@xxxxxxxxx>
wrote:
Sure Shuyu,
What I hope is that we can reach an agreement on DDL gramma
as
soon
as
possible. There are a few differences between your proposal
and
ours.
Once
Lin and Jark propose our design, we can quickly discuss on
the
those
differences, and see how far away towards a unified design.

WRT the external catalog, I think it is an orthogonal topic,
we
can
design
it in parallel. I believe @Xuefu, @Bowen are already working
on.
We
should/will definitely involve them to review the final
design
of
DDL
implementation. I would suggest that we should give it a
higher
priority
on
the DDL implementation, as it is a crucial component for the
user
experience of SQL_CLI.

Regards,
Shaoxuan



On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen <
suez1224@xxxxxxxxx
wrote:
Thanks a lot, Shaoxuan, Jack and Lin. We should definitely
collaborate
here, we have also our own DDL implementation running in
production
for
almost 2 years at Uber. With the joint experience from both
companies,
we
can definitely make the Flink SQL DDL better.

As @shaoxuan suggest, Jark can come up with a doc that talks
about
the
current DDL design in Alibaba, and we can discuss and merge
them
into
one,
make it as a FLIP, and plan the tasks for implementation.
Also,
we
should
take into account the new external catalog effort in the
design.
What
do
you guys think?

Shuyi

On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <imjark@xxxxxxxxx>
wrote:
Hi Shaoxuan,

I think summarizing it into a google doc is a good idea. We
will
prepare
it
in the next few days.

Thanks,
Jark

Shaoxuan Wang <wshaoxuan@xxxxxxxxx> 于2018年11月28日周三
下午9:17写道:
Hi Lin and Jark,
Thanks for sharing those details. Can you please consider
summarizing
your
DDL design into a google doc.
We can still continue the discussions on Shuyi's proposal.
But
having a
separate google doc will be easy for the DEV to
understand/comment/discuss
on your proposed DDL implementation.

Regards,
Shaoxuan


On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <imjark@xxxxxxxxx
wrote:
Hi Shuyi,

Thanks for bringing up this discussion and the awesome
work!
I
have
left
some comments in the doc.

I want to share something more about the watermark
definition
learned
from
Alibaba.

      1.

      Table should be able to accept multiple watermark
definition.
      Because a table may have more than one rowtime
field.
For
example,
one
      rowtime field is from existing field but missing in
some
records,
another
      is the ingestion timestamp in Kafka but not very
accurate.
In
this
case,
      user may define two rowtime fields with watermarks
in
the
Table
and
choose
      one in different situation.
      2.

      Watermark stragety always work with rowtime field
together.
Based on the two points metioned above, I think we should
combine
the
watermark strategy and rowtime field selection (i.e.
which
existing
field
used to generate watermark) in one clause, so that we can
define
multiple
watermarks in one Table.

Here I will share the watermark syntax used in Alibaba
(simply
modified):
watermarkDefinition:
WATERMARK [watermarkName] FOR <rowtime_field> AS
wm_strategy
wm_strategy:
     BOUNDED WITH OFFSET 'string' timeUnit
|
     ASCENDING

The “WATERMARK” keyword starts a watermark definition.
The
“FOR”
keyword
defines which existing field used to generate watermark,
this
field
should
already exist in the schema (we can use computed-column
to
derive
from
other fields). The “AS” keyword defines watermark
strategy,
such
as
BOUNDED
WITH OFFSET (covers almost all the requirements) and
ASCENDING.
When the expected rowtime field does not exist in the
schema,
we
can
use
computed-column syntax to derive it from other existing
fields
using
built-in functions or user defined functions. So the
rowtime/watermark
definition doesn’t need to care about “field-change”
strategy
(replace/add/from-field). And the proctime field
definition
can
also
be
defined using computed-column. Such as pt as PROCTIME()
which
defines a
proctime field named “pt” in the schema.

Looking forward to working with you guys!

Best,
Jark Wu


Lin Li <lincoln.86xy@xxxxxxxxx> 于2018年11月28日周三 下午6:33写道:

@Shuyi
Thanks for the proposal!  We have a simple DDL
implementation
(extends
Calcite's parser) which been running for almost two
years
on
production
and
works well.
I think the most valued things we'd learned is keeping
simplicity
and
standard compliance.
Here's the approximate grammar, FYI
CREATE TABLE

CREATE TABLE tableName(
           columnDefinition [, columnDefinition]*
           [ computedColumnDefinition [,
computedColumnDefinition]*
]
           [ tableConstraint [, tableConstraint]* ]
           [ tableIndex [, tableIndex]* ]
       [ PERIOD FOR SYSTEM_TIME ]
           [ WATERMARK watermarkName FOR rowTimeColumn AS
withOffset(rowTimeColumn, offset) ]     ) [ WITH (
tableOption
[
,
tableOption]* ) ] [ ; ]

columnDefinition ::=
           columnName dataType [ NOT NULL ]

dataType  ::=
           {
             [ VARCHAR ]
             | [ BOOLEAN ]
             | [ TINYINT ]
             | [ SMALLINT ]
             | [ INT ]
             | [ BIGINT ]
             | [ FLOAT ]
             | [ DECIMAL ]
             | [ DOUBLE ]
             | [ DATE ]
             | [ TIME ]
             | [ TIMESTAMP ]
             | [ VARBINARY ]
           }

computedColumnDefinition ::=
           columnName AS computedColumnExpression

tableConstraint ::=
       { PRIMARY KEY | UNIQUE }
           (columnName [, columnName]* )

tableIndex ::=
           [ UNIQUE ] INDEX indexName
            (columnName [, columnName]* )

rowTimeColumn ::=
           columnName

tableOption ::=
           property=value
           offset ::=
           positive integer (unit: ms)

CREATE VIEW

CREATE VIEW viewName
     [
           ( columnName [, columnName]* )
     ]
           AS queryStatement;

CREATE FUNCTION

    CREATE FUNCTION functionName
     AS 'className';

    className ::=
           fully qualified name


Shuyi Chen <suez1224@xxxxxxxxx> 于2018年11月28日周三
上午3:28写道:
Thanks a lot, Timo and Xuefu. Yes, I think we can
finalize
the
design
doc
first and start implementation w/o the unified
connector
API
ready
by
skipping some featue.

Xuefu, I like the idea of making Flink specific
properties
into
generic
key-value pairs, so that it will make integration with
Hive
DDL
(or
others,
e.g. Beam DDL) easier.

I'll run a final pass over the design doc and finalize
the
design
in
the
next few days. And we can start creating tasks and
collaborate
on
the
implementation. Thanks a lot for all the comments and
inputs.
Cheers!
Shuyi

On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <
xuefu.z@xxxxxxxxxxxxxxx>
wrote:

Yeah! I agree with Timo that DDL can actually proceed
w/o
being
blocked
by
connector API. We can leave the unknown out while
defining
the
basic
syntax.
@Shuyi

As commented in the doc, I think we can probably
stick
with
simple
syntax
with general properties, without extending the syntax
too
much
that
it
mimics the descriptor API.

Part of our effort on Flink-Hive integration is also
to
make
DDL
syntax
compatible with Hive's. The one in the current
proposal
seems
making
our
effort more challenging.

We can help and collaborate. At this moment, I think
we
can
finalize
on
the proposal and then we can divide the tasks for
better
collaboration.
Please let me know if there are  any questions or
suggestions.
Thanks,
Xuefu





------------------------------------------------------------------
Sender:Timo Walther <twalthr@xxxxxxxxxx>
Sent at:2018 Nov 27 (Tue) 16:21
Recipient:dev <dev@xxxxxxxxxxxxxxxx>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Thanks for offering your help here, Xuefu. It would
be
great
to
move
these efforts forward. I agree that the DDL is
somehow
releated
to
the
unified connector API design but we can also start
with
the
basic
functionality now and evolve the DDL during this
release
and
next
releases.
For example, we could identify the MVP DDL syntax
that
skips
defining
key constraints and maybe even time attributes. This
DDL
could
be
used
for batch usecases, ETL, and materializing SQL
queries
(no
time
operations like windows).

The unified connector API is high on our priority
list
for
the
1.8
release. I will try to update the document until mid
of
next
week.
Regards,

Timo


Am 27.11.18 um 08:08 schrieb Shuyi Chen:
Thanks a lot, Xuefu. I was busy for some other
stuff
for
the
last 2
weeks,
but we are definitely interested in moving this
forward.
I
think
once
the
unified connector API design [1] is done, we can
finalize
the
DDL
design
as
well and start creating concrete subtasks to
collaborate
on
the
implementation with the community.

Shuyi

[1]

https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <
xuefu.z@xxxxxxxxxxxxxxx>
wrote:

Hi Shuyi,

I'm wondering if you folks still have the
bandwidth
working
on
this.
We have some dedicated resource and like to move
this
forward.
We
can
collaborate.

Thanks,

Xuefu



------------------------------------------------------------------
发件人:wenlong.lwl<wenlong88.lwl@xxxxxxxxx>
日 期:2018年11月05日 11:15:35
收件人:<dev@xxxxxxxxxxxxxxxx>
主 题:Re: [DISCUSS] Flink SQL DDL Design

Hi, Shuyi, thanks for the proposal.

I have two concerns about the table ddl:

1. how about remove the source/sink mark from the
ddl,
because
it
is
not
necessary, the framework determine the table
referred
is a
source
or a
sink
according to the context of the query using the
table.
it
will
be
more
convenient for use defining a table which can be
both
a
source
and
sink,
and more convenient for catalog to persistent and
manage
the
meta
infos.
2. how about just keeping one pure string map as
parameters
for
table,
like
create tabe Kafka10SourceTable (
intField INTEGER,
stringField VARCHAR(128),
longField BIGINT,
rowTimeField TIMESTAMP
) with (
connector.type = ’kafka’,
connector.property-version = ’1’,
connector.version = ’0.10’,
connector.properties.topic = ‘test-kafka-topic’,
connector.properties.startup-mode =
‘latest-offset’,
connector.properties.specific-offset = ‘offset’,
format.type = 'json'
format.prperties.version=’1’,
format.derive-schema = 'true'
);
Because:
1. in TableFactory, what user use is a string map
properties,
defining
parameters by string-map can be the closest way to
mapping
how
user
use
the
parameters.
2. The table descriptor can be extended by user,
like
what
is
done
in
Kafka
and Json, it means that the parameter keys in
connector
or
format
can
be
different in different implementation, we can not
restrict
the
key
in
a
specified set, so we need a map in connector scope
and a
map
in
connector.properties scope. why not just give
user a
single
map,
let
them
put parameters in a format they like, which is
also
the
simplest
way
to
implement DDL parser.
3. whether we can define a format clause or not,
depends
on
the
implementation of the connector, using different
clause
in
DDL
may
make
a
misunderstanding that we can combine the
connectors
with
arbitrary
formats,
which may not work actually.

On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <
wossyn@xxxxxxxxx
wrote:
+1, Thanks for the proposal.

I guess this is a long-awaited change. This can
vastly
increase
the
functionalities of the SQL Client as it will be
possible
to
use
complex
extensions like for example those provided by
Apache
Bahir[1].
Best Regards,
Dom.

[1]
https://github.com/apache/bahir-flink

sob., 3 lis 2018 o 17:17 Rong Rong <
walterddr@xxxxxxxxx>
napisał(a):
+1. Thanks for putting the proposal together
Shuyi.
DDL has been brought up in a couple of times
previously
[1,2].
Utilizing
DDL will definitely be a great extension to the
current
Flink
SQL
to
systematically support some of the previously
brought
up
features
such
as
[3]. And it will also be beneficial to see the
document
closely
aligned
with the previous discussion for unified SQL
connector
API
[4].
I also left a few comments on the doc. Looking
forward
to
the
alignment
with the other couple of efforts and
contributing
to
them!
Best,
Rong

[1]


http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
[2]


http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
[3]
https://issues.apache.org/jira/browse/FLINK-8003
[4]


http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3C6676cb66-6f31-23e1-eff5-2e9c19f88483@xxxxxxxxxx%3E
On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <
bowenli86@xxxxxxxxx
wrote:
Thanks Shuyi!

I left some comments there. I think the design
of
SQL
DDL
and
Flink-Hive
integration/External catalog enhancements will
work
closely
with
each
other. Hope we are well aligned on the
directions
of
the
two
designs,
and I
look forward to working with you guys on both!

Bowen


On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen <
suez1224@xxxxxxxxx
wrote:
Hi everyone,

SQL DDL support has been a long-time ask from
the
community.
Current
Flink
SQL support only DML (e.g. SELECT and INSERT
statements).
In
its
current
form, Flink SQL users still need to
define/create
table
sources
and
sinks
programmatically in Java/Scala. Also, in SQL
Client,
without
DDL
support,
the current implementation does not allow
dynamical
creation
of
table,
type
or functions with SQL, this adds friction for
its
adoption.
I drafted a design doc [1] with a few other
community
members
that
proposes
the design and implementation for adding DDL
support
in
Flink.
The
initial
design considers DDL for table, view, type,
library
and
function.
It
will
be great to get feedback on the design from
the
community,
and
align
with
latest effort in unified SQL connector API [2]
and
Flink
Hive
integration
[3].

Any feedback is highly appreciated.

Thanks
Shuyi Chen

[1]


https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
[2]


https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
[3]


https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
--
"So you have to trust that the dots will
somehow
connect
in
your
future."
--
"So you have to trust that the dots will somehow
connect
in
your
future."
--
"So you have to trust that the dots will somehow connect in
your
future."
--
"So you have to trust that the dots will somehow connect in your
future."