Re: [DISCUSS] Flink SQL DDL Design


Hi Jark and Shuyi,

thanks for pushing the DDL efforts forward. I agree that we should aim to combine both Shuyi's design and your design.

Here are a couple of concerns that I think we should address in the design:

1. Scope: Let's focus on an MVP DDL for CREATE TABLE statements first. I think this topic already has enough potential for long discussions and is very helpful for users. We can discuss CREATE VIEW and CREATE FUNCTION afterwards as they are not related to each other.

2. Constraints: I think we should consider things like nullability, VARCHAR length, and decimal scale and precision in the future as they allow for nice optimizations. However, since neither the translation nor the runtime operators support those features, I would not introduce an arbitrary default value but omit those parameters for now. This can be a follow-up issue once the basic DDL has been merged.

3. Sources/Sinks: We had a discussion about CREATE TABLE vs CREATE [SOURCE|SINK|] TABLE before. In my opinion we should allow these explicit declarations because in most production scenarios, teams have strict read/write access requirements. For example, a data science team should only consume from an event Kafka topic but should not accidentally write back to the single source of truth.

4. Time attributes: In general, I like your computed columns approach because it makes defining rowtime attributes transparent and simple. However, there are downsides that we should discuss.
4a. Jark's current design means that timestamps are in the schema twice. The design that is mentioned in [1] makes this more flexible as it allows either replacing an existing column or adding a computed column.
4b. We need to consider the zoo of storage systems that is out there right now. Take Kafka as an example: how can we write out a timestamp into the message header? We need to think of a reverse operation to a computed column.
4c. Does defining a watermark really fit into the schema part of a table? Shouldn't we separate all time attribute concerns into a special clause next to the regular schema, similar to how PARTITIONED BY does it in Hive?
4d. How can people come up with a custom watermark strategy? I guess this cannot be implemented in a scalar function and would require some new type of UDF?

5. Partitioning and keys: Another question that the DDL design should answer is how we express primary keys (for upserts) and partitioning keys (for Hive, Kafka message keys). Are they all part of the table schema?

6. Schema declaration: I find it very annoying that we want to force people to declare all columns and types again even though this is usually already defined in some company-wide format. I know that catalog support will greatly improve this. But if no catalog is used, people need to manually define a schema with 50+ fields in a Flink DDL. What I actually promoted was having two ways of reading data:

1. Either the format derives its schema from the table schema.
CREATE TABLE (col INT) WITH (format.type = avro)

2. Or the table schema can be omitted and the format schema defines the table schema (+ time attributes).
CREATE TABLE WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc")
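
As a rough illustration of the second option, the sketch below derives table columns from an Avro record schema so that they do not have to be repeated in the DDL. It is plain Python, not Flink code. The type mapping, the function name `derive_columns`, and the example schema are assumptions made for this sketch and are not part of any proposal in this thread.

```python
import json

# Assumed mapping from Avro primitive types to SQL column types (illustrative only).
AVRO_TO_SQL = {
    "boolean": "BOOLEAN",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "string": "VARCHAR",
    "bytes": "VARBINARY",
}


def derive_columns(avsc_text):
    """Return a list of (column name, SQL type) pairs for an Avro record schema."""
    schema = json.loads(avsc_text)
    if not isinstance(schema, dict) or schema.get("type") != "record":
        raise ValueError("expected an Avro record schema")
    columns = []
    for field in schema["fields"]:
        avro_type = field["type"]
        # A union such as ["null", "string"] marks a nullable field;
        # use its single non-null branch as the column type.
        if isinstance(avro_type, list):
            non_null = [t for t in avro_type if t != "null"]
            if len(non_null) != 1:
                raise ValueError("unsupported union in field " + field["name"])
            avro_type = non_null[0]
        # Nested or logical types are out of scope for this sketch.
        if not isinstance(avro_type, str) or avro_type not in AVRO_TO_SQL:
            raise ValueError("unsupported Avro type in field " + field["name"])
        columns.append((field["name"], AVRO_TO_SQL[avro_type]))
    return columns


# Hypothetical schema standing in for the contents of an .avsc file.
EXAMPLE_AVSC = """
{"type": "record", "name": "Event", "fields": [
  {"name": "user_id", "type": "long"},
  {"name": "action", "type": ["null", "string"]}
]}
"""

print(derive_columns(EXAMPLE_AVSC))
```

Time attributes are not covered here; under the second option they would still have to be declared next to the derived columns.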

Please let me know what you think about each item. I will try to incorporate your feedback in [1] this week.

Regards,
Timo

[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf


On 05.12.18 at 13:01, Jark Wu wrote:
Hi Shuyi,

It's exciting to see that we can make such great progress here.

Regarding the watermark:

Watermarks can be defined on any column (including computed columns) in the table schema. A computed column can be computed from existing columns using built-in functions and *UserDefinedFunctions* (ScalarFunction). So IMO, it can work for almost all scenarios, not only the common ones.

I don't think using a `TimestampExtractor` to support custom timestamp extractors in SQL is a good idea, because `TimestampExtractor` is not a SQL standard function. If we support `TimestampExtractor` in SQL, do we need to support CREATE FUNCTION for `TimestampExtractor`? I think `ScalarFunction` can do the same thing as `TimestampExtractor` but is more powerful and standard.

The core idea of the watermark definition syntax is that the schema part defines all the columns of the table; it is exactly what the query sees. The watermark part is something like a primary key definition or constraint on a SQL table: it has no side effect on the schema, it only defines what the watermark strategy is and which field becomes the rowtime attribute field. If the rowtime field is not among the existing fields, we can use a computed column to generate it from other existing fields. The Descriptor Pattern API [1] is very useful when writing a Table API job, but is not contradictory to the Watermark DDL from my perspective.
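
To illustrate the computed-column idea, here is a minimal Python sketch (not Flink code) in which a rowtime column is derived from an existing string column by an ordinary scalar function. The function names, the column names, and the timestamp format are all hypothetical.

```python
from datetime import datetime, timezone


def to_epoch_millis(iso_text):
    """Hypothetical scalar function: parse a UTC timestamp string into epoch milliseconds."""
    parsed = datetime.strptime(iso_text, "%Y-%m-%dT%H:%M:%SZ")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def with_computed_column(row, name, func, source):
    """Return a copy of `row` with column `name` computed from column `source`."""
    extended = dict(row)
    extended[name] = func(row[source])
    return extended


# The physical record only carries the timestamp as text ...
record = {"id": 1, "ts_text": "1970-01-01T00:00:10Z"}
# ... and the query-visible row gains a derived rowtime column.
print(with_computed_column(record, "rowtime", to_epoch_millis, "ts_text"))
```

The source column stays in the row next to the derived one, which is the duplication that point 4a above refers to.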

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes

Best,
Jark

On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <suez1224@xxxxxxxxx> wrote:

Hi Jark and Shaoxuan,

Thanks a lot for the summary. I think we are making great progress here.
Below are my thoughts.

*(1) watermark definition*
IMO, it's better to keep it consistent with the rowtime extractors and watermark strategies defined in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
Using built-in functions seems to be too much for most of the common scenarios.
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE*
Actually, I think we can put the source/sink type info into the table properties, so we can use CREATE TABLE.
(3) View DDL with properties
We can remove the view properties section now for the MVP and add it back later if needed.
(4) Type Definition
I agree we can put the type length or precision into future versions. As for the grammar difference, I am currently using the grammar of the Calcite type DDL, but since we'll extend the parser in Flink, we can definitely change it if needed.

Shuyi

On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <imjark@xxxxxxxxx> wrote:

Hi Shaoxuan,

Thanks for pointing that out. Yes, the source/sink tag on create table is another major difference.

To summarize the main differences again:

*(1) watermark definition*
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE*
(3) View DDL with properties
(4) Type Definition

Best,
Jark

On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <wshaoxuan@xxxxxxxxx> wrote:

Hi Jark,
Thanks for the summary. Your plan for the first round of the DDL implementation looks good to me.
Have we reached agreement on simplifying/unifying "create [source/sink] table" to "create table"? "Watermark definition" and "create table" are the major obstacles on the way to merging the two design proposals FMPOV. @Shuyi, it would be great if you could spend time and respond to these two parts first.
Regards,
Shaoxuan


On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <imjark@xxxxxxxxx> wrote:

Hi Shuyi,

It seems that you have reviewed the DDL doc [1] that Lin and I drafted. This doc covers all the features running in Alibaba. But some of the features might not be needed in the first version of Flink SQL DDL.

So my suggestion would be to focus on the MVP DDLs and reach agreement ASAP based on the DDL draft [1] and the DDL design [2] Shuyi proposed. And we can discuss the main differences one by one.

The following are the MVP DDLs that should be included in the first version, in my opinion (feedback is welcome):
(1) Table DDL:
     (1.1) Type definition
     (1.2) computed column definition
     (1.3) watermark definition
     (1.4) with properties
     (1.5) table constraint (primary key/unique)
     (1.6) column nullability (nice to have)
(2) View DDL
(3) Function DDL

The main differences between the two DDL docs (something may be missed, please point it out):
*(1.3) watermark*: this is the main and most important difference; it would be great if @Timo Walther <twalthr@xxxxxxxxxx> and @Fabian Hueske <fhueske@xxxxxxxxx> could give some feedback.
  (1.1) Type definition:
       (a) Should VARCHAR carry a length, e.g. VARCHAR(128)?
            In most cases, the VARCHAR length is not used because values are stored as String in Flink. But it can be used for optimization in the future if we know the column is a fixed-length VARCHAR. So IMO, we can support VARCHAR with length in the future, and just VARCHAR in this version.
       (b) Should DECIMAL support custom scale and precision, e.g. DECIMAL(12, 5)?
            If we clearly know the scale and precision of the decimal, we can optimize serialization/deserialization. IMO, we can support just DECIMAL in this version, which means DECIMAL(38, 18) as default, and support custom scale and precision in the future.
  (2) View DDL: Do we need WITH properties in View DDL (proposed in doc [2])? What are the properties on the view used for?


The features could be supported and discussed in the future:
(1) period definition on table
(2) Type DDL
(3) Index DDL
(4) Library DDL
(5) Drop statement

[1] Flink DDL draft by Lin and Jark:
https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit#
[2] Flink SQL DDL design by Shuyi:
https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit#

Cheers,
Jark

On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang <wshaoxuan@xxxxxxxxx> wrote:

Sure Shuyi,
What I hope is that we can reach an agreement on the DDL grammar as soon as possible. There are a few differences between your proposal and ours. Once Lin and Jark propose our design, we can quickly discuss those differences and see how far we are from a unified design.

WRT the external catalog, I think it is an orthogonal topic; we can design it in parallel. I believe @Xuefu and @Bowen are already working on it. We should/will definitely involve them in reviewing the final design of the DDL implementation. I would suggest that we give a higher priority to the DDL implementation, as it is a crucial component for the user experience of SQL_CLI.

Regards,
Shaoxuan



On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen <suez1224@xxxxxxxxx> wrote:

Thanks a lot, Shaoxuan, Jark and Lin. We should definitely collaborate here; we also have our own DDL implementation that has been running in production for almost 2 years at Uber. With the joint experience from both companies, we can definitely make the Flink SQL DDL better.

As @shaoxuan suggested, Jark can come up with a doc that describes the current DDL design in Alibaba, and we can discuss and merge them into one, make it a FLIP, and plan the tasks for implementation. Also, we should take the new external catalog effort into account in the design. What do you guys think?

Shuyi

On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <imjark@xxxxxxxxx> wrote:

Hi Shaoxuan,

I think summarizing it into a google doc is a good idea. We will prepare it in the next few days.

Thanks,
Jark

On Wed, Nov 28, 2018 at 9:17 PM, Shaoxuan Wang <wshaoxuan@xxxxxxxxx> wrote:

Hi Lin and Jark,
Thanks for sharing those details. Could you please consider summarizing your DDL design in a google doc? We can still continue the discussion on Shuyi's proposal, but having a separate google doc will make it easy for the devs to understand, comment on, and discuss your proposed DDL implementation.

Regards,
Shaoxuan


On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <imjark@xxxxxxxxx> wrote:

Hi Shuyi,

Thanks for bringing up this discussion and for the awesome work! I have left some comments in the doc.

I want to share something more about the watermark definition that we learned at Alibaba.

    1. A table should be able to accept multiple watermark definitions, because a table may have more than one rowtime field. For example, one rowtime field comes from an existing field but is missing in some records, and another is the ingestion timestamp in Kafka but is not very accurate. In this case, the user may define two rowtime fields with watermarks in the table and choose one depending on the situation.
    2. A watermark strategy always works together with a rowtime field.

Based on the two points mentioned above, I think we should combine the watermark strategy and the rowtime field selection (i.e. which existing field is used to generate the watermark) in one clause, so that we can define multiple watermarks in one table.

Here I will share the watermark syntax used in Alibaba (simply modified):

watermarkDefinition:
   WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy

wm_strategy:
   BOUNDED WITH OFFSET 'string' timeUnit
 | ASCENDING

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword defines which existing field is used to generate the watermark; this field should already exist in the schema (we can use a computed column to derive it from other fields). The “AS” keyword defines the watermark strategy, such as BOUNDED WITH OFFSET (covers almost all the requirements) and ASCENDING.

When the expected rowtime field does not exist in the schema, we can use the computed-column syntax to derive it from other existing fields using built-in functions or user-defined functions. So the rowtime/watermark definition doesn't need to care about the “field-change” strategy (replace/add/from-field). The proctime field can also be defined using a computed column, such as pt AS PROCTIME(), which defines a proctime field named “pt” in the schema.
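
The two strategies can be sketched roughly as follows in plain Python (not Flink code). The sketch assumes that BOUNDED WITH OFFSET means the watermark trails the largest rowtime seen so far by a fixed offset, and that ASCENDING behaves like a zero offset. The class names and the exact boundary semantics are assumptions of this sketch, not something the syntax above specifies.

```python
class BoundedWithOffset:
    """Watermark trails the largest rowtime seen so far by a fixed offset (in ms)."""

    def __init__(self, offset_ms):
        self.offset_ms = offset_ms
        self.max_rowtime = None

    def on_event(self, rowtime_ms):
        # Track the largest rowtime; late events never move the watermark back.
        if self.max_rowtime is None or rowtime_ms > self.max_rowtime:
            self.max_rowtime = rowtime_ms
        return self.max_rowtime - self.offset_ms


class Ascending(BoundedWithOffset):
    """Assumed here to be the zero-offset special case."""

    def __init__(self):
        super().__init__(0)


def watermarks(strategy, rowtimes):
    """Feed rowtimes through a strategy and collect the watermark after each event."""
    return [strategy.on_event(t) for t in rowtimes]


# An out-of-order event (11000 after 12000) leaves the watermark unchanged.
print(watermarks(BoundedWithOffset(5000), [10000, 12000, 11000, 20000]))
print(watermarks(Ascending(), [1, 2, 2, 5]))
```

Because each strategy object is bound to one rowtime field, a table with several rowtime fields would simply hold one such object per watermark definition.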

Looking forward to working with you guys!

Best,
Jark Wu


On Wed, Nov 28, 2018 at 6:33 PM, Lin Li <lincoln.86xy@xxxxxxxxx> wrote:

@Shuyi
Thanks for the proposal! We have a simple DDL implementation (extending Calcite's parser) which has been running in production for almost two years and works well. I think the most valuable thing we've learned is to keep simplicity and standard compliance. Here's the approximate grammar, FYI:

CREATE TABLE

CREATE TABLE tableName(
         columnDefinition [, columnDefinition]*
         [ computedColumnDefinition [, computedColumnDefinition]* ]
         [ tableConstraint [, tableConstraint]* ]
         [ tableIndex [, tableIndex]* ]
         [ PERIOD FOR SYSTEM_TIME ]
         [ WATERMARK watermarkName FOR rowTimeColumn AS withOffset(rowTimeColumn, offset) ]
     ) [ WITH ( tableOption [ , tableOption]* ) ] [ ; ]

columnDefinition ::=
         columnName dataType [ NOT NULL ]

dataType  ::=
         {
           [ VARCHAR ]
           | [ BOOLEAN ]
           | [ TINYINT ]
           | [ SMALLINT ]
           | [ INT ]
           | [ BIGINT ]
           | [ FLOAT ]
           | [ DECIMAL ]
           | [ DOUBLE ]
           | [ DATE ]
           | [ TIME ]
           | [ TIMESTAMP ]
           | [ VARBINARY ]
         }

computedColumnDefinition ::=
         columnName AS computedColumnExpression

tableConstraint ::=
     { PRIMARY KEY | UNIQUE }
         (columnName [, columnName]* )

tableIndex ::=
         [ UNIQUE ] INDEX indexName
          (columnName [, columnName]* )

rowTimeColumn ::=
         columnName

tableOption ::=
         property=value

offset ::=
         positive integer (unit: ms)

CREATE VIEW

CREATE VIEW viewName
   [
         ( columnName [, columnName]* )
   ]
         AS queryStatement;

CREATE FUNCTION

  CREATE FUNCTION functionName
   AS 'className';

  className ::=
         fully qualified name


On Wed, Nov 28, 2018 at 3:28 AM, Shuyi Chen <suez1224@xxxxxxxxx> wrote:

Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc first and start implementation without the unified connector API being ready, by skipping some features.

Xuefu, I like the idea of turning Flink-specific properties into generic key-value pairs, so that integration with Hive DDL (or others, e.g. Beam DDL) becomes easier.

I'll run a final pass over the design doc and finalize the design in the next few days. Then we can start creating tasks and collaborate on the implementation. Thanks a lot for all the comments and inputs.

Cheers!
Shuyi

On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx> wrote:

Yeah! I agree with Timo that DDL can actually proceed without being blocked by the connector API. We can leave the unknowns out while defining the basic syntax.

@Shuyi

As commented in the doc, I think we can probably stick with a simple syntax with general properties, without extending the syntax so much that it mimics the descriptor API.

Part of our effort on Flink-Hive integration is also to make the DDL syntax compatible with Hive's. The one in the current proposal seems to make our effort more challenging.

We can help and collaborate. At this moment, I think we can finalize the proposal and then divide the tasks for better collaboration.

Please let me know if there are any questions or suggestions.

Thanks,
Xuefu





------------------------------------------------------------------
Sender:Timo Walther <twalthr@xxxxxxxxxx>
Sent at:2018 Nov 27 (Tue) 16:21
Recipient:dev <dev@xxxxxxxxxxxxxxxx>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Thanks for offering your help here, Xuefu. It would be great to move these efforts forward. I agree that the DDL is somehow related to the unified connector API design, but we can also start with the basic functionality now and evolve the DDL during this release and the next releases.

For example, we could identify the MVP DDL syntax that skips defining key constraints and maybe even time attributes. This DDL could be used for batch use cases, ETL, and materializing SQL queries (no time operations like windows).

The unified connector API is high on our priority list for the 1.8 release. I will try to update the document by the middle of next week.

Regards,

Timo


On 27.11.18 at 08:08, Shuyi Chen wrote:

Thanks a lot, Xuefu. I was busy with some other stuff for the last 2 weeks, but we are definitely interested in moving this forward. I think once the unified connector API design [1] is done, we can finalize the DDL design as well and start creating concrete subtasks to collaborate on the implementation with the community.

Shuyi

[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing

On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <xuefu.z@xxxxxxxxxxxxxxx> wrote:

Hi Shuyi,

I'm wondering if you folks still have the bandwidth to work on this.

We have some dedicated resources and would like to move this forward. We can collaborate.

Thanks,

Xuefu



------------------------------------------------------------------
Sender: wenlong.lwl <wenlong88.lwl@xxxxxxxxx>
Date: 2018-11-05 11:15:35
Recipient: <dev@xxxxxxxxxxxxxxxx>
Subject: Re: [DISCUSS] Flink SQL DDL Design

Hi, Shuyi, thanks for the proposal.

I have two concerns about the table DDL:

1. How about removing the source/sink mark from the DDL? It is not necessary, because the framework can determine whether the referenced table is a source or a sink from the context of the query using the table. It would be more convenient for users to define a table which can be both a source and a sink, and more convenient for the catalog to persist and manage the meta information.
2. How about just keeping one pure string map as the parameters for a table, like:
create table Kafka10SourceTable (
intField INTEGER,
stringField VARCHAR(128),
longField BIGINT,
rowTimeField TIMESTAMP
) with (
connector.type = 'kafka',
connector.property-version = '1',
connector.version = '0.10',
connector.properties.topic = 'test-kafka-topic',
connector.properties.startup-mode = 'latest-offset',
connector.properties.specific-offset = 'offset',
format.type = 'json',
format.properties.version = '1',
format.derive-schema = 'true'
);
Because:
1. In TableFactory, what the user works with is a string map of properties; defining parameters by a string map is the closest mapping to how users actually use the parameters.
2. The table descriptor can be extended by users, as is done in Kafka and Json. This means that the parameter keys in a connector or format can differ between implementations, so we cannot restrict the keys to a specified set, and we would need a map in the connector scope and a map in the connector.properties scope. Why not just give users a single map and let them put parameters in a format they like? This is also the simplest way to implement the DDL parser.
3. Whether we can define a format clause or not depends on the implementation of the connector. Using a different clause in the DDL may give the misleading impression that connectors can be combined with arbitrary formats, which may not actually work.
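
A minimal sketch of the single-map idea, in plain Python rather than Flink code: all table parameters live in one flat string map, and a consumer that cares about one scope filters by key prefix. The helper name `sub_properties` is hypothetical, and the keys are taken from the example DDL above.

```python
def sub_properties(props, prefix):
    """Return the entries under `prefix`, with the prefix and its dot stripped."""
    dotted = prefix + "."
    return {
        key[len(dotted):]: value
        for key, value in props.items()
        if key.startswith(dotted)
    }


# One flat string map, as it could come out of the WITH (...) clause.
table_props = {
    "connector.type": "kafka",
    "connector.version": "0.10",
    "connector.properties.topic": "test-kafka-topic",
    "format.type": "json",
    "format.derive-schema": "true",
}

# Each consumer picks the scope it understands; no scope is fixed by the parser.
print(sub_properties(table_props, "connector"))
print(sub_properties(table_props, "connector.properties"))
print(sub_properties(table_props, "format"))
```

With this shape the DDL parser only needs to read key = 'value' pairs; which keys are valid is left entirely to the connector or format that consumes them.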

On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <wossyn@xxxxxxxxx> wrote:

+1, Thanks for the proposal.

I guess this is a long-awaited change. This can vastly increase the functionality of the SQL Client, as it will be possible to use complex extensions such as those provided by Apache Bahir [1].

Best Regards,
Dom.

[1] https://github.com/apache/bahir-flink

On Sat, 3 Nov 2018 at 17:17, Rong Rong <walterddr@xxxxxxxxx> wrote:

+1. Thanks for putting the proposal together, Shuyi.

DDL has been brought up a couple of times previously [1,2]. Utilizing DDL will definitely be a great extension to the current Flink SQL to systematically support some of the previously brought up features such as [3]. And it will also be beneficial to see the document closely aligned with the previous discussion for the unified SQL connector API [4].

I also left a few comments on the doc. Looking forward to the alignment with the other couple of efforts and contributing to them!

Best,
Rong

[1] http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
[2] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
[3] https://issues.apache.org/jira/browse/FLINK-8003
[4] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3C6676cb66-6f31-23e1-eff5-2e9c19f88483@xxxxxxxxxx%3E

On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <bowenli86@xxxxxxxxx> wrote:

Thanks Shuyi!

I left some comments there. I think the design of SQL DDL and Flink-Hive integration/External catalog enhancements will work closely with each other. Hope we are well aligned on the directions of the two designs, and I look forward to working with you guys on both!

Bowen


On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen <suez1224@xxxxxxxxx> wrote:

Hi everyone,

SQL DDL support has been a long-time ask from the community. Current Flink SQL supports only DML (e.g. SELECT and INSERT statements). In its current form, Flink SQL users still need to define/create table sources and sinks programmatically in Java/Scala. Also, in the SQL Client, without DDL support, the current implementation does not allow dynamic creation of tables, types or functions with SQL; this adds friction to its adoption.

I drafted a design doc [1] with a few other community members that proposes the design and implementation for adding DDL support in Flink. The initial design considers DDL for table, view, type, library and function. It will be great to get feedback on the design from the community, and to align with the latest efforts on the unified SQL connector API [2] and Flink Hive integration [3].

Any feedback is highly appreciated.

Thanks
Shuyi Chen

[1] https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
[2] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
[3] https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing

--
"So you have to trust that the dots will somehow connect in your future."

