osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Support Interactive Programming in Flink Table API


Hi Becket,

Introducing CacheHandle seems too complicated. That means users have to
maintain Handler properly.

And since cache is just a hint for optimizer, why not just return Table
itself for cache method. This hint info should be kept in Table I believe.

So how about adding method cache and uncache for Table, and both return
Table. Because what cache and uncache did is just adding some hint info
into Table.




Becket Qin <becket.qin@xxxxxxxxx> 于2018年12月12日周三 上午11:25写道:

> Hi Till and Piotrek,
>
> Thanks for the clarification. That solves quite a few confusion. My
> understanding of how cache works is same as what Till describe. i.e.
> cache() is a hint to Flink, but it is not guaranteed that cache always
> exist and it might be recomputed from its lineage.
>
> Is this the core of our disagreement here? That you would like this
> > “cache()” to be mostly hint for the optimiser?
>
> Semantic wise, yes. That's also why I think materialize() has a much larger
> scope than cache(), thus it should be a different method.
>
> Regarding the chance of optimization, it might not be that rare. Some very
> simple statistics could already help in many cases. For example, simply
> maintaining max and min of each fields can already eliminate some
> unnecessary table scan (potentially scanning the cached table) if the
> result is doomed to be empty. A histogram would give even further
> information. The optimizer could be very careful and only ignores cache
> when it is 100% sure doing that is cheaper. e.g. only when a filter on the
> cache will absolutely return nothing.
>
> Given the above clarification on cache, I would like to revisit the
> original "void cache()" proposal and see if we can improve on top of that.
>
> What do you think about the following modified interface?
>
> Table {
>   /**
>    * This call hints Flink to maintain a cache of this table and leverage
> it for performance optimization if needed.
>    * Note that Flink may still decide to not use the cache if it is cheaper
> by doing so.
>    *
>    * A CacheHandle will be returned to allow user release the cache
> actively. The cache will be deleted if there
>    * is no unreleased cache handlers to it. When the TableEnvironment is
> closed. The cache will also be deleted
>    * and all the cache handlers will be released.
>    *
>    * @return a CacheHandle referring to the cache of this table.
>    */
>   CacheHandle cache();
> }
>
> CacheHandle {
>   /**
>    * Close the cache handle. This method does not necessarily deletes the
> cache. Instead, it simply decrements the reference counter to the cache.
>    * When the there is no handle referring to a cache. The cache will be
> deleted.
>    *
>    * @return the number of open handles to the cache after this handle has
> been released.
>    */
>   int release()
> }
>
> The rationale behind this interface is following:
> In vast majority of the cases, users wouldn't really care whether the cache
> is used or not. So I think the most intuitive way is letting cache() return
> nothing. So nobody needs to worry about the difference between operations
> on CacheTables and those on the "original" tables. This will make maybe
> 99.9% of the users happy. There were two concerns raised for this approach:
> 1. In some rare cases, users may want to ignore cache,
> 2. A table might be cached/uncached in a third party function while the
> caller does not know.
>
> For the first issue, users can use hint("ignoreCache") to explicitly ignore
> cache.
> For the second issue, the above proposal lets cache() return a CacheHandle,
> the only method in it is release(). Different CacheHandles will refer to
> the same cache, if a cache no longer has any cache handle, it will be
> deleted. This will address the following case:
> {
>   val handle1 = a.cache()
>   process(a)
>   a.select(...) // cache is still available, handle1 has not been released.
> }
>
> void process(Table t) {
>   val handle2 = t.cache() // new handle to cache
>   t.select(...) // optimizer decides cache usage
>   t.hint("ignoreCache").select(...) // cache is ignored
>   handle2.release() // release the handle, but the cache may still be
> available if there are other handles
>   ...
> }
>
> Does the above modified approach look reasonable to you?
>
> Cheers,
>
> Jiangjie (Becket) Qin
>
>
>
>
>
>
>
> On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <trohrmann@xxxxxxxxxx>
> wrote:
>
> > Hi Becket,
> >
> > I was aiming at semantics similar to 1. I actually thought that `cache()`
> > would tell the system to materialize the intermediate result so that
> > subsequent queries don't need to reprocess it. This means that the usage
> of
> > the cached table in this example
> >
> > {
> >  val cachedTable = a.cache()
> >  val b1 = cachedTable.select(…)
> >  val b2 = cachedTable.foo().select(…)
> >  val b3 = cachedTable.bar().select(...)
> >  val c1 = a.select(…)
> >  val c2 = a.foo().select(…)
> >  val c3 = a.bar().select(...)
> > }
> >
> > strongly depends on interleaved calls which trigger the execution of sub
> > queries. So for example, if there is only a single env.execute call at
> the
> > end of  block, then b1, b2, b3, c1, c2 and c3 would all be computed by
> > reading directly from the sources (given that there is only a single
> > JobGraph). It just happens that the result of `a` will be cached such
> that
> > we skip the processing of `a` when there are subsequent queries reading
> > from `cachedTable`. If for some reason the system cannot materialize the
> > table (e.g. running out of disk space, ttl expired), then it could also
> > happen that we need to reprocess `a`. In that sense `cachedTable` simply
> is
> > an identifier for the materialized result of `a` with the lineage how to
> > reprocess it.
> >
> > Cheers,
> > Till
> >
> >
> >
> >
> >
> > On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx
> >
> > wrote:
> >
> > > Hi Becket,
> > >
> > > > {
> > > >  val cachedTable = a.cache()
> > > >  val b = cachedTable.select(...)
> > > >  val c = a.select(...)
> > > > }
> > > >
> > > > Semantic 1. b uses cachedTable as user demanded so. c uses original
> DAG
> > > as
> > > > user demanded so. In this case, the optimizer has no chance to
> > optimize.
> > > > Semantic 2. b uses cachedTable as user demanded so. c leaves the
> > > optimizer
> > > > to choose whether the cache or DAG should be used. In this case, user
> > > lose
> > > > the option to NOT use cache.
> > > >
> > > > As you can see, neither of the options seem perfect. However, I guess
> > you
> > > > and Till are proposing the third option:
> > > >
> > > > Semantic 3. b leaves the optimizer to choose whether cache or DAG
> > should
> > > be
> > > > used. c always use the DAG.
> > >
> > > I am pretty sure that me, Till, Fabian and others were all proposing
> and
> > > advocating in favour of semantic “1”. No cost based optimiser decisions
> > at
> > > all.
> > >
> > > {
> > >  val cachedTable = a.cache()
> > >  val b1 = cachedTable.select(…)
> > >  val b2 = cachedTable.foo().select(…)
> > >  val b3 = cachedTable.bar().select(...)
> > >  val c1 = a.select(…)
> > >  val c2 = a.foo().select(…)
> > >  val c3 = a.bar().select(...)
> > > }
> > >
> > > All b1, b2 and b3 are reading from cache, while c1, c2 and c3 are
> > > re-executing whole plan for “a”.
> > >
> > > In the future we could discuss going one step further, introducing some
> > > global optimisation (that can be manually enabled/disabled):
> deduplicate
> > > plan nodes/deduplicate sub queries/re-use sub queries results/or
> whatever
> > > we could call it. It could do two things:
> > >
> > > 1. Automatically try to deduplicate fragments of the plan and share the
> > > result using CachedTable - in other words automatically insert
> > `CachedTable
> > > cache()` calls.
> > > 2. Automatically make decision to bypass explicit `CachedTable` access
> > > (this would be the equivalent of what you described as “semantic 3”).
> > >
> > > However as I wrote previously, I have big doubts if such cost-based
> > > optimisation would work (this applies also to “Semantic 2”). I would
> > expect
> > > it to do more harm than good in so many cases, that it wouldn’t make
> > sense.
> > > Even assuming that we calculate statistics perfectly (this ain’t gonna
> > > happen), it’s virtually impossible to correctly estimate correct
> exchange
> > > rate of CPU cycles vs IO operations as it is changing so much from
> > > deployment to deployment.
> > >
> > > Is this the core of our disagreement here? That you would like this
> > > “cache()” to be mostly hint for the optimiser?
> > >
> > > Piotrek
> > >
> > > > On 11 Dec 2018, at 06:00, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > > >
> > > > Another potential concern for semantic 3 is that. In the future, we
> may
> > > add
> > > > automatic caching to Flink. e.g. cache the intermediate results at
> the
> > > > shuffle boundary. If our semantic is that reference to the original
> > table
> > > > means skipping cache, those users may not be able to benefit from the
> > > > implicit cache.
> > > >
> > > >
> > > >
> > > > On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <becket.qin@xxxxxxxxx>
> > > wrote:
> > > >
> > > >> Hi Piotrek,
> > > >>
> > > >> Thanks for the reply. Thought about it again, I might have
> > misunderstood
> > > >> your proposal in earlier emails. Returning a CachedTable might not
> be
> > a
> > > bad
> > > >> idea.
> > > >>
> > > >> I was more concerned about the semantic and its intuitiveness when a
> > > >> CachedTable is returned. i..e, if cache() returns CachedTable. What
> > are
> > > the
> > > >> semantic in the following code:
> > > >> {
> > > >>  val cachedTable = a.cache()
> > > >>  val b = cachedTable.select(...)
> > > >>  val c = a.select(...)
> > > >> }
> > > >> What is the difference between b and c? At the first glance, I see
> two
> > > >> options:
> > > >>
> > > >> Semantic 1. b uses cachedTable as user demanded so. c uses original
> > DAG
> > > as
> > > >> user demanded so. In this case, the optimizer has no chance to
> > optimize.
> > > >> Semantic 2. b uses cachedTable as user demanded so. c leaves the
> > > optimizer
> > > >> to choose whether the cache or DAG should be used. In this case,
> user
> > > lose
> > > >> the option to NOT use cache.
> > > >>
> > > >> As you can see, neither of the options seem perfect. However, I
> guess
> > > you
> > > >> and Till are proposing the third option:
> > > >>
> > > >> Semantic 3. b leaves the optimizer to choose whether cache or DAG
> > should
> > > >> be used. c always use the DAG.
> > > >>
> > > >> This does address all the concerns. It is just that from
> intuitiveness
> > > >> perspective, I found that asking user to explicitly use a
> CachedTable
> > > while
> > > >> the optimizer might choose to ignore is a little weird. That was
> why I
> > > did
> > > >> not think about that semantic. But given there is material benefit,
> I
> > > think
> > > >> this semantic is acceptable.
> > > >>
> > > >> 1. If we want to let optimiser make decisions whether to use cache
> or
> > > not,
> > > >>> then why do we need “void cache()” method at all? Would It
> > “increase”
> > > the
> > > >>> chance of using the cache? That’s sounds strange. What would be the
> > > >>> mechanism of deciding whether to use the cache or not? If we want
> to
> > > >>> introduce such kind  automated optimisations of “plan nodes
> > > deduplication”
> > > >>> I would turn it on globally, not per table, and let the optimiser
> do
> > > all of
> > > >>> the work.
> > > >>> 2. We do not have statistics at the moment for any use/not use
> cache
> > > >>> decision.
> > > >>> 3. Even if we had, I would be veeerryy sceptical whether such cost
> > > based
> > > >>> optimisations would work properly and I would still insist first on
> > > >>> providing explicit caching mechanism (`CachedTable cache()`)
> > > >>>
> > > >> We are absolutely on the same page here. An explicit cache() method
> is
> > > >> necessary not only because optimizer may not be able to make the
> right
> > > >> decision, but also because of the nature of interactive programming.
> > For
> > > >> example, if users write the following code in Scala shell:
> > > >>  val b = a.select(...)
> > > >>  val c = b.select(...)
> > > >>  val d = c.select(...).writeToSink(...)
> > > >>  tEnv.execute()
> > > >> There is no way optimizer will know whether b or c will be used in
> > later
> > > >> code, unless users hint explicitly.
> > > >>
> > > >> At the same time I’m not sure if you have responded to our
> objections
> > of
> > > >>> `void cache()` being implicit/having side effects, which me, Jark,
> > > Fabian,
> > > >>> Till and I think also Shaoxuan are supporting.
> > > >>
> > > >> Is there any other side effects if we use semantic 3 mentioned
> above?
> > > >>
> > > >> Thanks,
> > > >>
> > > >> JIangjie (Becket) Qin
> > > >>
> > > >>
> > > >> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <
> > piotr@xxxxxxxxxxxxxxxxx
> > > >
> > > >> wrote:
> > > >>
> > > >>> Hi Becket,
> > > >>>
> > > >>> Sorry for not responding long time.
> > > >>>
> > > >>> Regarding case1.
> > > >>>
> > > >>> There wouldn’t be no “a.unCache()” method, but I would expect only
> > > >>> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t
> affect
> > > >>> `cachedTableA2`. Just as in any other database dropping modifying
> one
> > > >>> independent table/materialised view does not affect others.
> > > >>>
> > > >>>> What I meant is that assuming there is already a cached table,
> > ideally
> > > >>> users need
> > > >>>> not to specify whether the next query should read from the cache
> or
> > > use
> > > >>> the
> > > >>>> original DAG. This should be decided by the optimizer.
> > > >>>
> > > >>> 1. If we want to let optimiser make decisions whether to use cache
> or
> > > >>> not, then why do we need “void cache()” method at all? Would It
> > > “increase”
> > > >>> the chance of using the cache? That’s sounds strange. What would be
> > the
> > > >>> mechanism of deciding whether to use the cache or not? If we want
> to
> > > >>> introduce such kind  automated optimisations of “plan nodes
> > > deduplication”
> > > >>> I would turn it on globally, not per table, and let the optimiser
> do
> > > all of
> > > >>> the work.
> > > >>> 2. We do not have statistics at the moment for any use/not use
> cache
> > > >>> decision.
> > > >>> 3. Even if we had, I would be veeerryy sceptical whether such cost
> > > based
> > > >>> optimisations would work properly and I would still insist first on
> > > >>> providing explicit caching mechanism (`CachedTable cache()`)
> > > >>> 4. As Till wrote, having explicit `CachedTable cache()` doesn’t
> > > >>> contradict future work on automated cost based caching.
> > > >>>
> > > >>>
> > > >>> At the same time I’m not sure if you have responded to our
> objections
> > > of
> > > >>> `void cache()` being implicit/having side effects, which me, Jark,
> > > Fabian,
> > > >>> Till and I think also Shaoxuan are supporting.
> > > >>>
> > > >>> Piotrek
> > > >>>
> > > >>>> On 5 Dec 2018, at 12:42, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > > >>>>
> > > >>>> Hi Till,
> > > >>>>
> > > >>>> It is true that after the first job submission, there will be no
> > > >>> ambiguity
> > > >>>> in terms of whether a cached table is used or not. That is the
> same
> > > for
> > > >>> the
> > > >>>> cache() without returning a CachedTable.
> > > >>>>
> > > >>>> Conceptually one could think of cache() as introducing a caching
> > > >>> operator
> > > >>>>> from which you need to consume from if you want to benefit from
> the
> > > >>> caching
> > > >>>>> functionality.
> > > >>>>
> > > >>>> I am thinking a little differently. I think it is a hint (as you
> > > >>> mentioned
> > > >>>> later) instead of a new operator. I'd like to be careful about the
> > > >>> semantic
> > > >>>> of the API. A hint is a property set on an existing operator, but
> is
> > > not
> > > >>>> itself an operator as it does not really manipulate the data.
> > > >>>>
> > > >>>> I agree, ideally the optimizer makes this kind of decision which
> > > >>>>> intermediate result should be cached. But especially when
> executing
> > > >>> ad-hoc
> > > >>>>> queries the user might better know which results need to be
> cached
> > > >>> because
> > > >>>>> Flink might not see the full DAG. In that sense, I would consider
> > the
> > > >>>>> cache() method as a hint for the optimizer. Of course, in the
> > future
> > > we
> > > >>>>> might add functionality which tries to automatically cache
> results
> > > >>> (e.g.
> > > >>>>> caching the latest intermediate results until so and so much
> space
> > is
> > > >>>>> used). But this should hopefully not contradict with `CachedTable
> > > >>> cache()`.
> > > >>>>
> > > >>>> I agree that cache() method is needed for exactly the reason you
> > > >>> mentioned,
> > > >>>> i.e. Flink cannot predict what users are going to write later, so
> > > users
> > > >>>> need to tell Flink explicitly that this table will be used later.
> > > What I
> > > >>>> meant is that assuming there is already a cached table, ideally
> > users
> > > >>> need
> > > >>>> not to specify whether the next query should read from the cache
> or
> > > use
> > > >>> the
> > > >>>> original DAG. This should be decided by the optimizer.
> > > >>>>
> > > >>>> To explain the difference between returning / not returning a
> > > >>> CachedTable,
> > > >>>> I want compare the following two case:
> > > >>>>
> > > >>>> *Case 1:  returning a CachedTable*
> > > >>>> b = a.map(...)
> > > >>>> val cachedTableA1 = a.cache()
> > > >>>> val cachedTableA2 = a.cache()
> > > >>>> b.print() // Just to make sure a is cached.
> > > >>>>
> > > >>>> c = a.filter(...) // User specify that the original DAG is used?
> Or
> > > the
> > > >>>> optimizer decides whether DAG or cache should be used?
> > > >>>> d = cachedTableA1.filter() // User specify that the cached table
> is
> > > >>> used.
> > > >>>>
> > > >>>> a.unCache() // Can cachedTableA still be used afterwards?
> > > >>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?
> > > >>>>
> > > >>>> *Case 2: not returning a CachedTable*
> > > >>>> b = a.map()
> > > >>>> a.cache()
> > > >>>> a.cache() // no-op
> > > >>>> b.print() // Just to make sure a is cached
> > > >>>>
> > > >>>> c = a.filter(...) // Optimizer decides whether the cache or DAG
> > should
> > > >>> be
> > > >>>> used
> > > >>>> d = a.filter(...) // Optimizer decides whether the cache or DAG
> > should
> > > >>> be
> > > >>>> used
> > > >>>>
> > > >>>> a.unCache()
> > > >>>> a.unCache() // no-op
> > > >>>>
> > > >>>> In case 1, semantic wise, optimizer lose the option to choose
> > between
> > > >>> DAG
> > > >>>> and cache. And the unCache() call becomes tricky.
> > > >>>> In case 2, users do not need to worry about whether cache or DAG
> is
> > > >>> used.
> > > >>>> And the unCache() semantic is clear. However, the caveat is that
> > users
> > > >>>> cannot explicitly ignore the cache.
> > > >>>>
> > > >>>> In order to address the issues mentioned in case 2 and inspired by
> > the
> > > >>>> discussion so far, I am thinking about using hint to allow user
> > > >>> explicitly
> > > >>>> ignore cache. Although we do not have hint yet, but we probably
> > should
> > > >>> have
> > > >>>> one. So the code becomes:
> > > >>>>
> > > >>>> *Case 3: returning this table*
> > > >>>> b = a.map()
> > > >>>> a.cache()
> > > >>>> a.cache() // no-op
> > > >>>> b.print() // Just to make sure a is cached
> > > >>>>
> > > >>>> c = a.filter(...) // Optimizer decides whether the cache or DAG
> > should
> > > >>> be
> > > >>>> used
> > > >>>> d = a.hint("ignoreCache").filter(...) // DAG will be used instead
> of
> > > the
> > > >>>> cache.
> > > >>>>
> > > >>>> a.unCache()
> > > >>>> a.unCache() // no-op
> > > >>>>
> > > >>>> We could also let cache() return this table to allow chained
> method
> > > >>> calls.
> > > >>>> Do you think this API addresses the concerns?
> > > >>>>
> > > >>>> Thanks,
> > > >>>>
> > > >>>> Jiangjie (Becket) Qin
> > > >>>>
> > > >>>>
> > > >>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imjark@xxxxxxxxx> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> All the recent discussions are focused on whether there is a
> > problem
> > > if
> > > >>>>> cache() not return a Table.
> > > >>>>> It seems that returning a Table explicitly is more clear (and
> > safe?).
> > > >>>>>
> > > >>>>> So whether there are any problems if cache() returns a Table?
> > > @Becket
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Jark
> > > >>>>>
> > > >>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <trohrmann@xxxxxxxxxx
> >
> > > >>> wrote:
> > > >>>>>
> > > >>>>>> It's true that b, c, d and e will all read from the original DAG
> > > that
> > > >>>>>> generates a. But all subsequent operators (when running multiple
> > > >>> queries)
> > > >>>>>> which reference cachedTableA should not need to reproduce `a`
> but
> > > >>>>> directly
> > > >>>>>> consume the intermediate result.
> > > >>>>>>
> > > >>>>>> Conceptually one could think of cache() as introducing a caching
> > > >>> operator
> > > >>>>>> from which you need to consume from if you want to benefit from
> > the
> > > >>>>> caching
> > > >>>>>> functionality.
> > > >>>>>>
> > > >>>>>> I agree, ideally the optimizer makes this kind of decision which
> > > >>>>>> intermediate result should be cached. But especially when
> > executing
> > > >>>>> ad-hoc
> > > >>>>>> queries the user might better know which results need to be
> cached
> > > >>>>> because
> > > >>>>>> Flink might not see the full DAG. In that sense, I would
> consider
> > > the
> > > >>>>>> cache() method as a hint for the optimizer. Of course, in the
> > future
> > > >>> we
> > > >>>>>> might add functionality which tries to automatically cache
> results
> > > >>> (e.g.
> > > >>>>>> caching the latest intermediate results until so and so much
> space
> > > is
> > > >>>>>> used). But this should hopefully not contradict with
> `CachedTable
> > > >>>>> cache()`.
> > > >>>>>>
> > > >>>>>> Cheers,
> > > >>>>>> Till
> > > >>>>>>
> > > >>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <becket.qin@xxxxxxxxx
> >
> > > >>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Till,
> > > >>>>>>>
> > > >>>>>>> Thanks for the clarification. I am still a little confused.
> > > >>>>>>>
> > > >>>>>>> If cache() returns a CachedTable, the example might become:
> > > >>>>>>>
> > > >>>>>>> b = a.map(...)
> > > >>>>>>> c = a.map(...)
> > > >>>>>>>
> > > >>>>>>> cachedTableA = a.cache()
> > > >>>>>>> d = cachedTableA.map(...)
> > > >>>>>>> e = a.map()
> > > >>>>>>>
> > > >>>>>>> In the above case, if cache() is lazily evaluated, b, c, d and
> e
> > > are
> > > >>>>> all
> > > >>>>>>> going to be reading from the original DAG that generates a. But
> > > with
> > > >>> a
> > > >>>>>>> naive expectation, d should be reading from the cache. This
> seems
> > > not
> > > >>>>>>> solving the potential confusion you raised, right?
> > > >>>>>>>
> > > >>>>>>> Just to be clear, my understanding are all based on the
> > assumption
> > > >>> that
> > > >>>>>> the
> > > >>>>>>> tables are immutable. Therefore, after a.cache(), a the
> > > >>> c*achedTableA*
> > > >>>>>> and
> > > >>>>>>> original table *a * should be completely interchangeable.
> > > >>>>>>>
> > > >>>>>>> That said, I think a valid argument is optimization. There are
> > > indeed
> > > >>>>>> cases
> > > >>>>>>> that reading from the original DAG could be faster than reading
> > > from
> > > >>>>> the
> > > >>>>>>> cache. For example, in the following example:
> > > >>>>>>>
> > > >>>>>>> a.filter(f1' > 100)
> > > >>>>>>> a.cache()
> > > >>>>>>> b = a.filter(f1' < 100)
> > > >>>>>>>
> > > >>>>>>> Ideally the optimizer should be intelligent enough to decide
> > which
> > > >>> way
> > > >>>>> is
> > > >>>>>>> faster, without user intervention. In this case, it will
> identify
> > > >>> that
> > > >>>>> b
> > > >>>>>>> would just be an empty table, thus skip reading from the cache
> > > >>>>>> completely.
> > > >>>>>>> But I agree that returning a CachedTable would give user the
> > > control
> > > >>> of
> > > >>>>>>> when to use cache, even though I still feel that letting the
> > > >>> optimizer
> > > >>>>>>> handle this is a better option in long run.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>>
> > > >>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <
> > trohrmann@xxxxxxxxxx
> > > >
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Yes you are right Becket that it still depends on the actual
> > > >>>>> execution
> > > >>>>>> of
> > > >>>>>>>> the job whether a consumer reads from a cached result or not.
> > > >>>>>>>>
> > > >>>>>>>> My point was actually about the properties of a (cached vs.
> > > >>>>> non-cached)
> > > >>>>>>> and
> > > >>>>>>>> not about the execution. I would not make cache trigger the
> > > >>> execution
> > > >>>>>> of
> > > >>>>>>>> the job because one loses some flexibility by eagerly
> triggering
> > > the
> > > >>>>>>>> execution.
> > > >>>>>>>>
> > > >>>>>>>> I tried to argue for an explicit CachedTable which is returned
> > by
> > > >>> the
> > > >>>>>>>> cache() method like Piotr did in order to make the API more
> > > >>> explicit.
> > > >>>>>>>>
> > > >>>>>>>> Cheers,
> > > >>>>>>>> Till
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <
> becket.qin@xxxxxxxxx
> > >
> > > >>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Till,
> > > >>>>>>>>>
> > > >>>>>>>>> That is a good example. Just a minor correction, in this
> case,
> > > b, c
> > > >>>>>>> and d
> > > >>>>>>>>> will all consume from a non-cached a. This is because cache
> > will
> > > >>>>> only
> > > >>>>>>> be
> > > >>>>>>>>> created on the very first job submission that generates the
> > table
> > > >>>>> to
> > > >>>>>> be
> > > >>>>>>>>> cached.
> > > >>>>>>>>>
> > > >>>>>>>>> If I understand correctly, this is example is about whether
> > > >>>>> .cache()
> > > >>>>>>>> method
> > > >>>>>>>>> should be eagerly evaluated or lazily evaluated. In another
> > word,
> > > >>>>> if
> > > >>>>>>>>> cache() method actually triggers a job that creates the
> cache,
> > > >>>>> there
> > > >>>>>>> will
> > > >>>>>>>>> be no such confusion. Is that right?
> > > >>>>>>>>>
> > > >>>>>>>>> In the example, although d will not consume from the cached
> > Table
> > > >>>>>> while
> > > >>>>>>>> it
> > > >>>>>>>>> looks supposed to, from correctness perspective the code will
> > > still
> > > >>>>>>>> return
> > > >>>>>>>>> correct result, assuming that tables are immutable.
> > > >>>>>>>>>
> > > >>>>>>>>> Personally I feel it is OK because users probably won't
> really
> > > >>>>> worry
> > > >>>>>>>> about
> > > >>>>>>>>> whether the table is cached or not. And lazy cache could
> avoid
> > > some
> > > >>>>>>>>> unnecessary caching if a cached table is never created in the
> > > user
> > > >>>>>>>>> application. But I am not opposed to do eager evaluation of
> > > cache.
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks,
> > > >>>>>>>>>
> > > >>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <
> > > >>>>> trohrmann@xxxxxxxxxx>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Another argument for Piotr's point is that lazily changing
> > > >>>>>> properties
> > > >>>>>>>> of
> > > >>>>>>>>> a
> > > >>>>>>>>>> node affects all down stream consumers but does not
> > necessarily
> > > >>>>>> have
> > > >>>>>>> to
> > > >>>>>>>>>> happen before these consumers are defined. From a user's
> > > >>>>>> perspective
> > > >>>>>>>> this
> > > >>>>>>>>>> can be quite confusing:
> > > >>>>>>>>>>
> > > >>>>>>>>>> b = a.map(...)
> > > >>>>>>>>>> c = a.map(...)
> > > >>>>>>>>>>
> > > >>>>>>>>>> a.cache()
> > > >>>>>>>>>> d = a.map(...)
> > > >>>>>>>>>>
> > > >>>>>>>>>> now b, c and d will consume from a cached operator. In this
> > > case,
> > > >>>>>> the
> > > >>>>>>>>> user
> > > >>>>>>>>>> would most likely expect that only d reads from a cached
> > result.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Cheers,
> > > >>>>>>>>>> Till
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <
> > > >>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hey Shaoxuan and Becket,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Can you explain a bit more one what are the side effects?
> So
> > > >>>>>> far
> > > >>>>>>> my
> > > >>>>>>>>>>>> understanding is that such side effects only exist if a
> > table
> > > >>>>>> is
> > > >>>>>>>>>> mutable.
> > > >>>>>>>>>>>> Is that the case?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Not only that. There are also performance implications and
> > > >>>>> those
> > > >>>>>>> are
> > > >>>>>>>>>>> another implicit side effects of using `void cache()`. As I
> > > >>>>> wrote
> > > >>>>>>>>> before,
> > > >>>>>>>>>>> reading from cache might not always be desirable, thus it
> can
> > > >>>>>> cause
> > > >>>>>>>>>>> performance degradation and I’m fine with that - user's or
> > > >>>>>>>> optimiser’s
> > > >>>>>>>>>>> choice. What I do not like is that this implicit side
> effect
> > > >>>>> can
> > > >>>>>>>>> manifest
> > > >>>>>>>>>>> in completely different part of code, that wasn’t touched
> by
> > a
> > > >>>>>> user
> > > >>>>>>>>> while
> > > >>>>>>>>>>> he was adding `void cache()` call somewhere else. And even
> if
> > > >>>>>>> caching
> > > >>>>>>>>>>> improves performance, it’s still a side effect of `void
> > > >>>>> cache()`.
> > > >>>>>>>>> Almost
> > > >>>>>>>>>>> from the definition `void` methods have only side effects.
> > As I
> > > >>>>>>> wrote
> > > >>>>>>>>>>> before, there are couple of scenarios where this might be
> > > >>>>>>> undesirable
> > > >>>>>>>>>>> and/or unexpected, for example:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 1.
> > > >>>>>>>>>>> Table b = …;
> > > >>>>>>>>>>> b.cache()
> > > >>>>>>>>>>> x = b.join(…)
> > > >>>>>>>>>>> y = b.count()
> > > >>>>>>>>>>> // ...
> > > >>>>>>>>>>> // 100
> > > >>>>>>>>>>> // hundred
> > > >>>>>>>>>>> // lines
> > > >>>>>>>>>>> // of
> > > >>>>>>>>>>> // code
> > > >>>>>>>>>>> // later
> > > >>>>>>>>>>> z = b.filter(…).groupBy(…) // this might be even hidden in
> a
> > > >>>>>>>> different
> > > >>>>>>>>>>> method/file/package/dependency
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> 2.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Table b = ...
> > > >>>>>>>>>>> If (some_condition) {
> > > >>>>>>>>>>> foo(b)
> > > >>>>>>>>>>> }
> > > >>>>>>>>>>> Else {
> > > >>>>>>>>>>> bar(b)
> > > >>>>>>>>>>> }
> > > >>>>>>>>>>> z = b.filter(…).groupBy(…)
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Void foo(Table b) {
> > > >>>>>>>>>>> b.cache()
> > > >>>>>>>>>>> // do something with b
> > > >>>>>>>>>>> }
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> In both above examples, `b.cache()` will implicitly affect
> > > >>>>>>> (semantic
> > > >>>>>>>>> of a
> > > >>>>>>>>>>> program in case of sources being mutable and performance)
> `z
> > =
> > > >>>>>>>>>>> b.filter(…).groupBy(…)` which might be far from obvious.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On top of that, there is still this argument of mine that
> > > >>>>> having
> > > >>>>>> a
> > > >>>>>>>>>>> `MaterializedTable` or `CachedTable` handle is more
> flexible
> > > >>>>> for
> > > >>>>>> us
> > > >>>>>>>> for
> > > >>>>>>>>>> the
> > > >>>>>>>>>>> future and for the user (as a manual option to bypass cache
> > > >>>>>> reads).
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> But Jiangjie is correct,
> > > >>>>>>>>>>>> the source table in batching should be immutable. It is
> the
> > > >>>>>>> user’s
> > > >>>>>>>>>>>> responsibility to ensure it, otherwise even a regular
> > > >>>>> failover
> > > >>>>>>> may
> > > >>>>>>>>> lead
> > > >>>>>>>>>>>> to inconsistent results.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Yes, I agree that’s what perfect world/good deployment
> should
> > > >>>>> be.
> > > >>>>>>> But
> > > >>>>>>>>> its
> > > >>>>>>>>>>> often isn’t and while I’m not trying to fix this (since the
> > > >>>>>> proper
> > > >>>>>>>> fix
> > > >>>>>>>>> is
> > > >>>>>>>>>>> to support transactions), I’m just trying to minimise
> > confusion
> > > >>>>>> for
> > > >>>>>>>> the
> > > >>>>>>>>>>> users that are not fully aware what’s going on and operate
> in
> > > >>>>>> less
> > > >>>>>>>> then
> > > >>>>>>>>>>> perfect setup. And if something bites them after adding
> > > >>>>>> `b.cache()`
> > > >>>>>>>>> call,
> > > >>>>>>>>>>> to make sure that they at least know all of the places that
> > > >>>>>> adding
> > > >>>>>>>> this
> > > >>>>>>>>>>> line can affect.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks, Piotrek
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> On 1 Dec 2018, at 15:39, Becket Qin <becket.qin@xxxxxxxxx
> >
> > > >>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Piotrek,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks again for the clarification. Some more replies are
> > > >>>>>>>> following.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> But keep in mind that `.cache()` will/might not only be
> used
> > > >>>>> in
> > > >>>>>>>>>>> interactive
> > > >>>>>>>>>>>>> programming and not only in batching.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> It is true. Actually in stream processing, cache() has the
> > > >>>>> same
> > > >>>>>>>>>> semantic
> > > >>>>>>>>>>> as
> > > >>>>>>>>>>>> batch processing. The semantic is following:
> > > >>>>>>>>>>>> For a table created via a series of computation, save that
> > > >>>>>> table
> > > >>>>>>>> for
> > > >>>>>>>>>>> later
> > > >>>>>>>>>>>> reference to avoid running the computation logic to
> > > >>>>> regenerate
> > > >>>>>>> the
> > > >>>>>>>>>> table.
> > > >>>>>>>>>>>> Once the application exits, drop all the cache.
> > > >>>>>>>>>>>> This semantic is same for both batch and stream
> processing.
> > > >>>>> The
> > > >>>>>>>>>>> difference
> > > >>>>>>>>>>>> is that stream applications will only run once as they are
> > > >>>>> long
> > > >>>>>>>>>> running.
> > > >>>>>>>>>>>> And the batch applications may be run multiple times,
> hence
> > > >>>>> the
> > > >>>>>>>> cache
> > > >>>>>>>>>> may
> > > >>>>>>>>>>>> be created and dropped each time the application runs.
> > > >>>>>>>>>>>> Admittedly, there will probably be some resource
> management
> > > >>>>>>>>>> requirements
> > > >>>>>>>>>>>> for the streaming cached table, such as time based / size
> > > >>>>> based
> > > >>>>>>>>>>> retention,
> > > >>>>>>>>>>>> to address the infinite data issue. But such requirement
> > does
> > > >>>>>> not
> > > >>>>>>>>>> change
> > > >>>>>>>>>>>> the semantic.
> > > >>>>>>>>>>>> You are right that interactive programming is just one use
> > > >>>>> case
> > > >>>>>>> of
> > > >>>>>>>>>>> cache().
> > > >>>>>>>>>>>> It is not the only use case.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> For me the more important issue is of not having the `void
> > > >>>>>>> cache()`
> > > >>>>>>>>>> with
> > > >>>>>>>>>>>>> side effects.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> This is indeed the key point. The argument around whether
> > > >>>>>> cache()
> > > >>>>>>>>>> should
> > > >>>>>>>>>>>> return something already indicates that cache() and
> > > >>>>>> materialize()
> > > >>>>>>>>>> address
> > > >>>>>>>>>>>> different issues.
> > > >>>>>>>>>>>> Can you explain a bit more one what are the side effects?
> So
> > > >>>>>> far
> > > >>>>>>> my
> > > >>>>>>>>>>>> understanding is that such side effects only exist if a
> > table
> > > >>>>>> is
> > > >>>>>>>>>> mutable.
> > > >>>>>>>>>>>> Is that the case?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I don’t know, probably initially we should make
> CachedTable
> > > >>>>>>>>> read-only.
> > > >>>>>>>>>> I
> > > >>>>>>>>>>>>> don’t find it more confusing than the fact that user can
> > not
> > > >>>>>>> write
> > > >>>>>>>>> to
> > > >>>>>>>>>>> views
> > > >>>>>>>>>>>>> or materialised views in SQL or that user currently can
> not
> > > >>>>>>> write
> > > >>>>>>>>> to a
> > > >>>>>>>>>>>>> Table.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I don't think anyone should insert something to a cache.
> By
> > > >>>>>>>>> definition
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>> cache should only be updated when the corresponding
> original
> > > >>>>>>> table
> > > >>>>>>>> is
> > > >>>>>>>>>>>> updated. What I am wondering is that given the following
> two
> > > >>>>>>> facts:
> > > >>>>>>>>>>>> 1. If and only if a table is mutable (with something like
> > > >>>>>>>> insert()),
> > > >>>>>>>>> a
> > > >>>>>>>>>>>> CachedTable may have implicit behavior.
> > > >>>>>>>>>>>> 2. A CachedTable extends a Table.
> > > >>>>>>>>>>>> We can come to the conclusion that a CachedTable is
> mutable
> > > >>>>> and
> > > >>>>>>>> users
> > > >>>>>>>>>> can
> > > >>>>>>>>>>>> insert into the CachedTable directly. This is where I
> > thought
> > > >>>>>>>>>> confusing.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <
> > > >>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Regarding naming `cache()` vs `materialize()`. One more
> > > >>>>>>>> explanation
> > > >>>>>>>>>> why
> > > >>>>>>>>>>> I
> > > >>>>>>>>>>>>> think `materialize()` is more natural to me is that I
> think
> > > >>>>> of
> > > >>>>>>> all
> > > >>>>>>>>>>> “Table”s
> > > >>>>>>>>>>>>> in Table-API as views. They behave the same way as SQL
> > > >>>>> views,
> > > >>>>>>> the
> > > >>>>>>>>> only
> > > >>>>>>>>>>>>> difference for me is that their live scope is short -
> > > >>>>> current
> > > >>>>>>>>> session
> > > >>>>>>>>>>> which
> > > >>>>>>>>>>>>> is limited by different execution model. That’s why
> > > >>>>> “cashing”
> > > >>>>>> a
> > > >>>>>>>> view
> > > >>>>>>>>>>> for me
> > > >>>>>>>>>>>>> is just materialising it.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> However I see and I understand your point of view. Coming
> > > >>>>> from
> > > >>>>>>>>>>>>> DataSet/DataStream and generally speaking non-SQL world,
> > > >>>>>>> `cache()`
> > > >>>>>>>>> is
> > > >>>>>>>>>>> more
> > > >>>>>>>>>>>>> natural. But keep in mind that `.cache()` will/might not
> > > >>>>> only
> > > >>>>>> be
> > > >>>>>>>>> used
> > > >>>>>>>>>> in
> > > >>>>>>>>>>>>> interactive programming and not only in batching. But
> > naming
> > > >>>>>> is
> > > >>>>>>>> one
> > > >>>>>>>>>>> issue,
> > > >>>>>>>>>>>>> and not that critical to me. Especially that once we
> > > >>>>> implement
> > > >>>>>>>>> proper
> > > >>>>>>>>>>>>> materialised views, we can always deprecate/rename
> > `cache()`
> > > >>>>>> if
> > > >>>>>>> we
> > > >>>>>>>>>> deem
> > > >>>>>>>>>>> so.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> For me the more important issue is of not having the
> `void
> > > >>>>>>>> cache()`
> > > >>>>>>>>>> with
> > > >>>>>>>>>>>>> side effects. Exactly for the reasons that you have
> > > >>>>> mentioned.
> > > >>>>>>>> True:
> > > >>>>>>>>>>>>> results might be non deterministic if underlying source
> > > >>>>> table
> > > >>>>>>> are
> > > >>>>>>>>>>> changing.
> > > >>>>>>>>>>>>> Problem is that `void cache()` implicitly changes the
> > > >>>>> semantic
> > > >>>>>>> of
> > > >>>>>>>>>>>>> subsequent uses of the cached/materialized Table. It can
> > > >>>>> cause
> > > >>>>>>>> “wtf”
> > > >>>>>>>>>>> moment
> > > >>>>>>>>>>>>> for a user if he inserts “b.cache()” call in some place
> in
> > > >>>>> his
> > > >>>>>>>> code
> > > >>>>>>>>>> and
> > > >>>>>>>>>>>>> suddenly some other random places are behaving
> differently.
> > > >>>>> If
> > > >>>>>>>>>>>>> `materialize()` or `cache()` returns a Table handle, we
> > > >>>>> force
> > > >>>>>>> user
> > > >>>>>>>>> to
> > > >>>>>>>>>>>>> explicitly use the cache which removes the “random” part
> > > >>>>> from
> > > >>>>>>> the
> > > >>>>>>>>>>> "suddenly
> > > >>>>>>>>>>>>> some other random places are behaving differently”.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> This argument and others that I’ve raised (greater
> > > >>>>>>>>>> flexibility/allowing
> > > >>>>>>>>>>>>> user to explicitly bypass the cache) are independent of
> > > >>>>>>> `cache()`
> > > >>>>>>>> vs
> > > >>>>>>>>>>>>> `materialize()` discussion.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Does that mean one can also insert into the CachedTable?
> > > >>>>> This
> > > >>>>>>>>> sounds
> > > >>>>>>>>>>>>> pretty confusing.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> I don’t know, probably initially we should make
> CachedTable
> > > >>>>>>>>>> read-only. I
> > > >>>>>>>>>>>>> don’t find it more confusing than the fact that user can
> > not
> > > >>>>>>> write
> > > >>>>>>>>> to
> > > >>>>>>>>>>> views
> > > >>>>>>>>>>>>> or materialised views in SQL or that user currently can
> not
> > > >>>>>>> write
> > > >>>>>>>>> to a
> > > >>>>>>>>>>>>> Table.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On 30 Nov 2018, at 17:38, Xingcan Cui <
> xingcanc@xxxxxxxxx
> > >
> > > >>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I agree with @Becket that `cache()` and `materialize()`
> > > >>>>>> should
> > > >>>>>>> be
> > > >>>>>>>>>>>>> considered as two different methods where the later one
> is
> > > >>>>>> more
> > > >>>>>>>>>>>>> sophisticated.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> According to my understanding, the initial idea is just
> to
> > > >>>>>>>>> introduce
> > > >>>>>>>>>> a
> > > >>>>>>>>>>>>> simple cache or persist mechanism, but as the TableAPI
> is a
> > > >>>>>>>>> high-level
> > > >>>>>>>>>>> API,
> > > >>>>>>>>>>>>> it’s naturally for as to think in a SQL way.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Maybe we can add the `cache()` method to the DataSet API
> > > >>>>> and
> > > >>>>>>>> force
> > > >>>>>>>>>>> users
> > > >>>>>>>>>>>>> to translate a Table to a Dataset before caching it. Then
> > > >>>>> the
> > > >>>>>>>> users
> > > >>>>>>>>>>> should
> > > >>>>>>>>>>>>> manually register the cached dataset to a table again (we
> > > >>>>> may
> > > >>>>>>> need
> > > >>>>>>>>>> some
> > > >>>>>>>>>>>>> table replacement mechanisms for datasets with an
> identical
> > > >>>>>>> schema
> > > >>>>>>>>> but
> > > >>>>>>>>>>>>> different contents here). After all, it’s the dataset
> > rather
> > > >>>>>>> than
> > > >>>>>>>>> the
> > > >>>>>>>>>>>>> dynamic table that need to be cached, right?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>> Xingcan
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <
> > > >>>>>>> becket.qin@xxxxxxxxx>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Piotrek and Jark,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thanks for the feedback and explanation. Those are good
> > > >>>>>>>> arguments.
> > > >>>>>>>>>>> But I
> > > >>>>>>>>>>>>>>> think those arguments are mostly about materialized
> view.
> > > >>>>>> Let
> > > >>>>>>> me
> > > >>>>>>>>> try
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>>>> explain the reason I believe cache() and materialize()
> > are
> > > >>>>>>>>>> different.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I think cache() and materialize() have quite different
> > > >>>>>>>>> implications.
> > > >>>>>>>>>>> An
> > > >>>>>>>>>>>>>>> analogy I can think of is save()/publish(). When users
> > > >>>>> call
> > > >>>>>>>>> cache(),
> > > >>>>>>>>>>> it
> > > >>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>> just like they are saving an intermediate result as a
> > > >>>>> draft
> > > >>>>>> of
> > > >>>>>>>>> their
> > > >>>>>>>>>>>>> work,
> > > >>>>>>>>>>>>>>> this intermediate result may not have any realistic
> > > >>>>> meaning.
> > > >>>>>>>>> Calling
> > > >>>>>>>>>>>>>>> cache() does not mean users want to publish the cached
> > > >>>>> table
> > > >>>>>>> in
> > > >>>>>>>>> any
> > > >>>>>>>>>>>>> manner.
> > > >>>>>>>>>>>>>>> But when users call materialize(), that means "I have
> > > >>>>>>> something
> > > >>>>>>>>>>>>> meaningful
> > > >>>>>>>>>>>>>>> to be reused by others", now users need to think about
> > the
> > > >>>>>>>>>> validation,
> > > >>>>>>>>>>>>>>> update & versioning, lifecycle of the result, etc.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Piotrek's suggestions on variations of the
> materialize()
> > > >>>>>>> methods
> > > >>>>>>>>> are
> > > >>>>>>>>>>>>> very
> > > >>>>>>>>>>>>>>> useful. It would be great if Flink have them. The
> concept
> > > >>>>> of
> > > >>>>>>>>>>>>> materialized
> > > >>>>>>>>>>>>>>> view is actually a pretty big feature, not to say the
> > > >>>>>> related
> > > >>>>>>>>> stuff
> > > >>>>>>>>>>> like
> > > >>>>>>>>>>>>>>> triggers/hooks you mentioned earlier. I think the
> > > >>>>>> materialized
> > > >>>>>>>>> view
> > > >>>>>>>>>>>>> itself
> > > >>>>>>>>>>>>>>> should be discussed in a more thorough and systematic
> > > >>>>>> manner.
> > > >>>>>>>> And
> > > >>>>>>>>> I
> > > >>>>>>>>>>>>> found
> > > >>>>>>>>>>>>>>> that discussion is kind of orthogonal and way beyond
> > > >>>>>>> interactive
> > > >>>>>>>>>>>>>>> programming experience.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> The example you gave was interesting. I still have some
> > > >>>>>>>> questions,
> > > >>>>>>>>>>>>> though.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Table source = … // some source that scans files from a
> > > >>>>>>>> directory
> > > >>>>>>>>>>>>>>>> “/foo/bar/“
> > > >>>>>>>>>>>>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> > > >>>>>>>>>>>>>>>> Table t2 = t1.materialize() // (or `cache()`)
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> t2.count() // initialise cache (if it’s lazily
> > > >>>>> initialised)
> > > >>>>>>>>>>>>>>>> int a1 = t1.count()
> > > >>>>>>>>>>>>>>>> int b1 = t2.count()
> > > >>>>>>>>>>>>>>>> // something in the background (or we trigger it)
> writes
> > > >>>>>> new
> > > >>>>>>>>> files
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>>>>>> /foo/bar
> > > >>>>>>>>>>>>>>>> int a2 = t1.count()
> > > >>>>>>>>>>>>>>>> int b2 = t2.count()
> > > >>>>>>>>>>>>>>>> t2.refresh() // possible future extension, not to be
> > > >>>>>>>> implemented
> > > >>>>>>>>> in
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>> initial version
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> what if someone else added some more files to /foo/bar
> at
> > > >>>>>> this
> > > >>>>>>>>>> point?
> > > >>>>>>>>>>> In
> > > >>>>>>>>>>>>>>> that case, a3 won't equals to b3, and the result become
> > > >>>>>>>>>>>>> non-deterministic,
> > > >>>>>>>>>>>>>>> right?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> int a3 = t1.count()
> > > >>>>>>>>>>>>>>>> int b3 = t2.count()
> > > >>>>>>>>>>>>>>>> t2.drop() // another possible future extension, manual
> > > >>>>>>> “cache”
> > > >>>>>>>>>>> dropping
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> When we talk about interactive programming, in most
> > cases,
> > > >>>>>> we
> > > >>>>>>>> are
> > > >>>>>>>>>>>>> talking
> > > >>>>>>>>>>>>>>> about batch applications. A fundamental assumption of
> > such
> > > >>>>>>> case
> > > >>>>>>>> is
> > > >>>>>>>>>>> that
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> source data is complete before the data processing
> > begins,
> > > >>>>>> and
> > > >>>>>>>> the
> > > >>>>>>>>>>> data
> > > >>>>>>>>>>>>>>> will not change during the data processing. IMO, if
> > > >>>>>> additional
> > > >>>>>>>>> rows
> > > >>>>>>>>>>>>> needs
> > > >>>>>>>>>>>>>>> to be added to some source during the processing, it
> > > >>>>> should
> > > >>>>>> be
> > > >>>>>>>>> done
> > > >>>>>>>>>> in
> > > >>>>>>>>>>>>> ways
> > > >>>>>>>>>>>>>>> like union the source with another table containing the
> > > >>>>> rows
> > > >>>>>>> to
> > > >>>>>>>> be
> > > >>>>>>>>>>>>> added.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> There are a few cases that computations are executed
> > > >>>>>>> repeatedly
> > > >>>>>>>> on
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>>> changing data source.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> For example, people may run a ML training job every
> hour
> > > >>>>>> with
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> samples
> > > >>>>>>>>>>>>>>> newly added in the past hour. In that case, the source
> > > >>>>> data
> > > >>>>>>>>> between
> > > >>>>>>>>>>> will
> > > >>>>>>>>>>>>>>> indeed change. But still, the data remain unchanged
> > within
> > > >>>>>> one
> > > >>>>>>>>> run.
> > > >>>>>>>>>>> And
> > > >>>>>>>>>>>>>>> usually in that case, the result will need versioning,
> > > >>>>> i.e.
> > > >>>>>>> for
> > > >>>>>>>> a
> > > >>>>>>>>>>> given
> > > >>>>>>>>>>>>>>> result, it tells that the result is a result from the
> > > >>>>> source
> > > >>>>>>>> data
> > > >>>>>>>>>> by a
> > > >>>>>>>>>>>>>>> certain timestamp.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Another example is something like data warehouse. In
> this
> > > >>>>>>> case,
> > > >>>>>>>>>> there
> > > >>>>>>>>>>>>> are a
> > > >>>>>>>>>>>>>>> few source of original/raw data. On top of those
> sources,
> > > >>>>>> many
> > > >>>>>>>>>>>>> materialized
> > > >>>>>>>>>>>>>>> view / queries / reports / dashboards can be created to
> > > >>>>>>> generate
> > > >>>>>>>>>>> derived
> > > >>>>>>>>>>>>>>> data. Those derived data needs to be updated when the
> > > >>>>>>> underlying
> > > >>>>>>>>>>>>> original
> > > >>>>>>>>>>>>>>> data changes. In that case, the processing logic that
> > > >>>>>> derives
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> original
> > > >>>>>>>>>>>>>>> data needs to be executed repeatedly to update those
> > > >>>>>>>>> reports/views.
> > > >>>>>>>>>>>>> Again,
> > > >>>>>>>>>>>>>>> all those derived data also need to have version
> > > >>>>> management,
> > > >>>>>>>> such
> > > >>>>>>>>> as
> > > >>>>>>>>>>>>>>> timestamp.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> In any of the above two cases, during a single run of
> the
> > > >>>>>>>>> processing
> > > >>>>>>>>>>>>> logic,
> > > >>>>>>>>>>>>>>> the data cannot change. Otherwise the behavior of the
> > > >>>>>>> processing
> > > >>>>>>>>>> logic
> > > >>>>>>>>>>>>> may
> > > >>>>>>>>>>>>>>> be undefined. In the above two examples, when writing
> the
> > > >>>>>>>>> processing
> > > >>>>>>>>>>>>> logic,
> > > >>>>>>>>>>>>>>> Users can use .cache() to hint Flink that those results
> > > >>>>>> should
> > > >>>>>>>> be
> > > >>>>>>>>>>> saved
> > > >>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>> avoid repeated computation. And then for the result of
> my
> > > >>>>>>>>>> application
> > > >>>>>>>>>>>>>>> logic, I'll call materialize(), so that these results
> > > >>>>> could
> > > >>>>>> be
> > > >>>>>>>>>> managed
> > > >>>>>>>>>>>>> by
> > > >>>>>>>>>>>>>>> the system with versioning, metadata management,
> > lifecycle
> > > >>>>>>>>>> management,
> > > >>>>>>>>>>>>>>> ACLs, etc.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> It is true we can use materialize() to do the cache()
> > job,
> > > >>>>>>> but I
> > > >>>>>>>>> am
> > > >>>>>>>>>>>>> really
> > > >>>>>>>>>>>>>>> reluctant to shoehorn cache() into materialize() and
> > force
> > > >>>>>>> users
> > > >>>>>>>>> to
> > > >>>>>>>>>>>>> worry
> > > >>>>>>>>>>>>>>> about a bunch of implications that they needn't have
> to.
> > I
> > > >>>>>> am
> > > >>>>>>>>>>>>> absolutely on
> > > >>>>>>>>>>>>>>> your side that redundant API is bad. But it is equally
> > > >>>>>>>>> frustrating,
> > > >>>>>>>>>> if
> > > >>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>> more, that the same API does different things.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <
> > > >>>>>>>>> wshaoxuan@xxxxxxxxx
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Thanks Piotrek,
> > > >>>>>>>>>>>>>>>> You provided a very good example, it explains all the
> > > >>>>>>>> confusions
> > > >>>>>>>>> I
> > > >>>>>>>>>>>>> have.
> > > >>>>>>>>>>>>>>>> It is clear that there is something we have not
> > > >>>>> considered
> > > >>>>>> in
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> initial
> > > >>>>>>>>>>>>>>>> proposal. We intend to force the user to reuse the
> > > >>>>>>>>>>> cached/materialized
> > > >>>>>>>>>>>>>>>> table, if its cache() method is executed. We did not
> > > >>>>> expect
> > > >>>>>>>> that
> > > >>>>>>>>>> user
> > > >>>>>>>>>>>>> may
> > > >>>>>>>>>>>>>>>> want to re-executed the plan from the source table.
> Let
> > > >>>>> me
> > > >>>>>>>>> re-think
> > > >>>>>>>>>>>>> about
> > > >>>>>>>>>>>>>>>> it and get back to you later.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> In the meanwhile, this example/observation also infers
> > > >>>>> that
> > > >>>>>>> we
> > > >>>>>>>>>> cannot
> > > >>>>>>>>>>>>> fully
> > > >>>>>>>>>>>>>>>> involve the optimizer to decide the plan if a
> > > >>>>>>> cache/materialize
> > > >>>>>>>>> is
> > > >>>>>>>>>>>>>>>> explicitly used, because weather to reuse the cache
> data
> > > >>>>> or
> > > >>>>>>>>>>> re-execute
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>> query from source data may lead to different results.
> > > >>>>> (But
> > > >>>>>> I
> > > >>>>>>>>> guess
> > > >>>>>>>>>>>>>>>> optimizer can still help in some cases ---- as long as
> > it
> > > >>>>>>> does
> > > >>>>>>>>> not
> > > >>>>>>>>>>>>>>>> re-execute from the varied source, we should be safe).
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>> Shaoxuan
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski <
> > > >>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > > >>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hi Shaoxuan,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Re 2:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is
> > > >>>>>> modified
> > > >>>>>>>>> to->
> > > >>>>>>>>>>> t1’
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> What do you mean that “ t1 is modified to-> t1’ ” ?
> > That
> > > >>>>>>>>>>>>>>>>> `methodThatAppliesOperators()` method has changed
> it’s
> > > >>>>>> plan?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I was thinking more about something like this:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Table source = … // some source that scans files
> from a
> > > >>>>>>>>> directory
> > > >>>>>>>>>>>>>>>>> “/foo/bar/“
> > > >>>>>>>>>>>>>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> > > >>>>>>>>>>>>>>>>> Table t2 = t1.materialize() // (or `cache()`)
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> t2.count() // initialise cache (if it’s lazily
> > > >>>>>> initialised)
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> int a1 = t1.count()
> > > >>>>>>>>>>>>>>>>> int b1 = t2.count()
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> // something in the background (or we trigger it)
> > writes
> > > >>>>>> new
> > > >>>>>>>>> files
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>> /foo/bar
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> int a2 = t1.count()
> > > >>>>>>>>>>>>>>>>> int b2 = t2.count()
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> t2.refresh() // possible future extension, not to be
> > > >>>>>>>> implemented
> > > >>>>>>>>>> in
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> initial version
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> int a3 = t1.count()
> > > >>>>>>>>>>>>>>>>> int b3 = t2.count()
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> t2.drop() // another possible future extension,
> manual
> > > >>>>>>> “cache”
> > > >>>>>>>>>>>>> dropping
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> assertTrue(a1 == b1) // same results, but b1 comes
> from
> > > >>>>>> the
> > > >>>>>>>>>> “cache"
> > > >>>>>>>>>>>>>>>>> assertTrue(b1 == b2) // both values come from the
> same
> > > >>>>>> cache
> > > >>>>>>>>>>>>>>>>> assertTrue(a2 > b2) // b2 comes from cache, a2
> > > >>>>> re-executed
> > > >>>>>>>> full
> > > >>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>> scan
> > > >>>>>>>>>>>>>>>>> and has more data
> > > >>>>>>>>>>>>>>>>> assertTrue(b3 > b2) // b3 comes from refreshed cache
> > > >>>>>>>>>>>>>>>>> assertTrue(b3 == a2 == a3)
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On 30 Nov 2018, at 10:22, Jark Wu <imjark@xxxxxxxxx
> >
> > > >>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> It is an very interesting and useful design!
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Here I want to share some of my thoughts:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 1. Agree with that cache() method should return some
> > > >>>>>> Table
> > > >>>>>>> to
> > > >>>>>>>>>> avoid
> > > >>>>>>>>>>>>>>>> some
> > > >>>>>>>>>>>>>>>>>> unexpected problems because of the mutable object.
> > > >>>>>>>>>>>>>>>>>> All the existing methods of Table are returning a
> new
> > > >>>>>> Table
> > > >>>>>>>>>>> instance.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 2. I think materialize() would be more consistent
> with
> > > >>>>>> SQL,
> > > >>>>>>>>> this
> > > >>>>>>>>>>>>> makes
> > > >>>>>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>>>>> possible to support the same feature for SQL
> > > >>>>> (materialize
> > > >>>>>>>> view)
> > > >>>>>>>>>> and
> > > >>>>>>>>>>>>>>>> keep
> > > >>>>>>>>>>>>>>>>>> the same API for users in the future.
> > > >>>>>>>>>>>>>>>>>> But I'm also fine if we choose cache().
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 3. In the proposal, a TableService (or
> FlinkService?)
> > > >>>>> is
> > > >>>>>>> used
> > > >>>>>>>>> to
> > > >>>>>>>>>>>>> cache
> > > >>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> result of the (intermediate) table.
> > > >>>>>>>>>>>>>>>>>> But the name of TableService may be a bit general
> > which
> > > >>>>>> is
> > > >>>>>>>> not
> > > >>>>>>>>>>> quite
> > > >>>>>>>>>>>>>>>>>> understanding correctly in the first glance (a
> > > >>>>> metastore
> > > >>>>>>> for
> > > >>>>>>>>>>>>> tables?).
> > > >>>>>>>>>>>>>>>>>> Maybe a more specific name would be better, such as
> > > >>>>>>>>>>> TableCacheSerive
> > > >>>>>>>>>>>>>>>> or
> > > >>>>>>>>>>>>>>>>>> TableMaterializeSerivce or something else.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <
> > > >>>>>>>> fhueske@xxxxxxxxx
> > > >>>>>>>>>>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks for the clarification Becket!
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I have a few thoughts to share / questions:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 1) I'd like to know how you plan to implement the
> > > >>>>>> feature
> > > >>>>>>>> on a
> > > >>>>>>>>>>> plan
> > > >>>>>>>>>>>>> /
> > > >>>>>>>>>>>>>>>>>>> planner level.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I would imaging the following to happen when
> > > >>>>>> Table.cache()
> > > >>>>>>>> is
> > > >>>>>>>>>>>>> called:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 1) immediately optimize the Table and internally
> > > >>>>> convert
> > > >>>>>>> it
> > > >>>>>>>>>> into a
> > > >>>>>>>>>>>>>>>>>>> DataSet/DataStream. This is necessary, to avoid
> that
> > > >>>>>>>> operators
> > > >>>>>>>>>> of
> > > >>>>>>>>>>>>>>>> later
> > > >>>>>>>>>>>>>>>>>>> queries on top of the Table are pushed down.
> > > >>>>>>>>>>>>>>>>>>> 2) register the DataSet/DataStream as a
> > > >>>>>>>>>> DataSet/DataStream-backed
> > > >>>>>>>>>>>>>>>> Table
> > > >>>>>>>>>>>>>>>>> X
> > > >>>>>>>>>>>>>>>>>>> 3) add a sink to the DataSet/DataStream. This is
> the
> > > >>>>>>>>>>> materialization
> > > >>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> Table X
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Based on your proposal the following would happen:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Table t1 = ....
> > > >>>>>>>>>>>>>>>>>>> t1.cache(); // cache() returns void. The logical
> plan
> > > >>>>> of
> > > >>>>>>> t1
> > > >>>>>>>> is
> > > >>>>>>>>>>>>>>>> replaced
> > > >>>>>>>>>>>>>>>>> by
> > > >>>>>>>>>>>>>>>>>>> a scan of X. There is also a reference to the
> > > >>>>>>>> materialization
> > > >>>>>>>>> of
> > > >>>>>>>>>>> X.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> t1.count(); // this executes the program, including
> > > >>>>> the
> > > >>>>>>>>>>>>>>>>> DataSet/DataStream
> > > >>>>>>>>>>>>>>>>>>> that backs X and the sink that writes the
> > > >>>>>> materialization
> > > >>>>>>>> of X
> > > >>>>>>>>>>>>>>>>>>> t1.count(); // this executes the program, but
> reads X
> > > >>>>>> from
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> materialization.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> My question is, how do you determine when whether
> the
> > > >>>>>> scan
> > > >>>>>>>> of
> > > >>>>>>>>> t1
> > > >>>>>>>>>>>>>>>> should
> > > >>>>>>>>>>>>>>>>> go
> > > >>>>>>>>>>>>>>>>>>> against the DataSet/DataStream program and when
> > > >>>>> against
> > > >>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> materialization?
> > > >>>>>>>>>>>>>>>>>>> AFAIK, there is no hook that will tell you that a
> > part
> > > >>>>>> of
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> program
> > > >>>>>>>>>>>>>>>>> was
> > > >>>>>>>>>>>>>>>>>>> executed. Flipping a switch during optimization or
> > > >>>>> plan
> > > >>>>>>>>>> generation
> > > >>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>> sufficient as there is no guarantee that the plan
> is
> > > >>>>>> also
> > > >>>>>>>>>>> executed.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Overall, this behavior is somewhat similar to what
> I
> > > >>>>>>>> proposed
> > > >>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>> FLINK-8950, which does not include persisting the
> > > >>>>> table,
> > > >>>>>>> but
> > > >>>>>>>>>> just
> > > >>>>>>>>>>>>>>>>>>> optimizing and reregistering it as
> DataSet/DataStream
> > > >>>>>>> scan.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 2) I think Piotr has a point about the implicit
> > > >>>>> behavior
> > > >>>>>>> and
> > > >>>>>>>>>> side
> > > >>>>>>>>>>>>>>>>> effects
> > > >>>>>>>>>>>>>>>>>>> of the cache() method if it does not return
> anything.
> > > >>>>>>>>>>>>>>>>>>> Consider the following example:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Table t1 = ???
> > > >>>>>>>>>>>>>>>>>>> Table t2 = methodThatAppliesOperators(t1);
> > > >>>>>>>>>>>>>>>>>>> Table t3 = methodThatAppliesOtherOperators(t1);
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> In this case, the behavior/performance of the plan
> > > >>>>> that
> > > >>>>>>>>> results
> > > >>>>>>>>>>> from
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> second method call depends on whether t1 was
> modified
> > > >>>>> by
> > > >>>>>>> the
> > > >>>>>>>>>> first
> > > >>>>>>>>>>>>>>>>> method
> > > >>>>>>>>>>>>>>>>>>> or not.
> > > >>>>>>>>>>>>>>>>>>> This is the classic issue of mutable vs. immutable
> > > >>>>>>> objects.
> > > >>>>>>>>>>>>>>>>>>> Also, as Piotr pointed out, it might also be good
> to
> > > >>>>>> have
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> original
> > > >>>>>>>>>>>>>>>>> plan
> > > >>>>>>>>>>>>>>>>>>> of t1, because in some cases it is possible to push
> > > >>>>>>> filters
> > > >>>>>>>>> down
> > > >>>>>>>>>>>>> such
> > > >>>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>> evaluating the query from scratch might be more
> > > >>>>>> efficient
> > > >>>>>>>> than
> > > >>>>>>>>>>>>>>>> accessing
> > > >>>>>>>>>>>>>>>>>>> the cache.
> > > >>>>>>>>>>>>>>>>>>> Moreover, a CachedTable could extend Table() and
> > > >>>>> offer a
> > > >>>>>>>>> method
> > > >>>>>>>>>>>>>>>>> refresh().
> > > >>>>>>>>>>>>>>>>>>> This sounds quite useful in an interactive session
> > > >>>>> mode.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> 3) Regarding the name, I can see both arguments.
> IMO,
> > > >>>>>>>>>>> materialize()
> > > >>>>>>>>>>>>>>>>> seems
> > > >>>>>>>>>>>>>>>>>>> to be more future proof.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Best, Fabian
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan
> > > >>>>>> Wang <
> > > >>>>>>>>>>>>>>>>>>> wshaoxuan@xxxxxxxxx>:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Hi Piotr,
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Thanks for sharing your ideas on the method
> naming.
> > > >>>>> We
> > > >>>>>>> will
> > > >>>>>>>>>> think
> > > >>>>>>>>>>>>>>>> about
> > > >>>>>>>>>>>>>>>>>>>> your suggestions. But I don't understand why we
> need
> > > >>>>> to
> > > >>>>>>>>> change
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> return
> > > >>>>>>>>>>>>>>>>>>>> type of cache().
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Cache() is a physical operation, it does not
> change
> > > >>>>> the
> > > >>>>>>>> logic
> > > >>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>>> the `Table`. On the tableAPI layer, we should not
> > > >>>>>>>> introduce a
> > > >>>>>>>>>> new
> > > >>>>>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>>> type unless the logic of table has been changed.
> If
> > > >>>>> we
> > > >>>>>>>>>> introduce
> > > >>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>> new
> > > >>>>>>>>>>>>>>>>>>>> table type `CachedTable`, we need create the same
> > set
> > > >>>>>> of
> > > >>>>>>>>>> methods
> > > >>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>> `Table`
> > > >>>>>>>>>>>>>>>>>>>> for it. I don't think it is worth doing this. Or
> can
> > > >>>>>> you
> > > >>>>>>>>> please
> > > >>>>>>>>>>>>>>>>> elaborate
> > > >>>>>>>>>>>>>>>>>>>> more on what could be the "implicit
> behaviours/side
> > > >>>>>>>> effects"
> > > >>>>>>>>>> you
> > > >>>>>>>>>>>>> are
> > > >>>>>>>>>>>>>>>>>>>> thinking about?
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>>>>>> Shaoxuan
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski <
> > > >>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > > >>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Hi Becket,
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Thanks for the response.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> 1. I wasn’t saying that materialised view must be
> > > >>>>>>> mutable
> > > >>>>>>>> or
> > > >>>>>>>>>>> not.
> > > >>>>>>>>>>>>>>>> The
> > > >>>>>>>>>>>>>>>>>>>> same
> > > >>>>>>>>>>>>>>>>>>>>> thing applies to caches as well. To the
> contrary, I
> > > >>>>>>> would
> > > >>>>>>>>>> expect
> > > >>>>>>>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>>>> consistency and updates from something that is
> > > >>>>> called
> > > >>>>>>>>> “cache”
> > > >>>>>>>>>> vs
> > > >>>>>>>>>>>>>>>>>>>> something
> > > >>>>>>>>>>>>>>>>>>>>> that’s a “materialised view”. In other words, IMO
> > > >>>>> most
> > > >>>>>>>>> caches
> > > >>>>>>>>>> do
> > > >>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>>> serve
> > > >>>>>>>>>>>>>>>>>>>>> you invalid/outdated data and they handle updates
> > on
> > > >>>>>>> their
> > > >>>>>>>>>> own.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> 2. I don’t think that having in the future two
> very
> > > >>>>>>>> similar
> > > >>>>>>>>>>>>> concepts
> > > >>>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>>>> `materialized` view and `cache` is a good idea.
> It
> > > >>>>>> would
> > > >>>>>>>> be
> > > >>>>>>>>>>>>>>>> confusing
> > > >>>>>>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>>>> the users. I think it could be handled by
> > > >>>>>>>>>> variations/overloading
> > > >>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>>>> materialised view concept. We could start with:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materialize()` - immutable,
> > > >>>>> session
> > > >>>>>>>> life
> > > >>>>>>>>>>> scope
> > > >>>>>>>>>>>>>>>>>>>>> (basically the same semantic as you are proposing
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> And then in the future (if ever) build on top of
> > > >>>>>>>> that/expand
> > > >>>>>>>>>> it
> > > >>>>>>>>>>>>>>>> with:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materialize(refreshTime=…)` or
> > > >>>>>>>>>>>>> `MaterializedTable
> > > >>>>>>>>>>>>>>>>>>>>> materialize(refreshHook=…)`
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Or with cross session support:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materializeInto(connector=…)`
> or
> > > >>>>>>>>>>>>>>>> `MaterializedTable
> > > >>>>>>>>>>>>>>>>>>>>> materializeInto(tableFactory=…)`
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> I’m not saying that we should implement cross
> > > >>>>>>>>>> session/refreshing
> > > >>>>>>>>>>>>> now
> > > >>>>>>>>>>>>>>>>> or
> > > >>>>>>>>>>>>>>>>>>>>> even in the near future. I’m just arguing that
> > > >>>>> naming
> > > >>>>>>>>> current
> > > >>>>>>>>>>>>>>>>> immutable
> > > >>>>>>>>>>>>>>>>>>>>> session life scope method `materialize()` is more
> > > >>>>>> future
> > > >>>>>>>>> proof
> > > >>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>>>> consistent with SQL (on which after all table-api
> > is
> > > >>>>>>>> heavily
> > > >>>>>>>>>>>>> basing
> > > >>>>>>>>>>>>>>>>>>> on).
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> 3. Even if we agree on naming it `cache()`, I
> would
> > > >>>>>>> still
> > > >>>>>>>>>> insist
> > > >>>>>>>>>>>>> on
> > > >>>>>>>>>>>>>>>>>>>>> `cache()` returning `CachedTable` handle to avoid
> > > >>>>>>> implicit
> > > >>>>>>>>>>>>>>>>>>>> behaviours/side
> > > >>>>>>>>>>>>>>>>>>>>> effects and to give both us & users more
> > > >>>>> flexibility.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin <
> > > >>>>>>>> becket.qin@xxxxxxxxx
> > > >>>>>>>>>>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Just to add a little bit, the materialized view
> is
> > > >>>>>>>> probably
> > > >>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>>> similar
> > > >>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>> the persistent() brought up earlier in the
> thread.
> > > >>>>> So
> > > >>>>>>> it
> > > >>>>>>>> is
> > > >>>>>>>>>>>>> usually
> > > >>>>>>>>>>>>>>>>>>>> cross
> > > >>>>>>>>>>>>>>>>>>>>>> session and could be used in a larger scope. For
> > > >>>>>>>> example, a
> > > >>>>>>>>>>>>>>>>>>>> materialized
> > > >>>>>>>>>>>>>>>>>>>>>> view created by user A may be visible to user B.
> > It
> > > >>>>>> is
> > > >>>>>>>>>> probably
> > > >>>>>>>>>>>>>>>>>>>> something
> > > >>>>>>>>>>>>>>>>>>>>>> we want to have in the future. I'll put it in
> the
> > > >>>>>>> future
> > > >>>>>>>>> work
> > > >>>>>>>>>>>>>>>>>>> section.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin <
> > > >>>>>>>>>>> becket.qin@xxxxxxxxx
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek,
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Right now we are mostly thinking of the cached
> > > >>>>> table
> > > >>>>>>> as
> > > >>>>>>>>>>>>>>>> immutable. I
> > > >>>>>>>>>>>>>>>>>>>> can
> > > >>>>>>>>>>>>>>>>>>>>>>> see the Materialized view would be useful in
> the
> > > >>>>>>> future.
> > > >>>>>>>>>> That
> > > >>>>>>>>>>>>>>>> said,
> > > >>>>>>>>>>>>>>>>>>> I
> > > >>>>>>>>>>>>>>>>>>>>> think
> > > >>>>>>>>>>>>>>>>>>>>>>> a simple cache mechanism is probably still
> > needed.
> > > >>>>>> So
> > > >>>>>>> to
> > > >>>>>>>>> me,
> > > >>>>>>>>>>>>>>>> cache()
> > > >>>>>>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>>>>> materialize() should be two separate method as
> > > >>>>> they
> > > >>>>>>>>> address
> > > >>>>>>>>>>>>>>>>>>> different
> > > >>>>>>>>>>>>>>>>>>>>>>> needs. Materialize() is a higher level concept
> > > >>>>>> usually
> > > >>>>>>>>>>> implying
> > > >>>>>>>>>>>>>>>>>>>>> periodical
> > > >>>>>>>>>>>>>>>>>>>>>>> update, while cache() has much simpler
> semantic.
> > > >>>>> For
> > > >>>>>>>>>> example,
> > > >>>>>>>>>>>>> one
> > > >>>>>>>>>>>>>>>>>>> may
> > > >>>>>>>>>>>>>>>>>>>>>>> create a materialized view and use cache()
> method
> > > >>>>> in
> > > >>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> materialized
> > > >>>>>>>>>>>>>>>>>>>>> view
> > > >>>>>>>>>>>>>>>>>>>>>>> creation logic. So that during the materialized
> > > >>>>> view
> > > >>>>>>>>> update,
> > > >>>>>>>>>>>>> they
> > > >>>>>>>>>>>>>>>> do
> > > >>>>>>>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>>>>>> need to worry about the case that the cached
> > table
> > > >>>>>> is
> > > >>>>>>>> also
> > > >>>>>>>>>>>>>>>> changed.
> > > >>>>>>>>>>>>>>>>>>>>> Maybe
> > > >>>>>>>>>>>>>>>>>>>>>>> under the hood, materialized() and cache()
> could
> > > >>>>>> share
> > > >>>>>>>>> some
> > > >>>>>>>>>>>>>>>>>>> mechanism,
> > > >>>>>>>>>>>>>>>>>>>>> but
> > > >>>>>>>>>>>>>>>>>>>>>>> I think a simple cache() method would be handy
> in
> > > >>>>> a
> > > >>>>>>> lot
> > > >>>>>>>> of
> > > >>>>>>>>>>>>> cases.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski
> <
> > > >>>>>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Hi Becket,
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Is there any extra thing user can do on a
> > > >>>>>>>>>> MaterializedTable
> > > >>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>>> they
> > > >>>>>>>>>>>>>>>>>>>>>>>> cannot do on a Table?
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Maybe not in the initial implementation, but
> > > >>>>>> various
> > > >>>>>>>> DBs
> > > >>>>>>>>>>> offer
> > > >>>>>>>>>>>>>>>>>>>>> different
> > > >>>>>>>>>>>>>>>>>>>>>>>> ways to “refresh” the materialised view.
> Hooks,
> > > >>>>>>>> triggers,
> > > >>>>>>>>>>>>> timers,
> > > >>>>>>>>>>>>>>>>>>>>> manually
> > > >>>>>>>>>>>>>>>>>>>>>>>> etc. Having `MaterializedTable` would help us
> to
> > > >>>>>>> handle
> > > >>>>>>>>>> that
> > > >>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> future.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> After users call *table.cache(), *users can
> > just
> > > >>>>>> use
> > > >>>>>>>>> that
> > > >>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>> do
> > > >>>>>>>>>>>>>>>>>>>>>>>> anything that is supported on a Table,
> including
> > > >>>>>> SQL.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> This is some implicit behaviour with side
> > > >>>>> effects.
> > > >>>>>>>>> Imagine
> > > >>>>>>>>>> if
> > > >>>>>>>>>>>>>>>> user
> > > >>>>>>>>>>>>>>>>>>>> has
> > > >>>>>>>>>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>>>>>>>> long and complicated program, that touches
> table
> > > >>>>>> `b`
> > > >>>>>>>>>> multiple
> > > >>>>>>>>>>>>>>>>>>> times,
> > > >>>>>>>>>>>>>>>>>>>>> maybe
> > > >>>>>>>>>>>>>>>>>>>>>>>> scattered around different methods. If he
> > > >>>>> modifies
> > > >>>>>>> his
> > > >>>>>>>>>>> program
> > > >>>>>>>>>>>>> by
> > > >>>>>>>>>>>>>>>>>>>>> inserting
> > > >>>>>>>>>>>>>>>>>>>>>>>> in one place
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> b.cache()
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> This implicitly alters the semantic and
> > behaviour
> > > >>>>>> of
> > > >>>>>>>> his
> > > >>>>>>>>>> code
> > > >>>>>>>>>>>>> all
> > > >>>>>>>>>>>>>>>>>>>> over
> > > >>>>>>>>>>>>>>>>>>>>>>>> the place, maybe in a ways that might cause
> > > >>>>>> problems.
> > > >>>>>>>> For
> > > >>>>>>>>>>>>> example
> > > >>>>>>>>>>>>>>>>>>>> what
> > > >>>>>>>>>>>>>>>>>>>>> if
> > > >>>>>>>>>>>>>>>>>>>>>>>> underlying data is changing?
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Having invisible side effects is also not very
> > > >>>>>> clean,
> > > >>>>>>>> for
> > > >>>>>>>>>>>>> example
> > > >>>>>>>>>>>>>>>>>>>> think
> > > >>>>>>>>>>>>>>>>>>>>>>>> about something like this (but more
> > complicated):
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Table b = ...;
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> If (some_condition) {
> > > >>>>>>>>>>>>>>>>>>>>>>>> processTable1(b)
> > > >>>>>>>>>>>>>>>>>>>>>>>> }
> > > >>>>>>>>>>>>>>>>>>>>>>>> else {
> > > >>>>>>>>>>>>>>>>>>>>>>>> processTable2(b)
> > > >>>>>>>>>>>>>>>>>>>>>>>> }
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> // do more stuff with b
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> And user adds `b.cache()` call to only one of
> > the
> > > >>>>>>>>>>>>> `processTable1`
> > > >>>>>>>>>>>>>>>>>>> or
> > > >>>>>>>>>>>>>>>>>>>>>>>> `processTable2` methods.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> On the other hand
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Table materialisedB = b.materialize()
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Avoids (at least some of) the side effect
> issues
> > > >>>>>> and
> > > >>>>>>>>> forces
> > > >>>>>>>>>>>>> user
> > > >>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>> explicitly use `materialisedB` where it’s
> > > >>>>>> appropriate
> > > >>>>>>>> and
> > > >>>>>>>>>>>>> forces
> > > >>>>>>>>>>>>>>>>>>> user
> > > >>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>> think what does it actually mean. And if
> > > >>>>> something
> > > >>>>>>>>> doesn’t
> > > >>>>>>>>>>> work
> > > >>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> end
> > > >>>>>>>>>>>>>>>>>>>>>>>> for the user, he will know what has he changed
> > > >>>>>>> instead
> > > >>>>>>>> of
> > > >>>>>>>>>>>>> blaming
> > > >>>>>>>>>>>>>>>>>>>>> Flink for
> > > >>>>>>>>>>>>>>>>>>>>>>>> some “magic” underneath. In the above example,
> > > >>>>>> after
> > > >>>>>>>>>>>>>>>> materialising
> > > >>>>>>>>>>>>>>>>>>> b
> > > >>>>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>>>>> only one of the methods, he should/would
> realise
> > > >>>>>>> about
> > > >>>>>>>>> the
> > > >>>>>>>>>>>>> issue
> > > >>>>>>>>>>>>>>>>>>> when
> > > >>>>>>>>>>>>>>>>>>>>>>>> handling the return value `MaterializedTable`
> of
> > > >>>>>> that
> > > >>>>>>>>>> method.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> I guess it comes down to personal preferences
> if
> > > >>>>>> you
> > > >>>>>>>> like
> > > >>>>>>>>>>>>> things
> > > >>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>> be
> > > >>>>>>>>>>>>>>>>>>>>>>>> implicit or not. The more power is the user,
> > > >>>>>> probably
> > > >>>>>>>> the
> > > >>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>> likely
> > > >>>>>>>>>>>>>>>>>>>>> he is
> > > >>>>>>>>>>>>>>>>>>>>>>>> to like/understand implicit behaviour. And we
> as
> > > >>>>>>> Table
> > > >>>>>>>>> API
> > > >>>>>>>>>>>>>>>>>>> designers
> > > >>>>>>>>>>>>>>>>>>>>> are
> > > >>>>>>>>>>>>>>>>>>>>>>>> the most power users out there, so I would
> > > >>>>> proceed
> > > >>>>>>> with
> > > >>>>>>>>>>> caution
> > > >>>>>>>>>>>>>>>> (so
> > > >>>>>>>>>>>>>>>>>>>>> that we
> > > >>>>>>>>>>>>>>>>>>>>>>>> do not end up in the crazy perl realm with
> it’s
> > > >>>>>>> lovely
> > > >>>>>>>>>>> implicit
> > > >>>>>>>>>>>>>>>>>>>> method
> > > >>>>>>>>>>>>>>>>>>>>>>>> arguments ;)  <
> > > >>>>>>>>>> https://stackoverflow.com/a/14922656/8149051
> > > >>>>>>>>>>>> )
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Table API to also support non-relational
> > > >>>>>> processing
> > > >>>>>>>>> cases,
> > > >>>>>>>>>>>>>>>> cache()
> > > >>>>>>>>>>>>>>>>>>>>>>>> might be slightly better.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> I think even such extended Table API could
> > > >>>>> benefit
> > > >>>>>>> from
> > > >>>>>>>>>>>>> sticking
> > > >>>>>>>>>>>>>>>>>>>>> to/being
> > > >>>>>>>>>>>>>>>>>>>>>>>> consistent with SQL where both SQL and Table
> API
> > > >>>>>> are
> > > >>>>>>>>>>> basically
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> same.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> One more thing. `MaterializedTable
> > materialize()`
> > > >>>>>>> could
> > > >>>>>>>>> be
> > > >>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>>>>>>> powerful/flexible allowing the user to operate
> > > >>>>> both
> > > >>>>>>> on
> > > >>>>>>>>>>>>>>>> materialised
> > > >>>>>>>>>>>>>>>>>>>>> and not
> > > >>>>>>>>>>>>>>>>>>>>>>>> materialised view at the same time for
> whatever
> > > >>>>>>> reasons
> > > >>>>>>>>>>>>>>>> (underlying
> > > >>>>>>>>>>>>>>>>>>>>> data
> > > >>>>>>>>>>>>>>>>>>>>>>>> changing/better optimisation opportunities
> after
> > > >>>>>>>> pushing
> > > >>>>>>>>>> down
> > > >>>>>>>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>>>> filters
> > > >>>>>>>>>>>>>>>>>>>>>>>> etc). For example:
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Table b = …;
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> MaterlizedTable mb = b.materialize();
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Val min = mb.min();
> > > >>>>>>>>>>>>>>>>>>>>>>>> Val max = mb.max();
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Val user42 = b.filter(‘userId = 42);
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Could be more efficient compared to
> `b.cache()`
> > > >>>>> if
> > > >>>>>>>>>>>>>>>> `filter(‘userId
> > > >>>>>>>>>>>>>>>>>>> =
> > > >>>>>>>>>>>>>>>>>>>>>>>> 42);` allows for much more aggressive
> > > >>>>>> optimisations.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske <
> > > >>>>>>>>>> fhueske@xxxxxxxxx>
> > > >>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> I'm not suggesting to add support for Ignite.
> > > >>>>> This
> > > >>>>>>> was
> > > >>>>>>>>>> just
> > > >>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>>>>>>> example.
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Plasma and Arrow sound interesting, too.
> > > >>>>>>>>>>>>>>>>>>>>>>>>> For the sake of this proposal, it would be up
> > to
> > > >>>>>> the
> > > >>>>>>>>> user
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>> implement a
> > > >>>>>>>>>>>>>>>>>>>>>>>>> TableFactory and corresponding TableSource /
> > > >>>>>>> TableSink
> > > >>>>>>>>>>> classes
> > > >>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>> persist
> > > >>>>>>>>>>>>>>>>>>>>>>>>> and read the data.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 12:06 Uhr schrieb
> > > >>>>> Flavio
> > > >>>>>>>>>>> Pompermaier
> > > >>>>>>>>>>>>> <
> > > >>>>>>>>>>>>>>>>>>>>>>>>> pompermaier@xxxxxxxx>:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> What about to add also Apache Plasma + Arrow
> > as
> > > >>>>>> an
> > > >>>>>>>>>>>>> alternative
> > > >>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>> Apache
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> Ignite?
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>
> > > >>>
> > >
> https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 11:56 AM Fabian
> Hueske
> > > >>>>> <
> > > >>>>>>>>>>>>>>>>>>> fhueske@xxxxxxxxx>
> > > >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the proposal!
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> To summarize, you propose a new method
> > > >>>>>>>> Table.cache():
> > > >>>>>>>>>>> Table
> > > >>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>>>> will
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> trigger a job and write the result into
> some
> > > >>>>>>>> temporary
> > > >>>>>>>>>>>>> storage
> > > >>>>>>>>>>>>>>>>>>> as
> > > >>>>>>>>>>>>>>>>>>>>>>>> defined
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> by a TableFactory.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> The cache() call blocks while the job is
> > > >>>>> running
> > > >>>>>>> and
> > > >>>>>>>>>>>>>>>> eventually
> > > >>>>>>>>>>>>>>>>>>>>>>>> returns a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Table object that represents a scan of the
> > > >>>>>>> temporary
> > > >>>>>>>>>>> table.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> When the "session" is closed (closing to be
> > > >>>>>>>> defined?),
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>> temporary
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> tables
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> are all dropped.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> I think this behavior makes sense and is a
> > > >>>>> good
> > > >>>>>>>> first
> > > >>>>>>>>>> step
> > > >>>>>>>>>>>>>>>>>>> towards
> > > >>>>>>>>>>>>>>>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> interactive workloads.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> However, its performance suffers from
> writing
> > > >>>>> to
> > > >>>>>>> and
> > > >>>>>>>>>>> reading
> > > >>>>>>>>>>>>>>>>>>> from
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> external
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> systems.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> I think this is OK for now. Changes that
> > would
> > > >>>>>>>>>>> significantly
> > > >>>>>>>>>>>>>>>>>>>> improve
> > > >>>>>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> situation (i.e., pinning data in-memory
> > across
> > > >>>>>>> jobs)
> > > >>>>>>>>>> would
> > > >>>>>>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>>>>>>> large
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> impacts on many components of Flink.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Users could use in-memory filesystems or
> > > >>>>> storage
> > > >>>>>>>> grids
> > > >>>>>>>>>>>>> (Apache
> > > >>>>>>>>>>>>>>>>>>>>>>>> Ignite) to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> mitigate some of the performance effects.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb
> > > >>>>>> Becket
> > > >>>>>>>> Qin
> > > >>>>>>>>> <
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> becket.qin@xxxxxxxxx
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> :
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation, Piotrek.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there any extra thing user can do on a
> > > >>>>>>>>>>> MaterializedTable
> > > >>>>>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>>>> they
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> cannot do on a Table? After users call
> > > >>>>>>>>> *table.cache(),
> > > >>>>>>>>>>>>> *users
> > > >>>>>>>>>>>>>>>>>>> can
> > > >>>>>>>>>>>>>>>>>>>>>>>> just
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> use
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> that table and do anything that is
> supported
> > > >>>>>> on a
> > > >>>>>>>>>> Table,
> > > >>>>>>>>>>>>>>>>>>>> including
> > > >>>>>>>>>>>>>>>>>>>>>>>> SQL.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Naming wise, either cache() or
> materialize()
> > > >>>>>>> sounds
> > > >>>>>>>>>> fine
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>> me.
> > > >>>>>>>>>>>>>>>>>>>>>>>> cache()
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> a bit more general than materialize().
> Given
> > > >>>>>> that
> > > >>>>>>>> we
> > > >>>>>>>>>> are
> > > >>>>>>>>>>>>>>>>>>>> enhancing
> > > >>>>>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Table API to also support non-relational
> > > >>>>>>> processing
> > > >>>>>>>>>>> cases,
> > > >>>>>>>>>>>>>>>>>>>> cache()
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> might
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> be
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> slightly better.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:25 PM Piotr
> > > >>>>>> Nowojski <
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Becket,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ops, sorry I didn’t notice that you
> intend
> > > >>>>> to
> > > >>>>>>>> reuse
> > > >>>>>>>>>>>>> existing
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> `TableFactory`. I don’t know why, but I
> > > >>>>>> assumed
> > > >>>>>>>> that
> > > >>>>>>>>>> you
> > > >>>>>>>>>>>>>>>> want
> > > >>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> provide
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternate way of writing the data.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Now that I hopefully understand the
> > > >>>>> proposal,
> > > >>>>>>>> maybe
> > > >>>>>>>>> we
> > > >>>>>>>>>>>>> could
> > > >>>>>>>>>>>>>>>>>>>>> rename
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> `cache()` to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> void materialize()
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> or going step further
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> MaterializedTable materialize()
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> MaterializedTable
> createMaterializedView()
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ?
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> The second option with returning a
> handle I
> > > >>>>>>> think
> > > >>>>>>>> is
> > > >>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>>> flexible
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> could provide features such as
> > > >>>>>>> “refresh”/“delete”
> > > >>>>>>>> or
> > > >>>>>>>>>>>>>>>> generally
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> speaking
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> manage the the view. In the future we
> could
> > > >>>>>> also
> > > >>>>>>>>> think
> > > >>>>>>>>>>>>> about
> > > >>>>>>>>>>>>>>>>>>>>> adding
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> hooks
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to automatically refresh view etc. It is
> > > >>>>> also
> > > >>>>>>> more
> > > >>>>>>>>>>>>> explicit
> > > >>>>>>>>>>>>>>>> -
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialization returning a new table
> > handle
> > > >>>>>>> will
> > > >>>>>>>>> not
> > > >>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> same
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> implicit side effects as adding a simple
> > > >>>>> line
> > > >>>>>> of
> > > >>>>>>>>> code
> > > >>>>>>>>>>> like
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> `b.cache()`
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> would have.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> It would also be more SQL like, making it
> > > >>>>> more
> > > >>>>>>>>>> intuitive
> > > >>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>>> users
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> already
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> familiar with the SQL.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 14:53, Becket Qin <
> > > >>>>>>>>>>>>> becket.qin@xxxxxxxxx
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the cache() method itself, yes, it
> is
> > > >>>>>>>>> equivalent
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>> creating
> > > >>>>>>>>>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> BUILT-IN
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialized view with a lifecycle. That
> > > >>>>>>>>>> functionality
> > > >>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>>>>> missing
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> today,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> though. Not sure if I understand your
> > > >>>>>> question.
> > > >>>>>>>> Do
> > > >>>>>>>>>> you
> > > >>>>>>>>>>>>> mean
> > > >>>>>>>>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> already
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the functionality and just need a syntax
> > > >>>>>> sugar?
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What's more interesting in the proposal
> is
> > > >>>>> do
> > > >>>>>>> we
> > > >>>>>>>>> want
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>> stop
> > > >>>>>>>>>>>>>>>>>>>> at
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> creating
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the materialized view? Or do we want to
> > > >>>>>> extend
> > > >>>>>>>> that
> > > >>>>>>>>>> in
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>> future
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> more
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> useful unified data store distributed
> with
> > > >>>>>>> Flink?
> > > >>>>>>>>> And
> > > >>>>>>>>>>> do
> > > >>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>>>>>> want
> > > >>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mechanism allow more flexible user job
> > > >>>>>> pattern
> > > >>>>>>>> with
> > > >>>>>>>>>>> their
> > > >>>>>>>>>>>>>>>> own
> > > >>>>>>>>>>>>>>>>>>>>> user
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> defined
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> services. These considerations are much
> > > >>>>> more
> > > >>>>>>>>>>>>> architectural.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 6:01 PM Piotr
> > > >>>>>> Nowojski
> > > >>>>>>> <
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Interesting idea. I’m trying to
> > understand
> > > >>>>>> the
> > > >>>>>>>>>>> problem.
> > > >>>>>>>>>>>>>>>>>>> Isn’t
> > > >>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `cache()` call an equivalent of writing
> > > >>>>> data
> > > >>>>>>> to
> > > >>>>>>>> a
> > > >>>>>>>>>> sink
> > > >>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>> later
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> reading
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from it? Where this sink has a limited
> > > >>>>> live
> > > >>>>>>>>>> scope/live
> > > >>>>>>>>>>>>>>>> time?
> > > >>>>>>>>>>>>>>>>>>>> And
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> sink
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> could be implemented as in memory or a
> > > >>>>> file
> > > >>>>>>>> sink?
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If so, what’s the problem with
> creating a
> > > >>>>>>>>>> materialised
> > > >>>>>>>>>>>>>>>> view
> > > >>>>>>>>>>>>>>>>>>>>> from a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> “b” (from your document’s example) and
> > > >>>>>> reusing
> > > >>>>>>>>> this
> > > >>>>>>>>>>>>>>>>>>>> materialised
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> view
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> later? Maybe we are lacking mechanisms
> to
> > > >>>>>>> clean
> > > >>>>>>>> up
> > > >>>>>>>>>>>>>>>>>>>> materialised
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> views
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (for
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> example when current session finishes)?
> > > >>>>>> Maybe
> > > >>>>>>> we
> > > >>>>>>>>>> need
> > > >>>>>>>>>>>>> some
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> syntactic
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> sugar
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on top of it?
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 07:21, Becket Qin <
> > > >>>>>>>>>>>>>>>> becket.qin@xxxxxxxxx
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, Jincheng.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I think it makes sense to have a
> > > >>>>>>> persist()
> > > >>>>>>>>>> with
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> lifecycle/defined
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> scope. I just added a section in the
> > > >>>>> future
> > > >>>>>>>> work
> > > >>>>>>>>>> for
> > > >>>>>>>>>>>>>>>> this.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 1:55 PM
> jincheng
> > > >>>>>> sun
> > > >>>>>>> <
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thank you for the explanation about
> the
> > > >>>>>> name
> > > >>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>> `cache()`, I
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> understand
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> why
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you designed this way!
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another idea is whether we can
> specify
> > a
> > > >>>>>>>>> lifecycle
> > > >>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>> data
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> persistence?
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, persist
> > > >>>>> (LifeCycle.SESSION),
> > > >>>>>> so
> > > >>>>>>>>> that
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>> user
> > > >>>>>>>>>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> worried
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> about data loss, and will clearly
> > > >>>>> specify
> > > >>>>>>> the
> > > >>>>>>>>> time
> > > >>>>>>>>>>>>> range
> > > >>>>>>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> keeping
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> time.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> At the same time, if we want to
> expand,
> > > >>>>> we
> > > >>>>>>> can
> > > >>>>>>>>>> also
> > > >>>>>>>>>>>>>>>> share
> > > >>>>>>>>>>>>>>>>>>>> in a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> certain
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> group of session, for example:
> > > >>>>>>>>>>>>>>>>>>>> LifeCycle.SESSION_GROUP(...), I
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> am
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sure,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> just an immature suggestion, for
> > > >>>>> reference
> > > >>>>>>>> only!
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bests,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx>
> > > >>>>>>>>> 于2018年11月23日周五
> > > >>>>>>>>>>>>>>>>>>> 下午1:33写道:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Re: Jincheng,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. Regarding
> > > >>>>>> cache()
> > > >>>>>>>> v.s.
> > > >>>>>>>>>>>>>>>>>>> persist(),
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> personally I
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> find cache() to be more accurately
> > > >>>>>>> describing
> > > >>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> behavior,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> i.e.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Table
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is cached for the session, but will
> be
> > > >>>>>>>> deleted
> > > >>>>>>>>>>> after
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> session
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> closed.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> persist() seems a little misleading
> as
> > > >>>>>>> people
> > > >>>>>>>>>> might
> > > >>>>>>>>>>>>>>>> think
> > > >>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still be there even after the
> session
> > > >>>>> is
> > > >>>>>>>> gone.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Great point about mixing the batch
> and
> > > >>>>>>> stream
> > > >>>>>>>>>>>>>>>> processing
> > > >>>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> same
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> job.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We should absolutely move towards
> that
> > > >>>>>>> goal.
> > > >>>>>>>> I
> > > >>>>>>>>>>>>> imagine
> > > >>>>>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> would
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> be
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> huge
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> change across the board, including
> > > >>>>>> sources,
> > > >>>>>>>>>>> operators
> > > >>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimizations,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> name some. Likely we will need
> several
> > > >>>>>>>> separate
> > > >>>>>>>>>>>>>>>> in-depth
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> discussions.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 5:14 AM
> > Xingcan
> > > >>>>>>> Cui <
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> xingcanc@xxxxxxxxx>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Shaoxuan, I think the lifecycle or
> > > >>>>>> access
> > > >>>>>>>>>> domain
> > > >>>>>>>>>>>>> are
> > > >>>>>>>>>>>>>>>>>>> both
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> orthogonal
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the cache problem. Essentially,
> this
> > > >>>>> may
> > > >>>>>>> be
> > > >>>>>>>>> the
> > > >>>>>>>>>>>>> first
> > > >>>>>>>>>>>>>>>>>>> time
> > > >>>>>>>>>>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> plan
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> introduce another storage mechanism
> > > >>>>>> other
> > > >>>>>>>> than
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> state.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> Maybe
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> it’s
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> better
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to first draw a big picture and
> then
> > > >>>>>>>>> concentrate
> > > >>>>>>>>>>> on
> > > >>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>>>>> specific
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> part?
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Becket, yes, actually I am more
> > > >>>>>> concerned
> > > >>>>>>>>> with
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> underlying
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> service.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This seems to be quite a major
> change
> > > >>>>> to
> > > >>>>>>> the
> > > >>>>>>>>>>>>> existing
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> codebase.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> As
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> you
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> claimed, the service should be
> > > >>>>>> extendible
> > > >>>>>>> to
> > > >>>>>>>>>>> support
> > > >>>>>>>>>>>>>>>>>>> other
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> components
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we’d better discussed it in another
> > > >>>>>>> thread.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> All in all, I also eager to enjoy
> the
> > > >>>>>> more
> > > >>>>>>>>>>>>> interactive
> > > >>>>>>>>>>>>>>>>>>>> Table
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> API,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> case
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of a general and flexible enough
> > > >>>>> service
> > > >>>>>>>>>>> mechanism.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xingcan
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Nov 22, 2018, at 10:16 AM,
> > Xiaowei
> > > >>>>>>>> Jiang <
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> xiaoweij@xxxxxxxxx>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Relying on a callback for the temp
> > > >>>>>> table
> > > >>>>>>>> for
> > > >>>>>>>>>>> clean
> > > >>>>>>>>>>>>> up
> > > >>>>>>>>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> very
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reliable.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> There is no guarantee that it will
> > be
> > > >>>>>>>>> executed
> > > >>>>>>>>>>>>>>>>>>>>> successfully.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> We
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> may
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> risk
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> leaks when that happens. I think
> > that
> > > >>>>>>> it's
> > > >>>>>>>>>> safer
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> association
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between temp table and session id.
> > So
> > > >>>>>> we
> > > >>>>>>>> can
> > > >>>>>>>>>>> always
> > > >>>>>>>>>>>>>>>>>>> clean
> > > >>>>>>>>>>>>>>>>>>>>> up
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> temp
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tables
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which are no longer associated
> with
> > > >>>>> any
> > > >>>>>>>>> active
> > > >>>>>>>>>>>>>>>>>>> sessions.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 22, 2018 at 12:55 PM
> > > >>>>>> jincheng
> > > >>>>>>>>> sun <
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie&Shaoxuan,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for initiating this great
> > > >>>>>>> proposal!
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Interactive Programming is very
> > > >>>>> useful
> > > >>>>>>> and
> > > >>>>>>>>>> user
> > > >>>>>>>>>>>>>>>>>>> friendly
> > > >>>>>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> case
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> examples.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Moreover, especially when a
> > business
> > > >>>>>> has
> > > >>>>>>>> to
> > > >>>>>>>>> be
> > > >>>>>>>>>>>>>>>>>>> executed
> > > >>>>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> several
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stages
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with dependencies,such as the
> > > >>>>> pipeline
> > > >>>>>>> of
> > > >>>>>>>>>> Flink
> > > >>>>>>>>>>>>> ML,
> > > >>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>> order
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> utilize
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> intermediate calculation results
> we
> > > >>>>>> have
> > > >>>>>>>> to
> > > >>>>>>>>>>>>> submit a
> > > >>>>>>>>>>>>>>>>>>> job
> > > >>>>>>>>>>>>>>>>>>>>> by
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> env.execute().
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the `cache()`  , I think is
> > > >>>>>> better
> > > >>>>>>>> to
> > > >>>>>>>>>>> named
> > > >>>>>>>>>>>>>>>>>>>>>>>>>> `persist()`,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> And
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink framework determines
> whether
> > > >>>>> we
> > > >>>>>>>>>> internally
> > > >>>>>>>>>>>>>>>> cache
> > > >>>>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> memory
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> or
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> persist
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to the storage system,Maybe save
> > the
> > > >>>>>>> data
> > > >>>>>>>>> into
> > > >>>>>>>>>>>>> state
> > > >>>>>>>>>>>>>>>>>>>>> backend
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (MemoryStateBackend or
> > > >>>>>>> RocksDBStateBackend
> > > >>>>>>>>>> etc.)
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BTW, from the points of my view
> in
> > > >>>>> the
> > > >>>>>>>>> future,
> > > >>>>>>>>>>>>>>>> support
> > > >>>>>>>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> batch mode switching in the same
> > job
> > > >>>>>>> will
> > > >>>>>>>>> also
> > > >>>>>>>>>>>>>>>> benefit
> > > >>>>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "Interactive
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Programming",  I am looking
> forward
> > > >>>>> to
> > > >>>>>>>> your
> > > >>>>>>>>>>> JIRAs
> > > >>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>>> FLIP!
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx
> >
> > > >>>>>>>>>>> 于2018年11月20日周二
> > > >>>>>>>>>>>>>>>>>>>>> 下午9:56写道:
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As a few recent email threads
> have
> > > >>>>>>>> pointed
> > > >>>>>>>>>> out,
> > > >>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>>>>>> is a
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> promising
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> opportunity to enhance Flink
> Table
> > > >>>>>> API
> > > >>>>>>> in
> > > >>>>>>>>>>> various
> > > >>>>>>>>>>>>>>>>>>>>> aspects,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> including
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> functionality and ease of use
> > among
> > > >>>>>>>> others.
> > > >>>>>>>>>> One
> > > >>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> scenarios
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> where
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feel Flink could improve is
> > > >>>>>> interactive
> > > >>>>>>>>>>>>>>>> programming.
> > > >>>>>>>>>>>>>>>>>>> To
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> explain
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> issues
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and facilitate the discussion on
> > > >>>>> the
> > > >>>>>>>>>> solution,
> > > >>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>> put
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>> together
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> following document with our
> > > >>>>> proposal.
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > >
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Feedback and comments are very
> > > >>>>>> welcome!
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>>
> > >
> > >
> >
>


-- 
Best Regards

Jeff Zhang