osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Support Interactive Programming in Flink Table API


Hi Piotrek,

Cache() should not affect semantics and business logic, and thus it will
not lead to random behavior/results. The underlying design should ensure
this. I thought your example as a valid anti-case. But Jiangjie is correct,
the source table in batching should be immutable. It is the user’s
responsibility to ensure it, otherwise even a regular failover may lead
to inconsistent results. If you consider cache as an optimization hint,
rather than a special case of materialized view, it might be easy to
understand the problem we are trying to solve.

Regards,
Shaoxuan


On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
wrote:

> Hi all,
>
> Regarding naming `cache()` vs `materialize()`. One more explanation why I
> think `materialize()` is more natural to me is that I think of all “Table”s
> in Table-API as views. They behave the same way as SQL views, the only
> difference for me is that their live scope is short - current session which
> is limited by different execution model. That’s why “cashing” a view for me
> is just materialising it.
>
> However I see and I understand your point of view. Coming from
> DataSet/DataStream and generally speaking non-SQL world, `cache()` is more
> natural. But keep in mind that `.cache()` will/might not only be used in
> interactive programming and not only in batching. But naming is one issue,
> and not that critical to me. Especially that once we implement proper
> materialised views, we can always deprecate/rename `cache()` if we deem so.
>
>
> For me the more important issue is of not having the `void cache()` with
> side effects. Exactly for the reasons that you have mentioned. True:
> results might be non deterministic if underlying source table are changing.
> Problem is that `void cache()` implicitly changes the semantic of
> subsequent uses of the cached/materialized Table. It can cause “wtf” moment
> for a user if he inserts “b.cache()” call in some place in his code and
> suddenly some other random places are behaving differently. If
> `materialize()` or `cache()` returns a Table handle, we force user to
> explicitly use the cache which removes the “random” part from the "suddenly
> some other random places are behaving differently”.
>
> This argument and others that I’ve raised (greater flexibility/allowing
> user to explicitly bypass the cache) are independent of `cache()` vs
> `materialize()` discussion.
>
> > Does that mean one can also insert into the CachedTable? This sounds
> pretty confusing.
>
> I don’t know, probably initially we should make CachedTable read-only. I
> don’t find it more confusing than the fact that user can not write to views
> or materialised views in SQL or that user currently can not write to a
> Table.
>
> Piotrek
>
> > On 30 Nov 2018, at 17:38, Xingcan Cui <xingcanc@xxxxxxxxx> wrote:
> >
> > Hi all,
> >
> > I agree with @Becket that `cache()` and `materialize()` should be
> considered as two different methods where the later one is more
> sophisticated.
> >
> > According to my understanding, the initial idea is just to introduce a
> simple cache or persist mechanism, but as the TableAPI is a high-level API,
> it’s naturally for as to think in a SQL way.
> >
> > Maybe we can add the `cache()` method to the DataSet API and force users
> to translate a Table to a Dataset before caching it. Then the users should
> manually register the cached dataset to a table again (we may need some
> table replacement mechanisms for datasets with an identical schema but
> different contents here). After all, it’s the dataset rather than the
> dynamic table that need to be cached, right?
> >
> > Best,
> > Xingcan
> >
> >> On Nov 30, 2018, at 10:57 AM, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> >>
> >> Hi Piotrek and Jark,
> >>
> >> Thanks for the feedback and explanation. Those are good arguments. But I
> >> think those arguments are mostly about materialized view. Let me try to
> >> explain the reason I believe cache() and materialize() are different.
> >>
> >> I think cache() and materialize() have quite different implications. An
> >> analogy I can think of is save()/publish(). When users call cache(), it
> is
> >> just like they are saving an intermediate result as a draft of their
> work,
> >> this intermediate result may not have any realistic meaning. Calling
> >> cache() does not mean users want to publish the cached table in any
> manner.
> >> But when users call materialize(), that means "I have something
> meaningful
> >> to be reused by others", now users need to think about the validation,
> >> update & versioning, lifecycle of the result, etc.
> >>
> >> Piotrek's suggestions on variations of the materialize() methods are
> very
> >> useful. It would be great if Flink have them. The concept of
> materialized
> >> view is actually a pretty big feature, not to say the related stuff like
> >> triggers/hooks you mentioned earlier. I think the materialized view
> itself
> >> should be discussed in a more thorough and systematic manner. And I
> found
> >> that discussion is kind of orthogonal and way beyond interactive
> >> programming experience.
> >>
> >> The example you gave was interesting. I still have some questions,
> though.
> >>
> >> Table source = … // some source that scans files from a directory
> >>> “/foo/bar/“
> >>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> >>> Table t2 = t1.materialize() // (or `cache()`)
> >>
> >> t2.count() // initialise cache (if it’s lazily initialised)
> >>> int a1 = t1.count()
> >>> int b1 = t2.count()
> >>> // something in the background (or we trigger it) writes new files to
> >>> /foo/bar
> >>> int a2 = t1.count()
> >>> int b2 = t2.count()
> >>> t2.refresh() // possible future extension, not to be implemented in the
> >>> initial version
> >>>
> >>
> >> what if someone else added some more files to /foo/bar at this point? In
> >> that case, a3 won't equals to b3, and the result become
> non-deterministic,
> >> right?
> >>
> >> int a3 = t1.count()
> >>> int b3 = t2.count()
> >>> t2.drop() // another possible future extension, manual “cache” dropping
> >>
> >>
> >> When we talk about interactive programming, in most cases, we are
> talking
> >> about batch applications. A fundamental assumption of such case is that
> the
> >> source data is complete before the data processing begins, and the data
> >> will not change during the data processing. IMO, if additional rows
> needs
> >> to be added to some source during the processing, it should be done in
> ways
> >> like union the source with another table containing the rows to be
> added.
> >>
> >> There are a few cases that computations are executed repeatedly on the
> >> changing data source.
> >>
> >> For example, people may run a ML training job every hour with the
> samples
> >> newly added in the past hour. In that case, the source data between will
> >> indeed change. But still, the data remain unchanged within one run. And
> >> usually in that case, the result will need versioning, i.e. for a given
> >> result, it tells that the result is a result from the source data by a
> >> certain timestamp.
> >>
> >> Another example is something like data warehouse. In this case, there
> are a
> >> few source of original/raw data. On top of those sources, many
> materialized
> >> view / queries / reports / dashboards can be created to generate derived
> >> data. Those derived data needs to be updated when the underlying
> original
> >> data changes. In that case, the processing logic that derives the
> original
> >> data needs to be executed repeatedly to update those reports/views.
> Again,
> >> all those derived data also need to have version management, such as
> >> timestamp.
> >>
> >> In any of the above two cases, during a single run of the processing
> logic,
> >> the data cannot change. Otherwise the behavior of the processing logic
> may
> >> be undefined. In the above two examples, when writing the processing
> logic,
> >> Users can use .cache() to hint Flink that those results should be saved
> to
> >> avoid repeated computation. And then for the result of my application
> >> logic, I'll call materialize(), so that these results could be managed
> by
> >> the system with versioning, metadata management, lifecycle management,
> >> ACLs, etc.
> >>
> >> It is true we can use materialize() to do the cache() job, but I am
> really
> >> reluctant to shoehorn cache() into materialize() and force users to
> worry
> >> about a bunch of implications that they needn't have to. I am
> absolutely on
> >> your side that redundant API is bad. But it is equally frustrating, if
> not
> >> more, that the same API does different things.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <wshaoxuan@xxxxxxxxx>
> wrote:
> >>
> >>> Thanks Piotrek,
> >>> You provided a very good example, it explains all the confusions I
> have.
> >>> It is clear that there is something we have not considered in the
> initial
> >>> proposal. We intend to force the user to reuse the cached/materialized
> >>> table, if its cache() method is executed. We did not expect that user
> may
> >>> want to re-executed the plan from the source table. Let me re-think
> about
> >>> it and get back to you later.
> >>>
> >>> In the meanwhile, this example/observation also infers that we cannot
> fully
> >>> involve the optimizer to decide the plan if a cache/materialize is
> >>> explicitly used, because weather to reuse the cache data or re-execute
> the
> >>> query from source data may lead to different results. (But I guess
> >>> optimizer can still help in some cases ---- as long as it does not
> >>> re-execute from the varied source, we should be safe).
> >>>
> >>> Regards,
> >>> Shaoxuan
> >>>
> >>>
> >>>
> >>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski <
> piotr@xxxxxxxxxxxxxxxxx>
> >>> wrote:
> >>>
> >>>> Hi Shaoxuan,
> >>>>
> >>>> Re 2:
> >>>>
> >>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is modified to-> t1’
> >>>>
> >>>> What do you mean that “ t1 is modified to-> t1’ ” ? That
> >>>> `methodThatAppliesOperators()` method has changed it’s plan?
> >>>>
> >>>> I was thinking more about something like this:
> >>>>
> >>>> Table source = … // some source that scans files from a directory
> >>>> “/foo/bar/“
> >>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> >>>> Table t2 = t1.materialize() // (or `cache()`)
> >>>>
> >>>> t2.count() // initialise cache (if it’s lazily initialised)
> >>>>
> >>>> int a1 = t1.count()
> >>>> int b1 = t2.count()
> >>>>
> >>>> // something in the background (or we trigger it) writes new files to
> >>>> /foo/bar
> >>>>
> >>>> int a2 = t1.count()
> >>>> int b2 = t2.count()
> >>>>
> >>>> t2.refresh() // possible future extension, not to be implemented in
> the
> >>>> initial version
> >>>>
> >>>> int a3 = t1.count()
> >>>> int b3 = t2.count()
> >>>>
> >>>> t2.drop() // another possible future extension, manual “cache”
> dropping
> >>>>
> >>>> assertTrue(a1 == b1) // same results, but b1 comes from the “cache"
> >>>> assertTrue(b1 == b2) // both values come from the same cache
> >>>> assertTrue(a2 > b2) // b2 comes from cache, a2 re-executed full table
> >>> scan
> >>>> and has more data
> >>>> assertTrue(b3 > b2) // b3 comes from refreshed cache
> >>>> assertTrue(b3 == a2 == a3)
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 30 Nov 2018, at 10:22, Jark Wu <imjark@xxxxxxxxx> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> It is an very interesting and useful design!
> >>>>>
> >>>>> Here I want to share some of my thoughts:
> >>>>>
> >>>>> 1. Agree with that cache() method should return some Table to avoid
> >>> some
> >>>>> unexpected problems because of the mutable object.
> >>>>> All the existing methods of Table are returning a new Table instance.
> >>>>>
> >>>>> 2. I think materialize() would be more consistent with SQL, this
> makes
> >>> it
> >>>>> possible to support the same feature for SQL (materialize view) and
> >>> keep
> >>>>> the same API for users in the future.
> >>>>> But I'm also fine if we choose cache().
> >>>>>
> >>>>> 3. In the proposal, a TableService (or FlinkService?) is used to
> cache
> >>>> the
> >>>>> result of the (intermediate) table.
> >>>>> But the name of TableService may be a bit general which is not quite
> >>>>> understanding correctly in the first glance (a metastore for
> tables?).
> >>>>> Maybe a more specific name would be better, such as TableCacheSerive
> >>> or
> >>>>> TableMaterializeSerivce or something else.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>>
> >>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <fhueske@xxxxxxxxx>
> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for the clarification Becket!
> >>>>>>
> >>>>>> I have a few thoughts to share / questions:
> >>>>>>
> >>>>>> 1) I'd like to know how you plan to implement the feature on a plan
> /
> >>>>>> planner level.
> >>>>>>
> >>>>>> I would imaging the following to happen when Table.cache() is
> called:
> >>>>>>
> >>>>>> 1) immediately optimize the Table and internally convert it into a
> >>>>>> DataSet/DataStream. This is necessary, to avoid that operators of
> >>> later
> >>>>>> queries on top of the Table are pushed down.
> >>>>>> 2) register the DataSet/DataStream as a DataSet/DataStream-backed
> >>> Table
> >>>> X
> >>>>>> 3) add a sink to the DataSet/DataStream. This is the materialization
> >>> of
> >>>> the
> >>>>>> Table X
> >>>>>>
> >>>>>> Based on your proposal the following would happen:
> >>>>>>
> >>>>>> Table t1 = ....
> >>>>>> t1.cache(); // cache() returns void. The logical plan of t1 is
> >>> replaced
> >>>> by
> >>>>>> a scan of X. There is also a reference to the materialization of X.
> >>>>>>
> >>>>>> t1.count(); // this executes the program, including the
> >>>> DataSet/DataStream
> >>>>>> that backs X and the sink that writes the materialization of X
> >>>>>> t1.count(); // this executes the program, but reads X from the
> >>>>>> materialization.
> >>>>>>
> >>>>>> My question is, how do you determine when whether the scan of t1
> >>> should
> >>>> go
> >>>>>> against the DataSet/DataStream program and when against the
> >>>>>> materialization?
> >>>>>> AFAIK, there is no hook that will tell you that a part of the
> program
> >>>> was
> >>>>>> executed. Flipping a switch during optimization or plan generation
> is
> >>>> not
> >>>>>> sufficient as there is no guarantee that the plan is also executed.
> >>>>>>
> >>>>>> Overall, this behavior is somewhat similar to what I proposed in
> >>>>>> FLINK-8950, which does not include persisting the table, but just
> >>>>>> optimizing and reregistering it as DataSet/DataStream scan.
> >>>>>>
> >>>>>> 2) I think Piotr has a point about the implicit behavior and side
> >>>> effects
> >>>>>> of the cache() method if it does not return anything.
> >>>>>> Consider the following example:
> >>>>>>
> >>>>>> Table t1 = ???
> >>>>>> Table t2 = methodThatAppliesOperators(t1);
> >>>>>> Table t3 = methodThatAppliesOtherOperators(t1);
> >>>>>>
> >>>>>> In this case, the behavior/performance of the plan that results from
> >>> the
> >>>>>> second method call depends on whether t1 was modified by the first
> >>>> method
> >>>>>> or not.
> >>>>>> This is the classic issue of mutable vs. immutable objects.
> >>>>>> Also, as Piotr pointed out, it might also be good to have the
> original
> >>>> plan
> >>>>>> of t1, because in some cases it is possible to push filters down
> such
> >>>> that
> >>>>>> evaluating the query from scratch might be more efficient than
> >>> accessing
> >>>>>> the cache.
> >>>>>> Moreover, a CachedTable could extend Table() and offer a method
> >>>> refresh().
> >>>>>> This sounds quite useful in an interactive session mode.
> >>>>>>
> >>>>>> 3) Regarding the name, I can see both arguments. IMO, materialize()
> >>>> seems
> >>>>>> to be more future proof.
> >>>>>>
> >>>>>> Best, Fabian
> >>>>>>
> >>>>>> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan Wang <
> >>>>>> wshaoxuan@xxxxxxxxx>:
> >>>>>>
> >>>>>>> Hi Piotr,
> >>>>>>>
> >>>>>>> Thanks for sharing your ideas on the method naming. We will think
> >>> about
> >>>>>>> your suggestions. But I don't understand why we need to change the
> >>>> return
> >>>>>>> type of cache().
> >>>>>>>
> >>>>>>> Cache() is a physical operation, it does not change the logic of
> >>>>>>> the `Table`. On the tableAPI layer, we should not introduce a new
> >>> table
> >>>>>>> type unless the logic of table has been changed. If we introduce a
> >>> new
> >>>>>>> table type `CachedTable`, we need create the same set of methods of
> >>>>>> `Table`
> >>>>>>> for it. I don't think it is worth doing this. Or can you please
> >>>> elaborate
> >>>>>>> more on what could be the "implicit behaviours/side effects" you
> are
> >>>>>>> thinking about?
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Shaoxuan
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski <
> >>>> piotr@xxxxxxxxxxxxxxxxx>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Becket,
> >>>>>>>>
> >>>>>>>> Thanks for the response.
> >>>>>>>>
> >>>>>>>> 1. I wasn’t saying that materialised view must be mutable or not.
> >>> The
> >>>>>>> same
> >>>>>>>> thing applies to caches as well. To the contrary, I would expect
> >>> more
> >>>>>>>> consistency and updates from something that is called “cache” vs
> >>>>>>> something
> >>>>>>>> that’s a “materialised view”. In other words, IMO most caches do
> not
> >>>>>>> serve
> >>>>>>>> you invalid/outdated data and they handle updates on their own.
> >>>>>>>>
> >>>>>>>> 2. I don’t think that having in the future two very similar
> concepts
> >>>> of
> >>>>>>>> `materialized` view and `cache` is a good idea. It would be
> >>> confusing
> >>>>>> for
> >>>>>>>> the users. I think it could be handled by variations/overloading
> of
> >>>>>>>> materialised view concept. We could start with:
> >>>>>>>>
> >>>>>>>> `MaterializedTable materialize()` - immutable, session life scope
> >>>>>>>> (basically the same semantic as you are proposing
> >>>>>>>>
> >>>>>>>> And then in the future (if ever) build on top of that/expand it
> >>> with:
> >>>>>>>>
> >>>>>>>> `MaterializedTable materialize(refreshTime=…)` or
> `MaterializedTable
> >>>>>>>> materialize(refreshHook=…)`
> >>>>>>>>
> >>>>>>>> Or with cross session support:
> >>>>>>>>
> >>>>>>>> `MaterializedTable materializeInto(connector=…)` or
> >>> `MaterializedTable
> >>>>>>>> materializeInto(tableFactory=…)`
> >>>>>>>>
> >>>>>>>> I’m not saying that we should implement cross session/refreshing
> now
> >>>> or
> >>>>>>>> even in the near future. I’m just arguing that naming current
> >>>> immutable
> >>>>>>>> session life scope method `materialize()` is more future proof and
> >>>> more
> >>>>>>>> consistent with SQL (on which after all table-api is heavily
> basing
> >>>>>> on).
> >>>>>>>>
> >>>>>>>> 3. Even if we agree on naming it `cache()`, I would still insist
> on
> >>>>>>>> `cache()` returning `CachedTable` handle to avoid implicit
> >>>>>>> behaviours/side
> >>>>>>>> effects and to give both us & users more flexibility.
> >>>>>>>>
> >>>>>>>> Piotrek
> >>>>>>>>
> >>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin <becket.qin@xxxxxxxxx>
> wrote:
> >>>>>>>>>
> >>>>>>>>> Just to add a little bit, the materialized view is probably more
> >>>>>>> similar
> >>>>>>>> to
> >>>>>>>>> the persistent() brought up earlier in the thread. So it is
> usually
> >>>>>>> cross
> >>>>>>>>> session and could be used in a larger scope. For example, a
> >>>>>>> materialized
> >>>>>>>>> view created by user A may be visible to user B. It is probably
> >>>>>>> something
> >>>>>>>>> we want to have in the future. I'll put it in the future work
> >>>>>> section.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>
> >>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin <becket.qin@xxxxxxxxx
> >
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Piotrek,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the explanation.
> >>>>>>>>>>
> >>>>>>>>>> Right now we are mostly thinking of the cached table as
> >>> immutable. I
> >>>>>>> can
> >>>>>>>>>> see the Materialized view would be useful in the future. That
> >>> said,
> >>>>>> I
> >>>>>>>> think
> >>>>>>>>>> a simple cache mechanism is probably still needed. So to me,
> >>> cache()
> >>>>>>> and
> >>>>>>>>>> materialize() should be two separate method as they address
> >>>>>> different
> >>>>>>>>>> needs. Materialize() is a higher level concept usually implying
> >>>>>>>> periodical
> >>>>>>>>>> update, while cache() has much simpler semantic. For example,
> one
> >>>>>> may
> >>>>>>>>>> create a materialized view and use cache() method in the
> >>>>>> materialized
> >>>>>>>> view
> >>>>>>>>>> creation logic. So that during the materialized view update,
> they
> >>> do
> >>>>>>> not
> >>>>>>>>>> need to worry about the case that the cached table is also
> >>> changed.
> >>>>>>>> Maybe
> >>>>>>>>>> under the hood, materialized() and cache() could share some
> >>>>>> mechanism,
> >>>>>>>> but
> >>>>>>>>>> I think a simple cache() method would be handy in a lot of
> cases.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>>
> >>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski <
> >>>>>>> piotr@xxxxxxxxxxxxxxxxx
> >>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Becket,
> >>>>>>>>>>>
> >>>>>>>>>>>> Is there any extra thing user can do on a MaterializedTable
> that
> >>>>>>> they
> >>>>>>>>>>> cannot do on a Table?
> >>>>>>>>>>>
> >>>>>>>>>>> Maybe not in the initial implementation, but various DBs offer
> >>>>>>>> different
> >>>>>>>>>>> ways to “refresh” the materialised view. Hooks, triggers,
> timers,
> >>>>>>>> manually
> >>>>>>>>>>> etc. Having `MaterializedTable` would help us to handle that in
> >>> the
> >>>>>>>> future.
> >>>>>>>>>>>
> >>>>>>>>>>>> After users call *table.cache(), *users can just use that
> table
> >>>>>> and
> >>>>>>> do
> >>>>>>>>>>> anything that is supported on a Table, including SQL.
> >>>>>>>>>>>
> >>>>>>>>>>> This is some implicit behaviour with side effects. Imagine if
> >>> user
> >>>>>>> has
> >>>>>>>> a
> >>>>>>>>>>> long and complicated program, that touches table `b` multiple
> >>>>>> times,
> >>>>>>>> maybe
> >>>>>>>>>>> scattered around different methods. If he modifies his program
> by
> >>>>>>>> inserting
> >>>>>>>>>>> in one place
> >>>>>>>>>>>
> >>>>>>>>>>> b.cache()
> >>>>>>>>>>>
> >>>>>>>>>>> This implicitly alters the semantic and behaviour of his code
> all
> >>>>>>> over
> >>>>>>>>>>> the place, maybe in a ways that might cause problems. For
> example
> >>>>>>> what
> >>>>>>>> if
> >>>>>>>>>>> underlying data is changing?
> >>>>>>>>>>>
> >>>>>>>>>>> Having invisible side effects is also not very clean, for
> example
> >>>>>>> think
> >>>>>>>>>>> about something like this (but more complicated):
> >>>>>>>>>>>
> >>>>>>>>>>> Table b = ...;
> >>>>>>>>>>>
> >>>>>>>>>>> If (some_condition) {
> >>>>>>>>>>> processTable1(b)
> >>>>>>>>>>> }
> >>>>>>>>>>> else {
> >>>>>>>>>>> processTable2(b)
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> // do more stuff with b
> >>>>>>>>>>>
> >>>>>>>>>>> And user adds `b.cache()` call to only one of the
> `processTable1`
> >>>>>> or
> >>>>>>>>>>> `processTable2` methods.
> >>>>>>>>>>>
> >>>>>>>>>>> On the other hand
> >>>>>>>>>>>
> >>>>>>>>>>> Table materialisedB = b.materialize()
> >>>>>>>>>>>
> >>>>>>>>>>> Avoids (at least some of) the side effect issues and forces
> user
> >>> to
> >>>>>>>>>>> explicitly use `materialisedB` where it’s appropriate and
> forces
> >>>>>> user
> >>>>>>>> to
> >>>>>>>>>>> think what does it actually mean. And if something doesn’t work
> >>> in
> >>>>>>> the
> >>>>>>>> end
> >>>>>>>>>>> for the user, he will know what has he changed instead of
> blaming
> >>>>>>>> Flink for
> >>>>>>>>>>> some “magic” underneath. In the above example, after
> >>> materialising
> >>>>>> b
> >>>>>>> in
> >>>>>>>>>>> only one of the methods, he should/would realise about the
> issue
> >>>>>> when
> >>>>>>>>>>> handling the return value `MaterializedTable` of that method.
> >>>>>>>>>>>
> >>>>>>>>>>> I guess it comes down to personal preferences if you like
> things
> >>> to
> >>>>>>> be
> >>>>>>>>>>> implicit or not. The more power is the user, probably the more
> >>>>>> likely
> >>>>>>>> he is
> >>>>>>>>>>> to like/understand implicit behaviour. And we as Table API
> >>>>>> designers
> >>>>>>>> are
> >>>>>>>>>>> the most power users out there, so I would proceed with caution
> >>> (so
> >>>>>>>> that we
> >>>>>>>>>>> do not end up in the crazy perl realm with it’s lovely implicit
> >>>>>>> method
> >>>>>>>>>>> arguments ;)  <https://stackoverflow.com/a/14922656/8149051>)
> >>>>>>>>>>>
> >>>>>>>>>>>> Table API to also support non-relational processing cases,
> >>> cache()
> >>>>>>>>>>> might be slightly better.
> >>>>>>>>>>>
> >>>>>>>>>>> I think even such extended Table API could benefit from
> sticking
> >>>>>>>> to/being
> >>>>>>>>>>> consistent with SQL where both SQL and Table API are basically
> >>> the
> >>>>>>>> same.
> >>>>>>>>>>>
> >>>>>>>>>>> One more thing. `MaterializedTable materialize()` could be more
> >>>>>>>>>>> powerful/flexible allowing the user to operate both on
> >>> materialised
> >>>>>>>> and not
> >>>>>>>>>>> materialised view at the same time for whatever reasons
> >>> (underlying
> >>>>>>>> data
> >>>>>>>>>>> changing/better optimisation opportunities after pushing down
> >>> more
> >>>>>>>> filters
> >>>>>>>>>>> etc). For example:
> >>>>>>>>>>>
> >>>>>>>>>>> Table b = …;
> >>>>>>>>>>>
> >>>>>>>>>>> MaterlizedTable mb = b.materialize();
> >>>>>>>>>>>
> >>>>>>>>>>> Val min = mb.min();
> >>>>>>>>>>> Val max = mb.max();
> >>>>>>>>>>>
> >>>>>>>>>>> Val user42 = b.filter(‘userId = 42);
> >>>>>>>>>>>
> >>>>>>>>>>> Could be more efficient compared to `b.cache()` if
> >>> `filter(‘userId
> >>>>>> =
> >>>>>>>>>>> 42);` allows for much more aggressive optimisations.
> >>>>>>>>>>>
> >>>>>>>>>>> Piotrek
> >>>>>>>>>>>
> >>>>>>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske <fhueske@xxxxxxxxx>
> >>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm not suggesting to add support for Ignite. This was just an
> >>>>>>>> example.
> >>>>>>>>>>>> Plasma and Arrow sound interesting, too.
> >>>>>>>>>>>> For the sake of this proposal, it would be up to the user to
> >>>>>>>> implement a
> >>>>>>>>>>>> TableFactory and corresponding TableSource / TableSink classes
> >>> to
> >>>>>>>>>>> persist
> >>>>>>>>>>>> and read the data.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 12:06 Uhr schrieb Flavio Pompermaier
> <
> >>>>>>>>>>>> pompermaier@xxxxxxxx>:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> What about to add also Apache Plasma + Arrow as an
> alternative
> >>> to
> >>>>>>>>>>> Apache
> >>>>>>>>>>>>> Ignite?
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>
> https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske <
> >>>>>> fhueske@xxxxxxxxx>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the proposal!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> To summarize, you propose a new method Table.cache(): Table
> >>> that
> >>>>>>>> will
> >>>>>>>>>>>>>> trigger a job and write the result into some temporary
> storage
> >>>>>> as
> >>>>>>>>>>> defined
> >>>>>>>>>>>>>> by a TableFactory.
> >>>>>>>>>>>>>> The cache() call blocks while the job is running and
> >>> eventually
> >>>>>>>>>>> returns a
> >>>>>>>>>>>>>> Table object that represents a scan of the temporary table.
> >>>>>>>>>>>>>> When the "session" is closed (closing to be defined?), the
> >>>>>>> temporary
> >>>>>>>>>>>>> tables
> >>>>>>>>>>>>>> are all dropped.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think this behavior makes sense and is a good first step
> >>>>>> towards
> >>>>>>>>>>> more
> >>>>>>>>>>>>>> interactive workloads.
> >>>>>>>>>>>>>> However, its performance suffers from writing to and reading
> >>>>>> from
> >>>>>>>>>>>>> external
> >>>>>>>>>>>>>> systems.
> >>>>>>>>>>>>>> I think this is OK for now. Changes that would significantly
> >>>>>>> improve
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> situation (i.e., pinning data in-memory across jobs) would
> >>> have
> >>>>>>>> large
> >>>>>>>>>>>>>> impacts on many components of Flink.
> >>>>>>>>>>>>>> Users could use in-memory filesystems or storage grids
> (Apache
> >>>>>>>>>>> Ignite) to
> >>>>>>>>>>>>>> mitigate some of the performance effects.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best, Fabian
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb Becket Qin <
> >>>>>>>>>>>>>> becket.qin@xxxxxxxxx
> >>>>>>>>>>>>>>> :
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the explanation, Piotrek.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Is there any extra thing user can do on a MaterializedTable
> >>>>>> that
> >>>>>>>> they
> >>>>>>>>>>>>>>> cannot do on a Table? After users call *table.cache(),
> *users
> >>>>>> can
> >>>>>>>>>>> just
> >>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>> that table and do anything that is supported on a Table,
> >>>>>>> including
> >>>>>>>>>>> SQL.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Naming wise, either cache() or materialize() sounds fine to
> >>> me.
> >>>>>>>>>>> cache()
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>> a bit more general than materialize(). Given that we are
> >>>>>>> enhancing
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> Table API to also support non-relational processing cases,
> >>>>>>> cache()
> >>>>>>>>>>>>> might
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>> slightly better.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski <
> >>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Becket,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Ops, sorry I didn’t notice that you intend to reuse
> existing
> >>>>>>>>>>>>>>>> `TableFactory`. I don’t know why, but I assumed that you
> >>> want
> >>>>>> to
> >>>>>>>>>>>>>> provide
> >>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>> alternate way of writing the data.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Now that I hopefully understand the proposal, maybe we
> could
> >>>>>>>> rename
> >>>>>>>>>>>>>>>> `cache()` to
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> void materialize()
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> or going step further
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> MaterializedTable materialize()
> >>>>>>>>>>>>>>>> MaterializedTable createMaterializedView()
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> ?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The second option with returning a handle I think is more
> >>>>>>> flexible
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> could provide features such as “refresh”/“delete” or
> >>> generally
> >>>>>>>>>>>>> speaking
> >>>>>>>>>>>>>>>> manage the the view. In the future we could also think
> about
> >>>>>>>> adding
> >>>>>>>>>>>>>> hooks
> >>>>>>>>>>>>>>>> to automatically refresh view etc. It is also more
> explicit
> >>> -
> >>>>>>>>>>>>>>>> materialization returning a new table handle will not have
> >>> the
> >>>>>>>> same
> >>>>>>>>>>>>>>>> implicit side effects as adding a simple line of code like
> >>>>>>>>>>>>> `b.cache()`
> >>>>>>>>>>>>>>>> would have.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It would also be more SQL like, making it more intuitive
> for
> >>>>>>> users
> >>>>>>>>>>>>>>> already
> >>>>>>>>>>>>>>>> familiar with the SQL.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 23 Nov 2018, at 14:53, Becket Qin <
> becket.qin@xxxxxxxxx
> >>>>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Piotrek,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For the cache() method itself, yes, it is equivalent to
> >>>>>>> creating
> >>>>>>>> a
> >>>>>>>>>>>>>>>> BUILT-IN
> >>>>>>>>>>>>>>>>> materialized view with a lifecycle. That functionality is
> >>>>>>> missing
> >>>>>>>>>>>>>>> today,
> >>>>>>>>>>>>>>>>> though. Not sure if I understand your question. Do you
> mean
> >>>>>> we
> >>>>>>>>>>>>>> already
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>> the functionality and just need a syntax sugar?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> What's more interesting in the proposal is do we want to
> >>> stop
> >>>>>>> at
> >>>>>>>>>>>>>>> creating
> >>>>>>>>>>>>>>>>> the materialized view? Or do we want to extend that in
> the
> >>>>>>> future
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>> useful unified data store distributed with Flink? And do
> we
> >>>>>>> want
> >>>>>>>> to
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> mechanism allow more flexible user job pattern with their
> >>> own
> >>>>>>>> user
> >>>>>>>>>>>>>>>> defined
> >>>>>>>>>>>>>>>>> services. These considerations are much more
> architectural.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <
> >>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Interesting idea. I’m trying to understand the problem.
> >>>>>> Isn’t
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>> `cache()` call an equivalent of writing data to a sink
> and
> >>>>>>> later
> >>>>>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>>> from it? Where this sink has a limited live scope/live
> >>> time?
> >>>>>>> And
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> sink
> >>>>>>>>>>>>>>>>>> could be implemented as in memory or a file sink?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If so, what’s the problem with creating a materialised
> >>> view
> >>>>>>>> from a
> >>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>> “b” (from your document’s example) and reusing this
> >>>>>>> materialised
> >>>>>>>>>>>>>> view
> >>>>>>>>>>>>>>>>>> later? Maybe we are lacking mechanisms to clean up
> >>>>>>> materialised
> >>>>>>>>>>>>>> views
> >>>>>>>>>>>>>>>> (for
> >>>>>>>>>>>>>>>>>> example when current session finishes)? Maybe we need
> some
> >>>>>>>>>>>>> syntactic
> >>>>>>>>>>>>>>>> sugar
> >>>>>>>>>>>>>>>>>> on top of it?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 07:21, Becket Qin <
> >>> becket.qin@xxxxxxxxx
> >>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the suggestion, Jincheng.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Yes, I think it makes sense to have a persist() with
> >>>>>>>>>>>>>>> lifecycle/defined
> >>>>>>>>>>>>>>>>>>> scope. I just added a section in the future work for
> >>> this.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <
> >>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Jiangjie,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thank you for the explanation about the name of
> >>>>>> `cache()`, I
> >>>>>>>>>>>>>>>> understand
> >>>>>>>>>>>>>>>>>> why
> >>>>>>>>>>>>>>>>>>>> you designed this way!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Another idea is whether we can specify a lifecycle for
> >>>>>> data
> >>>>>>>>>>>>>>>> persistence?
> >>>>>>>>>>>>>>>>>>>> For example, persist (LifeCycle.SESSION), so that the
> >>> user
> >>>>>>> is
> >>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>> worried
> >>>>>>>>>>>>>>>>>>>> about data loss, and will clearly specify the time
> range
> >>>>>> for
> >>>>>>>>>>>>>> keeping
> >>>>>>>>>>>>>>>>>> time.
> >>>>>>>>>>>>>>>>>>>> At the same time, if we want to expand, we can also
> >>> share
> >>>>>>> in a
> >>>>>>>>>>>>>>> certain
> >>>>>>>>>>>>>>>>>>>> group of session, for example:
> >>>>>>> LifeCycle.SESSION_GROUP(...), I
> >>>>>>>>>>>>> am
> >>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>> sure,
> >>>>>>>>>>>>>>>>>>>> just an immature suggestion, for reference only!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Bests,
> >>>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx> 于2018年11月23日周五
> >>>>>> 下午1:33写道:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Re: Jincheng,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. Regarding cache() v.s.
> >>>>>> persist(),
> >>>>>>>>>>>>>>>> personally I
> >>>>>>>>>>>>>>>>>>>>> find cache() to be more accurately describing the
> >>>>>> behavior,
> >>>>>>>>>>>>> i.e.
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> Table
> >>>>>>>>>>>>>>>>>>>>> is cached for the session, but will be deleted after
> >>> the
> >>>>>>>>>>>>> session
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> closed.
> >>>>>>>>>>>>>>>>>>>>> persist() seems a little misleading as people might
> >>> think
> >>>>>>> the
> >>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>> still be there even after the session is gone.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Great point about mixing the batch and stream
> >>> processing
> >>>>>> in
> >>>>>>>> the
> >>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>> job.
> >>>>>>>>>>>>>>>>>>>>> We should absolutely move towards that goal. I
> imagine
> >>>>>> that
> >>>>>>>>>>>>> would
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> huge
> >>>>>>>>>>>>>>>>>>>>> change across the board, including sources, operators
> >>> and
> >>>>>>>>>>>>>>>>>> optimizations,
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> name some. Likely we will need several separate
> >>> in-depth
> >>>>>>>>>>>>>>> discussions.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <
> >>>>>>>>>>>>> xingcanc@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> @Shaoxuan, I think the lifecycle or access domain
> are
> >>>>>> both
> >>>>>>>>>>>>>>>> orthogonal
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> the cache problem. Essentially, this may be the
> first
> >>>>>> time
> >>>>>>>> we
> >>>>>>>>>>>>>> plan
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> introduce another storage mechanism other than the
> >>>>>> state.
> >>>>>>>>>>>>> Maybe
> >>>>>>>>>>>>>>> it’s
> >>>>>>>>>>>>>>>>>>>>> better
> >>>>>>>>>>>>>>>>>>>>>> to first draw a big picture and then concentrate on
> a
> >>>>>>>> specific
> >>>>>>>>>>>>>>> part?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> @Becket, yes, actually I am more concerned with the
> >>>>>>>> underlying
> >>>>>>>>>>>>>>>>>> service.
> >>>>>>>>>>>>>>>>>>>>>> This seems to be quite a major change to the
> existing
> >>>>>>>>>>>>> codebase.
> >>>>>>>>>>>>>> As
> >>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>> claimed, the service should be extendible to support
> >>>>>> other
> >>>>>>>>>>>>>>>> components
> >>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> we’d better discussed it in another thread.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> All in all, I also eager to enjoy the more
> interactive
> >>>>>>> Table
> >>>>>>>>>>>>>> API,
> >>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> case
> >>>>>>>>>>>>>>>>>>>>>> of a general and flexible enough service mechanism.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>> Xingcan
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <
> >>>>>>>>>>>>>> xiaoweij@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Relying on a callback for the temp table for clean
> up
> >>>>>> is
> >>>>>>>> not
> >>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>>>> reliable.
> >>>>>>>>>>>>>>>>>>>>>>> There is no guarantee that it will be executed
> >>>>>>>> successfully.
> >>>>>>>>>>>>> We
> >>>>>>>>>>>>>>> may
> >>>>>>>>>>>>>>>>>>>>> risk
> >>>>>>>>>>>>>>>>>>>>>>> leaks when that happens. I think that it's safer to
> >>>>>> have
> >>>>>>> an
> >>>>>>>>>>>>>>>>>>>> association
> >>>>>>>>>>>>>>>>>>>>>>> between temp table and session id. So we can always
> >>>>>> clean
> >>>>>>>> up
> >>>>>>>>>>>>>> temp
> >>>>>>>>>>>>>>>>>>>>> tables
> >>>>>>>>>>>>>>>>>>>>>>> which are no longer associated with any active
> >>>>>> sessions.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>> Xiaowei
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <
> >>>>>>>>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie&Shaoxuan,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for initiating this great proposal!
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Interactive Programming is very useful and user
> >>>>>> friendly
> >>>>>>>> in
> >>>>>>>>>>>>>> case
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>> examples.
> >>>>>>>>>>>>>>>>>>>>>>>> Moreover, especially when a business has to be
> >>>>>> executed
> >>>>>>> in
> >>>>>>>>>>>>>>> several
> >>>>>>>>>>>>>>>>>>>>>> stages
> >>>>>>>>>>>>>>>>>>>>>>>> with dependencies,such as the pipeline of Flink
> ML,
> >>> in
> >>>>>>>> order
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> utilize
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> intermediate calculation results we have to
> submit a
> >>>>>> job
> >>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>> env.execute().
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> About the `cache()`  , I think is better to named
> >>>>>>>>>>>>> `persist()`,
> >>>>>>>>>>>>>>> And
> >>>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>>>>> Flink framework determines whether we internally
> >>> cache
> >>>>>>> in
> >>>>>>>>>>>>>> memory
> >>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>>>>>>>> to the storage system,Maybe save the data into
> state
> >>>>>>>> backend
> >>>>>>>>>>>>>>>>>>>>>>>> (MemoryStateBackend or RocksDBStateBackend etc.)
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> BTW, from the points of my view in the future,
> >>> support
> >>>>>>> for
> >>>>>>>>>>>>>>>> streaming
> >>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> batch mode switching in the same job will also
> >>> benefit
> >>>>>>> in
> >>>>>>>>>>>>>>>>>>>> "Interactive
> >>>>>>>>>>>>>>>>>>>>>>>> Programming",  I am looking forward to your JIRAs
> >>> and
> >>>>>>>> FLIP!
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx> 于2018年11月20日周二
> >>>>>>>> 下午9:56写道:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> As a few recent email threads have pointed out,
> it
> >>>>>> is a
> >>>>>>>>>>>>>>> promising
> >>>>>>>>>>>>>>>>>>>>>>>>> opportunity to enhance Flink Table API in various
> >>>>>>>> aspects,
> >>>>>>>>>>>>>>>>>>>> including
> >>>>>>>>>>>>>>>>>>>>>>>>> functionality and ease of use among others. One
> of
> >>>>>> the
> >>>>>>>>>>>>>>> scenarios
> >>>>>>>>>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>> feel Flink could improve is interactive
> >>> programming.
> >>>>>> To
> >>>>>>>>>>>>>> explain
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> issues
> >>>>>>>>>>>>>>>>>>>>>>>>> and facilitate the discussion on the solution, we
> >>> put
> >>>>>>>>>>>>>> together
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> following document with our proposal.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Feedback and comments are very welcome!
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
> >
> >
>
>
>