osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Support Interactive Programming in Flink Table API


Another argument for Piotr's point is that lazily changing properties of a
node affects all down stream consumers but does not necessarily have to
happen before these consumers are defined. From a user's perspective this
can be quite confusing:

b = a.map(...)
c = a.map(...)

a.cache()
d = a.map(...)

now b, c and d will consume from a cached operator. In this case, the user
would most likely expect that only d reads from a cached result.

Cheers,
Till

On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
wrote:

> Hey Shaoxuan and Becket,
>
> > Can you explain a bit more one what are the side effects? So far my
> > understanding is that such side effects only exist if a table is mutable.
> > Is that the case?
>
> Not only that. There are also performance implications and those are
> another implicit side effects of using `void cache()`. As I wrote before,
> reading from cache might not always be desirable, thus it can cause
> performance degradation and I’m fine with that - user's or optimiser’s
> choice. What I do not like is that this implicit side effect can manifest
> in completely different part of code, that wasn’t touched by a user while
> he was adding `void cache()` call somewhere else. And even if caching
> improves performance, it’s still a side effect of `void cache()`. Almost
> from the definition `void` methods have only side effects. As I wrote
> before, there are couple of scenarios where this might be undesirable
> and/or unexpected, for example:
>
> 1.
> Table b = …;
> b.cache()
> x = b.join(…)
> y = b.count()
> // ...
> // 100
> // hundred
> // lines
> // of
> // code
> // later
> z = b.filter(…).groupBy(…) // this might be even hidden in a different
> method/file/package/dependency
>
> 2.
>
> Table b = ...
> If (some_condition) {
>   foo(b)
> }
> Else {
>   bar(b)
> }
> z = b.filter(…).groupBy(…)
>
>
> Void foo(Table b) {
>   b.cache()
>   // do something with b
> }
>
> In both above examples, `b.cache()` will implicitly affect (semantic of a
> program in case of sources being mutable and performance) `z =
> b.filter(…).groupBy(…)` which might be far from obvious.
>
> On top of that, there is still this argument of mine that having a
> `MaterializedTable` or `CachedTable` handle is more flexible for us for the
> future and for the user (as a manual option to bypass cache reads).
>
> >  But Jiangjie is correct,
> > the source table in batching should be immutable. It is the user’s
> > responsibility to ensure it, otherwise even a regular failover may lead
> > to inconsistent results.
>
> Yes, I agree that’s what perfect world/good deployment should be. But its
> often isn’t and while I’m not trying to fix this (since the proper fix is
> to support transactions), I’m just trying to minimise confusion for the
> users that are not fully aware what’s going on and operate in less then
> perfect setup. And if something bites them after adding `b.cache()` call,
> to make sure that they at least know all of the places that adding this
> line can affect.
>
> Thanks, Piotrek
>
> > On 1 Dec 2018, at 15:39, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> >
> > Hi Piotrek,
> >
> > Thanks again for the clarification. Some more replies are following.
> >
> > But keep in mind that `.cache()` will/might not only be used in
> interactive
> >> programming and not only in batching.
> >
> > It is true. Actually in stream processing, cache() has the same semantic
> as
> > batch processing. The semantic is following:
> > For a table created via a series of computation, save that table for
> later
> > reference to avoid running the computation logic to regenerate the table.
> > Once the application exits, drop all the cache.
> > This semantic is same for both batch and stream processing. The
> difference
> > is that stream applications will only run once as they are long running.
> > And the batch applications may be run multiple times, hence the cache may
> > be created and dropped each time the application runs.
> > Admittedly, there will probably be some resource management requirements
> > for the streaming cached table, such as time based / size based
> retention,
> > to address the infinite data issue. But such requirement does not change
> > the semantic.
> > You are right that interactive programming is just one use case of
> cache().
> > It is not the only use case.
> >
> > For me the more important issue is of not having the `void cache()` with
> >> side effects.
> >
> > This is indeed the key point. The argument around whether cache() should
> > return something already indicates that cache() and materialize() address
> > different issues.
> > Can you explain a bit more one what are the side effects? So far my
> > understanding is that such side effects only exist if a table is mutable.
> > Is that the case?
> >
> > I don’t know, probably initially we should make CachedTable read-only. I
> >> don’t find it more confusing than the fact that user can not write to
> views
> >> or materialised views in SQL or that user currently can not write to a
> >> Table.
> >
> > I don't think anyone should insert something to a cache. By definition
> the
> > cache should only be updated when the corresponding original table is
> > updated. What I am wondering is that given the following two facts:
> > 1. If and only if a table is mutable (with something like insert()), a
> > CachedTable may have implicit behavior.
> > 2. A CachedTable extends a Table.
> > We can come to the conclusion that a CachedTable is mutable and users can
> > insert into the CachedTable directly. This is where I thought confusing.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
> > wrote:
> >
> >> Hi all,
> >>
> >> Regarding naming `cache()` vs `materialize()`. One more explanation why
> I
> >> think `materialize()` is more natural to me is that I think of all
> “Table”s
> >> in Table-API as views. They behave the same way as SQL views, the only
> >> difference for me is that their live scope is short - current session
> which
> >> is limited by different execution model. That’s why “cashing” a view
> for me
> >> is just materialising it.
> >>
> >> However I see and I understand your point of view. Coming from
> >> DataSet/DataStream and generally speaking non-SQL world, `cache()` is
> more
> >> natural. But keep in mind that `.cache()` will/might not only be used in
> >> interactive programming and not only in batching. But naming is one
> issue,
> >> and not that critical to me. Especially that once we implement proper
> >> materialised views, we can always deprecate/rename `cache()` if we deem
> so.
> >>
> >>
> >> For me the more important issue is of not having the `void cache()` with
> >> side effects. Exactly for the reasons that you have mentioned. True:
> >> results might be non deterministic if underlying source table are
> changing.
> >> Problem is that `void cache()` implicitly changes the semantic of
> >> subsequent uses of the cached/materialized Table. It can cause “wtf”
> moment
> >> for a user if he inserts “b.cache()” call in some place in his code and
> >> suddenly some other random places are behaving differently. If
> >> `materialize()` or `cache()` returns a Table handle, we force user to
> >> explicitly use the cache which removes the “random” part from the
> "suddenly
> >> some other random places are behaving differently”.
> >>
> >> This argument and others that I’ve raised (greater flexibility/allowing
> >> user to explicitly bypass the cache) are independent of `cache()` vs
> >> `materialize()` discussion.
> >>
> >>> Does that mean one can also insert into the CachedTable? This sounds
> >> pretty confusing.
> >>
> >> I don’t know, probably initially we should make CachedTable read-only. I
> >> don’t find it more confusing than the fact that user can not write to
> views
> >> or materialised views in SQL or that user currently can not write to a
> >> Table.
> >>
> >> Piotrek
> >>
> >>> On 30 Nov 2018, at 17:38, Xingcan Cui <xingcanc@xxxxxxxxx> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I agree with @Becket that `cache()` and `materialize()` should be
> >> considered as two different methods where the later one is more
> >> sophisticated.
> >>>
> >>> According to my understanding, the initial idea is just to introduce a
> >> simple cache or persist mechanism, but as the TableAPI is a high-level
> API,
> >> it’s naturally for as to think in a SQL way.
> >>>
> >>> Maybe we can add the `cache()` method to the DataSet API and force
> users
> >> to translate a Table to a Dataset before caching it. Then the users
> should
> >> manually register the cached dataset to a table again (we may need some
> >> table replacement mechanisms for datasets with an identical schema but
> >> different contents here). After all, it’s the dataset rather than the
> >> dynamic table that need to be cached, right?
> >>>
> >>> Best,
> >>> Xingcan
> >>>
> >>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <becket.qin@xxxxxxxxx>
> wrote:
> >>>>
> >>>> Hi Piotrek and Jark,
> >>>>
> >>>> Thanks for the feedback and explanation. Those are good arguments.
> But I
> >>>> think those arguments are mostly about materialized view. Let me try
> to
> >>>> explain the reason I believe cache() and materialize() are different.
> >>>>
> >>>> I think cache() and materialize() have quite different implications.
> An
> >>>> analogy I can think of is save()/publish(). When users call cache(),
> it
> >> is
> >>>> just like they are saving an intermediate result as a draft of their
> >> work,
> >>>> this intermediate result may not have any realistic meaning. Calling
> >>>> cache() does not mean users want to publish the cached table in any
> >> manner.
> >>>> But when users call materialize(), that means "I have something
> >> meaningful
> >>>> to be reused by others", now users need to think about the validation,
> >>>> update & versioning, lifecycle of the result, etc.
> >>>>
> >>>> Piotrek's suggestions on variations of the materialize() methods are
> >> very
> >>>> useful. It would be great if Flink have them. The concept of
> >> materialized
> >>>> view is actually a pretty big feature, not to say the related stuff
> like
> >>>> triggers/hooks you mentioned earlier. I think the materialized view
> >> itself
> >>>> should be discussed in a more thorough and systematic manner. And I
> >> found
> >>>> that discussion is kind of orthogonal and way beyond interactive
> >>>> programming experience.
> >>>>
> >>>> The example you gave was interesting. I still have some questions,
> >> though.
> >>>>
> >>>> Table source = … // some source that scans files from a directory
> >>>>> “/foo/bar/“
> >>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> >>>>> Table t2 = t1.materialize() // (or `cache()`)
> >>>>
> >>>> t2.count() // initialise cache (if it’s lazily initialised)
> >>>>> int a1 = t1.count()
> >>>>> int b1 = t2.count()
> >>>>> // something in the background (or we trigger it) writes new files to
> >>>>> /foo/bar
> >>>>> int a2 = t1.count()
> >>>>> int b2 = t2.count()
> >>>>> t2.refresh() // possible future extension, not to be implemented in
> the
> >>>>> initial version
> >>>>>
> >>>>
> >>>> what if someone else added some more files to /foo/bar at this point?
> In
> >>>> that case, a3 won't equals to b3, and the result become
> >> non-deterministic,
> >>>> right?
> >>>>
> >>>> int a3 = t1.count()
> >>>>> int b3 = t2.count()
> >>>>> t2.drop() // another possible future extension, manual “cache”
> dropping
> >>>>
> >>>>
> >>>> When we talk about interactive programming, in most cases, we are
> >> talking
> >>>> about batch applications. A fundamental assumption of such case is
> that
> >> the
> >>>> source data is complete before the data processing begins, and the
> data
> >>>> will not change during the data processing. IMO, if additional rows
> >> needs
> >>>> to be added to some source during the processing, it should be done in
> >> ways
> >>>> like union the source with another table containing the rows to be
> >> added.
> >>>>
> >>>> There are a few cases that computations are executed repeatedly on the
> >>>> changing data source.
> >>>>
> >>>> For example, people may run a ML training job every hour with the
> >> samples
> >>>> newly added in the past hour. In that case, the source data between
> will
> >>>> indeed change. But still, the data remain unchanged within one run.
> And
> >>>> usually in that case, the result will need versioning, i.e. for a
> given
> >>>> result, it tells that the result is a result from the source data by a
> >>>> certain timestamp.
> >>>>
> >>>> Another example is something like data warehouse. In this case, there
> >> are a
> >>>> few source of original/raw data. On top of those sources, many
> >> materialized
> >>>> view / queries / reports / dashboards can be created to generate
> derived
> >>>> data. Those derived data needs to be updated when the underlying
> >> original
> >>>> data changes. In that case, the processing logic that derives the
> >> original
> >>>> data needs to be executed repeatedly to update those reports/views.
> >> Again,
> >>>> all those derived data also need to have version management, such as
> >>>> timestamp.
> >>>>
> >>>> In any of the above two cases, during a single run of the processing
> >> logic,
> >>>> the data cannot change. Otherwise the behavior of the processing logic
> >> may
> >>>> be undefined. In the above two examples, when writing the processing
> >> logic,
> >>>> Users can use .cache() to hint Flink that those results should be
> saved
> >> to
> >>>> avoid repeated computation. And then for the result of my application
> >>>> logic, I'll call materialize(), so that these results could be managed
> >> by
> >>>> the system with versioning, metadata management, lifecycle management,
> >>>> ACLs, etc.
> >>>>
> >>>> It is true we can use materialize() to do the cache() job, but I am
> >> really
> >>>> reluctant to shoehorn cache() into materialize() and force users to
> >> worry
> >>>> about a bunch of implications that they needn't have to. I am
> >> absolutely on
> >>>> your side that redundant API is bad. But it is equally frustrating, if
> >> not
> >>>> more, that the same API does different things.
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jiangjie (Becket) Qin
> >>>>
> >>>>
> >>>> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <wshaoxuan@xxxxxxxxx>
> >> wrote:
> >>>>
> >>>>> Thanks Piotrek,
> >>>>> You provided a very good example, it explains all the confusions I
> >> have.
> >>>>> It is clear that there is something we have not considered in the
> >> initial
> >>>>> proposal. We intend to force the user to reuse the
> cached/materialized
> >>>>> table, if its cache() method is executed. We did not expect that user
> >> may
> >>>>> want to re-executed the plan from the source table. Let me re-think
> >> about
> >>>>> it and get back to you later.
> >>>>>
> >>>>> In the meanwhile, this example/observation also infers that we cannot
> >> fully
> >>>>> involve the optimizer to decide the plan if a cache/materialize is
> >>>>> explicitly used, because weather to reuse the cache data or
> re-execute
> >> the
> >>>>> query from source data may lead to different results. (But I guess
> >>>>> optimizer can still help in some cases ---- as long as it does not
> >>>>> re-execute from the varied source, we should be safe).
> >>>>>
> >>>>> Regards,
> >>>>> Shaoxuan
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski <
> >> piotr@xxxxxxxxxxxxxxxxx>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Shaoxuan,
> >>>>>>
> >>>>>> Re 2:
> >>>>>>
> >>>>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is modified to->
> t1’
> >>>>>>
> >>>>>> What do you mean that “ t1 is modified to-> t1’ ” ? That
> >>>>>> `methodThatAppliesOperators()` method has changed it’s plan?
> >>>>>>
> >>>>>> I was thinking more about something like this:
> >>>>>>
> >>>>>> Table source = … // some source that scans files from a directory
> >>>>>> “/foo/bar/“
> >>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> >>>>>> Table t2 = t1.materialize() // (or `cache()`)
> >>>>>>
> >>>>>> t2.count() // initialise cache (if it’s lazily initialised)
> >>>>>>
> >>>>>> int a1 = t1.count()
> >>>>>> int b1 = t2.count()
> >>>>>>
> >>>>>> // something in the background (or we trigger it) writes new files
> to
> >>>>>> /foo/bar
> >>>>>>
> >>>>>> int a2 = t1.count()
> >>>>>> int b2 = t2.count()
> >>>>>>
> >>>>>> t2.refresh() // possible future extension, not to be implemented in
> >> the
> >>>>>> initial version
> >>>>>>
> >>>>>> int a3 = t1.count()
> >>>>>> int b3 = t2.count()
> >>>>>>
> >>>>>> t2.drop() // another possible future extension, manual “cache”
> >> dropping
> >>>>>>
> >>>>>> assertTrue(a1 == b1) // same results, but b1 comes from the “cache"
> >>>>>> assertTrue(b1 == b2) // both values come from the same cache
> >>>>>> assertTrue(a2 > b2) // b2 comes from cache, a2 re-executed full
> table
> >>>>> scan
> >>>>>> and has more data
> >>>>>> assertTrue(b3 > b2) // b3 comes from refreshed cache
> >>>>>> assertTrue(b3 == a2 == a3)
> >>>>>>
> >>>>>> Piotrek
> >>>>>>
> >>>>>>> On 30 Nov 2018, at 10:22, Jark Wu <imjark@xxxxxxxxx> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> It is an very interesting and useful design!
> >>>>>>>
> >>>>>>> Here I want to share some of my thoughts:
> >>>>>>>
> >>>>>>> 1. Agree with that cache() method should return some Table to avoid
> >>>>> some
> >>>>>>> unexpected problems because of the mutable object.
> >>>>>>> All the existing methods of Table are returning a new Table
> instance.
> >>>>>>>
> >>>>>>> 2. I think materialize() would be more consistent with SQL, this
> >> makes
> >>>>> it
> >>>>>>> possible to support the same feature for SQL (materialize view) and
> >>>>> keep
> >>>>>>> the same API for users in the future.
> >>>>>>> But I'm also fine if we choose cache().
> >>>>>>>
> >>>>>>> 3. In the proposal, a TableService (or FlinkService?) is used to
> >> cache
> >>>>>> the
> >>>>>>> result of the (intermediate) table.
> >>>>>>> But the name of TableService may be a bit general which is not
> quite
> >>>>>>> understanding correctly in the first glance (a metastore for
> >> tables?).
> >>>>>>> Maybe a more specific name would be better, such as
> TableCacheSerive
> >>>>> or
> >>>>>>> TableMaterializeSerivce or something else.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <fhueske@xxxxxxxxx>
> >> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Thanks for the clarification Becket!
> >>>>>>>>
> >>>>>>>> I have a few thoughts to share / questions:
> >>>>>>>>
> >>>>>>>> 1) I'd like to know how you plan to implement the feature on a
> plan
> >> /
> >>>>>>>> planner level.
> >>>>>>>>
> >>>>>>>> I would imaging the following to happen when Table.cache() is
> >> called:
> >>>>>>>>
> >>>>>>>> 1) immediately optimize the Table and internally convert it into a
> >>>>>>>> DataSet/DataStream. This is necessary, to avoid that operators of
> >>>>> later
> >>>>>>>> queries on top of the Table are pushed down.
> >>>>>>>> 2) register the DataSet/DataStream as a DataSet/DataStream-backed
> >>>>> Table
> >>>>>> X
> >>>>>>>> 3) add a sink to the DataSet/DataStream. This is the
> materialization
> >>>>> of
> >>>>>> the
> >>>>>>>> Table X
> >>>>>>>>
> >>>>>>>> Based on your proposal the following would happen:
> >>>>>>>>
> >>>>>>>> Table t1 = ....
> >>>>>>>> t1.cache(); // cache() returns void. The logical plan of t1 is
> >>>>> replaced
> >>>>>> by
> >>>>>>>> a scan of X. There is also a reference to the materialization of
> X.
> >>>>>>>>
> >>>>>>>> t1.count(); // this executes the program, including the
> >>>>>> DataSet/DataStream
> >>>>>>>> that backs X and the sink that writes the materialization of X
> >>>>>>>> t1.count(); // this executes the program, but reads X from the
> >>>>>>>> materialization.
> >>>>>>>>
> >>>>>>>> My question is, how do you determine when whether the scan of t1
> >>>>> should
> >>>>>> go
> >>>>>>>> against the DataSet/DataStream program and when against the
> >>>>>>>> materialization?
> >>>>>>>> AFAIK, there is no hook that will tell you that a part of the
> >> program
> >>>>>> was
> >>>>>>>> executed. Flipping a switch during optimization or plan generation
> >> is
> >>>>>> not
> >>>>>>>> sufficient as there is no guarantee that the plan is also
> executed.
> >>>>>>>>
> >>>>>>>> Overall, this behavior is somewhat similar to what I proposed in
> >>>>>>>> FLINK-8950, which does not include persisting the table, but just
> >>>>>>>> optimizing and reregistering it as DataSet/DataStream scan.
> >>>>>>>>
> >>>>>>>> 2) I think Piotr has a point about the implicit behavior and side
> >>>>>> effects
> >>>>>>>> of the cache() method if it does not return anything.
> >>>>>>>> Consider the following example:
> >>>>>>>>
> >>>>>>>> Table t1 = ???
> >>>>>>>> Table t2 = methodThatAppliesOperators(t1);
> >>>>>>>> Table t3 = methodThatAppliesOtherOperators(t1);
> >>>>>>>>
> >>>>>>>> In this case, the behavior/performance of the plan that results
> from
> >>>>> the
> >>>>>>>> second method call depends on whether t1 was modified by the first
> >>>>>> method
> >>>>>>>> or not.
> >>>>>>>> This is the classic issue of mutable vs. immutable objects.
> >>>>>>>> Also, as Piotr pointed out, it might also be good to have the
> >> original
> >>>>>> plan
> >>>>>>>> of t1, because in some cases it is possible to push filters down
> >> such
> >>>>>> that
> >>>>>>>> evaluating the query from scratch might be more efficient than
> >>>>> accessing
> >>>>>>>> the cache.
> >>>>>>>> Moreover, a CachedTable could extend Table() and offer a method
> >>>>>> refresh().
> >>>>>>>> This sounds quite useful in an interactive session mode.
> >>>>>>>>
> >>>>>>>> 3) Regarding the name, I can see both arguments. IMO,
> materialize()
> >>>>>> seems
> >>>>>>>> to be more future proof.
> >>>>>>>>
> >>>>>>>> Best, Fabian
> >>>>>>>>
> >>>>>>>> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan Wang <
> >>>>>>>> wshaoxuan@xxxxxxxxx>:
> >>>>>>>>
> >>>>>>>>> Hi Piotr,
> >>>>>>>>>
> >>>>>>>>> Thanks for sharing your ideas on the method naming. We will think
> >>>>> about
> >>>>>>>>> your suggestions. But I don't understand why we need to change
> the
> >>>>>> return
> >>>>>>>>> type of cache().
> >>>>>>>>>
> >>>>>>>>> Cache() is a physical operation, it does not change the logic of
> >>>>>>>>> the `Table`. On the tableAPI layer, we should not introduce a new
> >>>>> table
> >>>>>>>>> type unless the logic of table has been changed. If we introduce
> a
> >>>>> new
> >>>>>>>>> table type `CachedTable`, we need create the same set of methods
> of
> >>>>>>>> `Table`
> >>>>>>>>> for it. I don't think it is worth doing this. Or can you please
> >>>>>> elaborate
> >>>>>>>>> more on what could be the "implicit behaviours/side effects" you
> >> are
> >>>>>>>>> thinking about?
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Shaoxuan
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski <
> >>>>>> piotr@xxxxxxxxxxxxxxxxx>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Becket,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the response.
> >>>>>>>>>>
> >>>>>>>>>> 1. I wasn’t saying that materialised view must be mutable or
> not.
> >>>>> The
> >>>>>>>>> same
> >>>>>>>>>> thing applies to caches as well. To the contrary, I would expect
> >>>>> more
> >>>>>>>>>> consistency and updates from something that is called “cache” vs
> >>>>>>>>> something
> >>>>>>>>>> that’s a “materialised view”. In other words, IMO most caches do
> >> not
> >>>>>>>>> serve
> >>>>>>>>>> you invalid/outdated data and they handle updates on their own.
> >>>>>>>>>>
> >>>>>>>>>> 2. I don’t think that having in the future two very similar
> >> concepts
> >>>>>> of
> >>>>>>>>>> `materialized` view and `cache` is a good idea. It would be
> >>>>> confusing
> >>>>>>>> for
> >>>>>>>>>> the users. I think it could be handled by variations/overloading
> >> of
> >>>>>>>>>> materialised view concept. We could start with:
> >>>>>>>>>>
> >>>>>>>>>> `MaterializedTable materialize()` - immutable, session life
> scope
> >>>>>>>>>> (basically the same semantic as you are proposing
> >>>>>>>>>>
> >>>>>>>>>> And then in the future (if ever) build on top of that/expand it
> >>>>> with:
> >>>>>>>>>>
> >>>>>>>>>> `MaterializedTable materialize(refreshTime=…)` or
> >> `MaterializedTable
> >>>>>>>>>> materialize(refreshHook=…)`
> >>>>>>>>>>
> >>>>>>>>>> Or with cross session support:
> >>>>>>>>>>
> >>>>>>>>>> `MaterializedTable materializeInto(connector=…)` or
> >>>>> `MaterializedTable
> >>>>>>>>>> materializeInto(tableFactory=…)`
> >>>>>>>>>>
> >>>>>>>>>> I’m not saying that we should implement cross session/refreshing
> >> now
> >>>>>> or
> >>>>>>>>>> even in the near future. I’m just arguing that naming current
> >>>>>> immutable
> >>>>>>>>>> session life scope method `materialize()` is more future proof
> and
> >>>>>> more
> >>>>>>>>>> consistent with SQL (on which after all table-api is heavily
> >> basing
> >>>>>>>> on).
> >>>>>>>>>>
> >>>>>>>>>> 3. Even if we agree on naming it `cache()`, I would still insist
> >> on
> >>>>>>>>>> `cache()` returning `CachedTable` handle to avoid implicit
> >>>>>>>>> behaviours/side
> >>>>>>>>>> effects and to give both us & users more flexibility.
> >>>>>>>>>>
> >>>>>>>>>> Piotrek
> >>>>>>>>>>
> >>>>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin <becket.qin@xxxxxxxxx>
> >> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Just to add a little bit, the materialized view is probably
> more
> >>>>>>>>> similar
> >>>>>>>>>> to
> >>>>>>>>>>> the persistent() brought up earlier in the thread. So it is
> >> usually
> >>>>>>>>> cross
> >>>>>>>>>>> session and could be used in a larger scope. For example, a
> >>>>>>>>> materialized
> >>>>>>>>>>> view created by user A may be visible to user B. It is probably
> >>>>>>>>> something
> >>>>>>>>>>> we want to have in the future. I'll put it in the future work
> >>>>>>>> section.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin <
> becket.qin@xxxxxxxxx
> >>>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Piotrek,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the explanation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Right now we are mostly thinking of the cached table as
> >>>>> immutable. I
> >>>>>>>>> can
> >>>>>>>>>>>> see the Materialized view would be useful in the future. That
> >>>>> said,
> >>>>>>>> I
> >>>>>>>>>> think
> >>>>>>>>>>>> a simple cache mechanism is probably still needed. So to me,
> >>>>> cache()
> >>>>>>>>> and
> >>>>>>>>>>>> materialize() should be two separate method as they address
> >>>>>>>> different
> >>>>>>>>>>>> needs. Materialize() is a higher level concept usually
> implying
> >>>>>>>>>> periodical
> >>>>>>>>>>>> update, while cache() has much simpler semantic. For example,
> >> one
> >>>>>>>> may
> >>>>>>>>>>>> create a materialized view and use cache() method in the
> >>>>>>>> materialized
> >>>>>>>>>> view
> >>>>>>>>>>>> creation logic. So that during the materialized view update,
> >> they
> >>>>> do
> >>>>>>>>> not
> >>>>>>>>>>>> need to worry about the case that the cached table is also
> >>>>> changed.
> >>>>>>>>>> Maybe
> >>>>>>>>>>>> under the hood, materialized() and cache() could share some
> >>>>>>>> mechanism,
> >>>>>>>>>> but
> >>>>>>>>>>>> I think a simple cache() method would be handy in a lot of
> >> cases.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski <
> >>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> >>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Becket,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Is there any extra thing user can do on a MaterializedTable
> >> that
> >>>>>>>>> they
> >>>>>>>>>>>>> cannot do on a Table?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Maybe not in the initial implementation, but various DBs
> offer
> >>>>>>>>>> different
> >>>>>>>>>>>>> ways to “refresh” the materialised view. Hooks, triggers,
> >> timers,
> >>>>>>>>>> manually
> >>>>>>>>>>>>> etc. Having `MaterializedTable` would help us to handle that
> in
> >>>>> the
> >>>>>>>>>> future.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> After users call *table.cache(), *users can just use that
> >> table
> >>>>>>>> and
> >>>>>>>>> do
> >>>>>>>>>>>>> anything that is supported on a Table, including SQL.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This is some implicit behaviour with side effects. Imagine if
> >>>>> user
> >>>>>>>>> has
> >>>>>>>>>> a
> >>>>>>>>>>>>> long and complicated program, that touches table `b` multiple
> >>>>>>>> times,
> >>>>>>>>>> maybe
> >>>>>>>>>>>>> scattered around different methods. If he modifies his
> program
> >> by
> >>>>>>>>>> inserting
> >>>>>>>>>>>>> in one place
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> b.cache()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This implicitly alters the semantic and behaviour of his code
> >> all
> >>>>>>>>> over
> >>>>>>>>>>>>> the place, maybe in a ways that might cause problems. For
> >> example
> >>>>>>>>> what
> >>>>>>>>>> if
> >>>>>>>>>>>>> underlying data is changing?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Having invisible side effects is also not very clean, for
> >> example
> >>>>>>>>> think
> >>>>>>>>>>>>> about something like this (but more complicated):
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Table b = ...;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If (some_condition) {
> >>>>>>>>>>>>> processTable1(b)
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>> else {
> >>>>>>>>>>>>> processTable2(b)
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // do more stuff with b
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> And user adds `b.cache()` call to only one of the
> >> `processTable1`
> >>>>>>>> or
> >>>>>>>>>>>>> `processTable2` methods.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On the other hand
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Table materialisedB = b.materialize()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Avoids (at least some of) the side effect issues and forces
> >> user
> >>>>> to
> >>>>>>>>>>>>> explicitly use `materialisedB` where it’s appropriate and
> >> forces
> >>>>>>>> user
> >>>>>>>>>> to
> >>>>>>>>>>>>> think what does it actually mean. And if something doesn’t
> work
> >>>>> in
> >>>>>>>>> the
> >>>>>>>>>> end
> >>>>>>>>>>>>> for the user, he will know what has he changed instead of
> >> blaming
> >>>>>>>>>> Flink for
> >>>>>>>>>>>>> some “magic” underneath. In the above example, after
> >>>>> materialising
> >>>>>>>> b
> >>>>>>>>> in
> >>>>>>>>>>>>> only one of the methods, he should/would realise about the
> >> issue
> >>>>>>>> when
> >>>>>>>>>>>>> handling the return value `MaterializedTable` of that method.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I guess it comes down to personal preferences if you like
> >> things
> >>>>> to
> >>>>>>>>> be
> >>>>>>>>>>>>> implicit or not. The more power is the user, probably the
> more
> >>>>>>>> likely
> >>>>>>>>>> he is
> >>>>>>>>>>>>> to like/understand implicit behaviour. And we as Table API
> >>>>>>>> designers
> >>>>>>>>>> are
> >>>>>>>>>>>>> the most power users out there, so I would proceed with
> caution
> >>>>> (so
> >>>>>>>>>> that we
> >>>>>>>>>>>>> do not end up in the crazy perl realm with it’s lovely
> implicit
> >>>>>>>>> method
> >>>>>>>>>>>>> arguments ;)  <https://stackoverflow.com/a/14922656/8149051
> >)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Table API to also support non-relational processing cases,
> >>>>> cache()
> >>>>>>>>>>>>> might be slightly better.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think even such extended Table API could benefit from
> >> sticking
> >>>>>>>>>> to/being
> >>>>>>>>>>>>> consistent with SQL where both SQL and Table API are
> basically
> >>>>> the
> >>>>>>>>>> same.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> One more thing. `MaterializedTable materialize()` could be
> more
> >>>>>>>>>>>>> powerful/flexible allowing the user to operate both on
> >>>>> materialised
> >>>>>>>>>> and not
> >>>>>>>>>>>>> materialised view at the same time for whatever reasons
> >>>>> (underlying
> >>>>>>>>>> data
> >>>>>>>>>>>>> changing/better optimisation opportunities after pushing down
> >>>>> more
> >>>>>>>>>> filters
> >>>>>>>>>>>>> etc). For example:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Table b = …;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> MaterlizedTable mb = b.materialize();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Val min = mb.min();
> >>>>>>>>>>>>> Val max = mb.max();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Val user42 = b.filter(‘userId = 42);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Could be more efficient compared to `b.cache()` if
> >>>>> `filter(‘userId
> >>>>>>>> =
> >>>>>>>>>>>>> 42);` allows for much more aggressive optimisations.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske <fhueske@xxxxxxxxx>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'm not suggesting to add support for Ignite. This was just
> an
> >>>>>>>>>> example.
> >>>>>>>>>>>>>> Plasma and Arrow sound interesting, too.
> >>>>>>>>>>>>>> For the sake of this proposal, it would be up to the user to
> >>>>>>>>>> implement a
> >>>>>>>>>>>>>> TableFactory and corresponding TableSource / TableSink
> classes
> >>>>> to
> >>>>>>>>>>>>> persist
> >>>>>>>>>>>>>> and read the data.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 12:06 Uhr schrieb Flavio
> Pompermaier
> >> <
> >>>>>>>>>>>>>> pompermaier@xxxxxxxx>:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What about to add also Apache Plasma + Arrow as an
> >> alternative
> >>>>> to
> >>>>>>>>>>>>> Apache
> >>>>>>>>>>>>>>> Ignite?
> >>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>
> >> https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske <
> >>>>>>>> fhueske@xxxxxxxxx>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the proposal!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> To summarize, you propose a new method Table.cache():
> Table
> >>>>> that
> >>>>>>>>>> will
> >>>>>>>>>>>>>>>> trigger a job and write the result into some temporary
> >> storage
> >>>>>>>> as
> >>>>>>>>>>>>> defined
> >>>>>>>>>>>>>>>> by a TableFactory.
> >>>>>>>>>>>>>>>> The cache() call blocks while the job is running and
> >>>>> eventually
> >>>>>>>>>>>>> returns a
> >>>>>>>>>>>>>>>> Table object that represents a scan of the temporary
> table.
> >>>>>>>>>>>>>>>> When the "session" is closed (closing to be defined?), the
> >>>>>>>>> temporary
> >>>>>>>>>>>>>>> tables
> >>>>>>>>>>>>>>>> are all dropped.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I think this behavior makes sense and is a good first step
> >>>>>>>> towards
> >>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>> interactive workloads.
> >>>>>>>>>>>>>>>> However, its performance suffers from writing to and
> reading
> >>>>>>>> from
> >>>>>>>>>>>>>>> external
> >>>>>>>>>>>>>>>> systems.
> >>>>>>>>>>>>>>>> I think this is OK for now. Changes that would
> significantly
> >>>>>>>>> improve
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> situation (i.e., pinning data in-memory across jobs) would
> >>>>> have
> >>>>>>>>>> large
> >>>>>>>>>>>>>>>> impacts on many components of Flink.
> >>>>>>>>>>>>>>>> Users could use in-memory filesystems or storage grids
> >> (Apache
> >>>>>>>>>>>>> Ignite) to
> >>>>>>>>>>>>>>>> mitigate some of the performance effects.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best, Fabian
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb Becket Qin <
> >>>>>>>>>>>>>>>> becket.qin@xxxxxxxxx
> >>>>>>>>>>>>>>>>> :
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the explanation, Piotrek.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Is there any extra thing user can do on a
> MaterializedTable
> >>>>>>>> that
> >>>>>>>>>> they
> >>>>>>>>>>>>>>>>> cannot do on a Table? After users call *table.cache(),
> >> *users
> >>>>>>>> can
> >>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>> that table and do anything that is supported on a Table,
> >>>>>>>>> including
> >>>>>>>>>>>>> SQL.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Naming wise, either cache() or materialize() sounds fine
> to
> >>>>> me.
> >>>>>>>>>>>>> cache()
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> a bit more general than materialize(). Given that we are
> >>>>>>>>> enhancing
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> Table API to also support non-relational processing
> cases,
> >>>>>>>>> cache()
> >>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> slightly better.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski <
> >>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Becket,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Ops, sorry I didn’t notice that you intend to reuse
> >> existing
> >>>>>>>>>>>>>>>>>> `TableFactory`. I don’t know why, but I assumed that you
> >>>>> want
> >>>>>>>> to
> >>>>>>>>>>>>>>>> provide
> >>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>> alternate way of writing the data.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Now that I hopefully understand the proposal, maybe we
> >> could
> >>>>>>>>>> rename
> >>>>>>>>>>>>>>>>>> `cache()` to
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> void materialize()
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> or going step further
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> MaterializedTable materialize()
> >>>>>>>>>>>>>>>>>> MaterializedTable createMaterializedView()
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> ?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The second option with returning a handle I think is
> more
> >>>>>>>>> flexible
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> could provide features such as “refresh”/“delete” or
> >>>>> generally
> >>>>>>>>>>>>>>> speaking
> >>>>>>>>>>>>>>>>>> manage the the view. In the future we could also think
> >> about
> >>>>>>>>>> adding
> >>>>>>>>>>>>>>>> hooks
> >>>>>>>>>>>>>>>>>> to automatically refresh view etc. It is also more
> >> explicit
> >>>>> -
> >>>>>>>>>>>>>>>>>> materialization returning a new table handle will not
> have
> >>>>> the
> >>>>>>>>>> same
> >>>>>>>>>>>>>>>>>> implicit side effects as adding a simple line of code
> like
> >>>>>>>>>>>>>>> `b.cache()`
> >>>>>>>>>>>>>>>>>> would have.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> It would also be more SQL like, making it more intuitive
> >> for
> >>>>>>>>> users
> >>>>>>>>>>>>>>>>> already
> >>>>>>>>>>>>>>>>>> familiar with the SQL.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 14:53, Becket Qin <
> >> becket.qin@xxxxxxxxx
> >>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Piotrek,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For the cache() method itself, yes, it is equivalent to
> >>>>>>>>> creating
> >>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> BUILT-IN
> >>>>>>>>>>>>>>>>>>> materialized view with a lifecycle. That functionality
> is
> >>>>>>>>> missing
> >>>>>>>>>>>>>>>>> today,
> >>>>>>>>>>>>>>>>>>> though. Not sure if I understand your question. Do you
> >> mean
> >>>>>>>> we
> >>>>>>>>>>>>>>>> already
> >>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>> the functionality and just need a syntax sugar?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> What's more interesting in the proposal is do we want
> to
> >>>>> stop
> >>>>>>>>> at
> >>>>>>>>>>>>>>>>> creating
> >>>>>>>>>>>>>>>>>>> the materialized view? Or do we want to extend that in
> >> the
> >>>>>>>>> future
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>> useful unified data store distributed with Flink? And
> do
> >> we
> >>>>>>>>> want
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> mechanism allow more flexible user job pattern with
> their
> >>>>> own
> >>>>>>>>>> user
> >>>>>>>>>>>>>>>>>> defined
> >>>>>>>>>>>>>>>>>>> services. These considerations are much more
> >> architectural.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <
> >>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Interesting idea. I’m trying to understand the
> problem.
> >>>>>>>> Isn’t
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> `cache()` call an equivalent of writing data to a sink
> >> and
> >>>>>>>>> later
> >>>>>>>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>>>>> from it? Where this sink has a limited live scope/live
> >>>>> time?
> >>>>>>>>> And
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> sink
> >>>>>>>>>>>>>>>>>>>> could be implemented as in memory or a file sink?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> If so, what’s the problem with creating a materialised
> >>>>> view
> >>>>>>>>>> from a
> >>>>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>> “b” (from your document’s example) and reusing this
> >>>>>>>>> materialised
> >>>>>>>>>>>>>>>> view
> >>>>>>>>>>>>>>>>>>>> later? Maybe we are lacking mechanisms to clean up
> >>>>>>>>> materialised
> >>>>>>>>>>>>>>>> views
> >>>>>>>>>>>>>>>>>> (for
> >>>>>>>>>>>>>>>>>>>> example when current session finishes)? Maybe we need
> >> some
> >>>>>>>>>>>>>>> syntactic
> >>>>>>>>>>>>>>>>>> sugar
> >>>>>>>>>>>>>>>>>>>> on top of it?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 07:21, Becket Qin <
> >>>>> becket.qin@xxxxxxxxx
> >>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, Jincheng.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Yes, I think it makes sense to have a persist() with
> >>>>>>>>>>>>>>>>> lifecycle/defined
> >>>>>>>>>>>>>>>>>>>>> scope. I just added a section in the future work for
> >>>>> this.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <
> >>>>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thank you for the explanation about the name of
> >>>>>>>> `cache()`, I
> >>>>>>>>>>>>>>>>>> understand
> >>>>>>>>>>>>>>>>>>>> why
> >>>>>>>>>>>>>>>>>>>>>> you designed this way!
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Another idea is whether we can specify a lifecycle
> for
> >>>>>>>> data
> >>>>>>>>>>>>>>>>>> persistence?
> >>>>>>>>>>>>>>>>>>>>>> For example, persist (LifeCycle.SESSION), so that
> the
> >>>>> user
> >>>>>>>>> is
> >>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> worried
> >>>>>>>>>>>>>>>>>>>>>> about data loss, and will clearly specify the time
> >> range
> >>>>>>>> for
> >>>>>>>>>>>>>>>> keeping
> >>>>>>>>>>>>>>>>>>>> time.
> >>>>>>>>>>>>>>>>>>>>>> At the same time, if we want to expand, we can also
> >>>>> share
> >>>>>>>>> in a
> >>>>>>>>>>>>>>>>> certain
> >>>>>>>>>>>>>>>>>>>>>> group of session, for example:
> >>>>>>>>> LifeCycle.SESSION_GROUP(...), I
> >>>>>>>>>>>>>>> am
> >>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> sure,
> >>>>>>>>>>>>>>>>>>>>>> just an immature suggestion, for reference only!
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Bests,
> >>>>>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx> 于2018年11月23日周五
> >>>>>>>> 下午1:33写道:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Re: Jincheng,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. Regarding cache() v.s.
> >>>>>>>> persist(),
> >>>>>>>>>>>>>>>>>> personally I
> >>>>>>>>>>>>>>>>>>>>>>> find cache() to be more accurately describing the
> >>>>>>>> behavior,
> >>>>>>>>>>>>>>> i.e.
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> Table
> >>>>>>>>>>>>>>>>>>>>>>> is cached for the session, but will be deleted
> after
> >>>>> the
> >>>>>>>>>>>>>>> session
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> closed.
> >>>>>>>>>>>>>>>>>>>>>>> persist() seems a little misleading as people might
> >>>>> think
> >>>>>>>>> the
> >>>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>> still be there even after the session is gone.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Great point about mixing the batch and stream
> >>>>> processing
> >>>>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>> job.
> >>>>>>>>>>>>>>>>>>>>>>> We should absolutely move towards that goal. I
> >> imagine
> >>>>>>>> that
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> huge
> >>>>>>>>>>>>>>>>>>>>>>> change across the board, including sources,
> operators
> >>>>> and
> >>>>>>>>>>>>>>>>>>>> optimizations,
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> name some. Likely we will need several separate
> >>>>> in-depth
> >>>>>>>>>>>>>>>>> discussions.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <
> >>>>>>>>>>>>>>> xingcanc@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> @Shaoxuan, I think the lifecycle or access domain
> >> are
> >>>>>>>> both
> >>>>>>>>>>>>>>>>>> orthogonal
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> the cache problem. Essentially, this may be the
> >> first
> >>>>>>>> time
> >>>>>>>>>> we
> >>>>>>>>>>>>>>>> plan
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> introduce another storage mechanism other than the
> >>>>>>>> state.
> >>>>>>>>>>>>>>> Maybe
> >>>>>>>>>>>>>>>>> it’s
> >>>>>>>>>>>>>>>>>>>>>>> better
> >>>>>>>>>>>>>>>>>>>>>>>> to first draw a big picture and then concentrate
> on
> >> a
> >>>>>>>>>> specific
> >>>>>>>>>>>>>>>>> part?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> @Becket, yes, actually I am more concerned with
> the
> >>>>>>>>>> underlying
> >>>>>>>>>>>>>>>>>>>> service.
> >>>>>>>>>>>>>>>>>>>>>>>> This seems to be quite a major change to the
> >> existing
> >>>>>>>>>>>>>>> codebase.
> >>>>>>>>>>>>>>>> As
> >>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>>> claimed, the service should be extendible to
> support
> >>>>>>>> other
> >>>>>>>>>>>>>>>>>> components
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> we’d better discussed it in another thread.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> All in all, I also eager to enjoy the more
> >> interactive
> >>>>>>>>> Table
> >>>>>>>>>>>>>>>> API,
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> case
> >>>>>>>>>>>>>>>>>>>>>>>> of a general and flexible enough service
> mechanism.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>> Xingcan
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <
> >>>>>>>>>>>>>>>> xiaoweij@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Relying on a callback for the temp table for
> clean
> >> up
> >>>>>>>> is
> >>>>>>>>>> not
> >>>>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>>>>>> reliable.
> >>>>>>>>>>>>>>>>>>>>>>>>> There is no guarantee that it will be executed
> >>>>>>>>>> successfully.
> >>>>>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>> may
> >>>>>>>>>>>>>>>>>>>>>>> risk
> >>>>>>>>>>>>>>>>>>>>>>>>> leaks when that happens. I think that it's safer
> to
> >>>>>>>> have
> >>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>> association
> >>>>>>>>>>>>>>>>>>>>>>>>> between temp table and session id. So we can
> always
> >>>>>>>> clean
> >>>>>>>>>> up
> >>>>>>>>>>>>>>>> temp
> >>>>>>>>>>>>>>>>>>>>>>> tables
> >>>>>>>>>>>>>>>>>>>>>>>>> which are no longer associated with any active
> >>>>>>>> sessions.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <
> >>>>>>>>>>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie&Shaoxuan,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for initiating this great proposal!
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Interactive Programming is very useful and user
> >>>>>>>> friendly
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>> case
> >>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>>>> examples.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Moreover, especially when a business has to be
> >>>>>>>> executed
> >>>>>>>>> in
> >>>>>>>>>>>>>>>>> several
> >>>>>>>>>>>>>>>>>>>>>>>> stages
> >>>>>>>>>>>>>>>>>>>>>>>>>> with dependencies,such as the pipeline of Flink
> >> ML,
> >>>>> in
> >>>>>>>>>> order
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> utilize
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> intermediate calculation results we have to
> >> submit a
> >>>>>>>> job
> >>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>>> env.execute().
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> About the `cache()`  , I think is better to
> named
> >>>>>>>>>>>>>>> `persist()`,
> >>>>>>>>>>>>>>>>> And
> >>>>>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>>>>>>> Flink framework determines whether we internally
> >>>>> cache
> >>>>>>>>> in
> >>>>>>>>>>>>>>>> memory
> >>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>>>>>>>>>> to the storage system,Maybe save the data into
> >> state
> >>>>>>>>>> backend
> >>>>>>>>>>>>>>>>>>>>>>>>>> (MemoryStateBackend or RocksDBStateBackend etc.)
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> BTW, from the points of my view in the future,
> >>>>> support
> >>>>>>>>> for
> >>>>>>>>>>>>>>>>>> streaming
> >>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>> batch mode switching in the same job will also
> >>>>> benefit
> >>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> "Interactive
> >>>>>>>>>>>>>>>>>>>>>>>>>> Programming",  I am looking forward to your
> JIRAs
> >>>>> and
> >>>>>>>>>> FLIP!
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx>
> 于2018年11月20日周二
> >>>>>>>>>> 下午9:56写道:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> As a few recent email threads have pointed out,
> >> it
> >>>>>>>> is a
> >>>>>>>>>>>>>>>>> promising
> >>>>>>>>>>>>>>>>>>>>>>>>>>> opportunity to enhance Flink Table API in
> various
> >>>>>>>>>> aspects,
> >>>>>>>>>>>>>>>>>>>>>> including
> >>>>>>>>>>>>>>>>>>>>>>>>>>> functionality and ease of use among others. One
> >> of
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> scenarios
> >>>>>>>>>>>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>>>> feel Flink could improve is interactive
> >>>>> programming.
> >>>>>>>> To
> >>>>>>>>>>>>>>>> explain
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> issues
> >>>>>>>>>>>>>>>>>>>>>>>>>>> and facilitate the discussion on the solution,
> we
> >>>>> put
> >>>>>>>>>>>>>>>> together
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> following document with our proposal.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Feedback and comments are very welcome!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>
> >>>
> >>
> >>
> >>
>
>