osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Support Interactive Programming in Flink Table API


Hi Till and Piotrek,

Thanks for the clarification. That solves quite a few confusion. My
understanding of how cache works is same as what Till describe. i.e.
cache() is a hint to Flink, but it is not guaranteed that cache always
exist and it might be recomputed from its lineage.

Is this the core of our disagreement here? That you would like this
> “cache()” to be mostly hint for the optimiser?

Semantic wise, yes. That's also why I think materialize() has a much larger
scope than cache(), thus it should be a different method.

Regarding the chance of optimization, it might not be that rare. Some very
simple statistics could already help in many cases. For example, simply
maintaining max and min of each fields can already eliminate some
unnecessary table scan (potentially scanning the cached table) if the
result is doomed to be empty. A histogram would give even further
information. The optimizer could be very careful and only ignores cache
when it is 100% sure doing that is cheaper. e.g. only when a filter on the
cache will absolutely return nothing.

Given the above clarification on cache, I would like to revisit the
original "void cache()" proposal and see if we can improve on top of that.

What do you think about the following modified interface?

Table {
  /**
   * This call hints Flink to maintain a cache of this table and leverage
it for performance optimization if needed.
   * Note that Flink may still decide to not use the cache if it is cheaper
by doing so.
   *
   * A CacheHandle will be returned to allow user release the cache
actively. The cache will be deleted if there
   * is no unreleased cache handlers to it. When the TableEnvironment is
closed. The cache will also be deleted
   * and all the cache handlers will be released.
   *
   * @return a CacheHandle referring to the cache of this table.
   */
  CacheHandle cache();
}

CacheHandle {
  /**
   * Close the cache handle. This method does not necessarily deletes the
cache. Instead, it simply decrements the reference counter to the cache.
   * When the there is no handle referring to a cache. The cache will be
deleted.
   *
   * @return the number of open handles to the cache after this handle has
been released.
   */
  int release()
}

The rationale behind this interface is following:
In vast majority of the cases, users wouldn't really care whether the cache
is used or not. So I think the most intuitive way is letting cache() return
nothing. So nobody needs to worry about the difference between operations
on CacheTables and those on the "original" tables. This will make maybe
99.9% of the users happy. There were two concerns raised for this approach:
1. In some rare cases, users may want to ignore cache,
2. A table might be cached/uncached in a third party function while the
caller does not know.

For the first issue, users can use hint("ignoreCache") to explicitly ignore
cache.
For the second issue, the above proposal lets cache() return a CacheHandle,
the only method in it is release(). Different CacheHandles will refer to
the same cache, if a cache no longer has any cache handle, it will be
deleted. This will address the following case:
{
  val handle1 = a.cache()
  process(a)
  a.select(...) // cache is still available, handle1 has not been released.
}

void process(Table t) {
  val handle2 = t.cache() // new handle to cache
  t.select(...) // optimizer decides cache usage
  t.hint("ignoreCache").select(...) // cache is ignored
  handle2.release() // release the handle, but the cache may still be
available if there are other handles
  ...
}

Does the above modified approach look reasonable to you?

Cheers,

Jiangjie (Becket) Qin







On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <trohrmann@xxxxxxxxxx> wrote:

> Hi Becket,
>
> I was aiming at semantics similar to 1. I actually thought that `cache()`
> would tell the system to materialize the intermediate result so that
> subsequent queries don't need to reprocess it. This means that the usage of
> the cached table in this example
>
> {
>  val cachedTable = a.cache()
>  val b1 = cachedTable.select(…)
>  val b2 = cachedTable.foo().select(…)
>  val b3 = cachedTable.bar().select(...)
>  val c1 = a.select(…)
>  val c2 = a.foo().select(…)
>  val c3 = a.bar().select(...)
> }
>
> strongly depends on interleaved calls which trigger the execution of sub
> queries. So for example, if there is only a single env.execute call at the
> end of  block, then b1, b2, b3, c1, c2 and c3 would all be computed by
> reading directly from the sources (given that there is only a single
> JobGraph). It just happens that the result of `a` will be cached such that
> we skip the processing of `a` when there are subsequent queries reading
> from `cachedTable`. If for some reason the system cannot materialize the
> table (e.g. running out of disk space, ttl expired), then it could also
> happen that we need to reprocess `a`. In that sense `cachedTable` simply is
> an identifier for the materialized result of `a` with the lineage how to
> reprocess it.
>
> Cheers,
> Till
>
>
>
>
>
> On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
> wrote:
>
> > Hi Becket,
> >
> > > {
> > >  val cachedTable = a.cache()
> > >  val b = cachedTable.select(...)
> > >  val c = a.select(...)
> > > }
> > >
> > > Semantic 1. b uses cachedTable as user demanded so. c uses original DAG
> > as
> > > user demanded so. In this case, the optimizer has no chance to
> optimize.
> > > Semantic 2. b uses cachedTable as user demanded so. c leaves the
> > optimizer
> > > to choose whether the cache or DAG should be used. In this case, user
> > lose
> > > the option to NOT use cache.
> > >
> > > As you can see, neither of the options seem perfect. However, I guess
> you
> > > and Till are proposing the third option:
> > >
> > > Semantic 3. b leaves the optimizer to choose whether cache or DAG
> should
> > be
> > > used. c always use the DAG.
> >
> > I am pretty sure that me, Till, Fabian and others were all proposing and
> > advocating in favour of semantic “1”. No cost based optimiser decisions
> at
> > all.
> >
> > {
> >  val cachedTable = a.cache()
> >  val b1 = cachedTable.select(…)
> >  val b2 = cachedTable.foo().select(…)
> >  val b3 = cachedTable.bar().select(...)
> >  val c1 = a.select(…)
> >  val c2 = a.foo().select(…)
> >  val c3 = a.bar().select(...)
> > }
> >
> > All b1, b2 and b3 are reading from cache, while c1, c2 and c3 are
> > re-executing whole plan for “a”.
> >
> > In the future we could discuss going one step further, introducing some
> > global optimisation (that can be manually enabled/disabled): deduplicate
> > plan nodes/deduplicate sub queries/re-use sub queries results/or whatever
> > we could call it. It could do two things:
> >
> > 1. Automatically try to deduplicate fragments of the plan and share the
> > result using CachedTable - in other words automatically insert
> `CachedTable
> > cache()` calls.
> > 2. Automatically make decision to bypass explicit `CachedTable` access
> > (this would be the equivalent of what you described as “semantic 3”).
> >
> > However as I wrote previously, I have big doubts if such cost-based
> > optimisation would work (this applies also to “Semantic 2”). I would
> expect
> > it to do more harm than good in so many cases, that it wouldn’t make
> sense.
> > Even assuming that we calculate statistics perfectly (this ain’t gonna
> > happen), it’s virtually impossible to correctly estimate correct exchange
> > rate of CPU cycles vs IO operations as it is changing so much from
> > deployment to deployment.
> >
> > Is this the core of our disagreement here? That you would like this
> > “cache()” to be mostly hint for the optimiser?
> >
> > Piotrek
> >
> > > On 11 Dec 2018, at 06:00, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > >
> > > Another potential concern for semantic 3 is that. In the future, we may
> > add
> > > automatic caching to Flink. e.g. cache the intermediate results at the
> > > shuffle boundary. If our semantic is that reference to the original
> table
> > > means skipping cache, those users may not be able to benefit from the
> > > implicit cache.
> > >
> > >
> > >
> > > On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <becket.qin@xxxxxxxxx>
> > wrote:
> > >
> > >> Hi Piotrek,
> > >>
> > >> Thanks for the reply. Thought about it again, I might have
> misunderstood
> > >> your proposal in earlier emails. Returning a CachedTable might not be
> a
> > bad
> > >> idea.
> > >>
> > >> I was more concerned about the semantic and its intuitiveness when a
> > >> CachedTable is returned. i..e, if cache() returns CachedTable. What
> are
> > the
> > >> semantic in the following code:
> > >> {
> > >>  val cachedTable = a.cache()
> > >>  val b = cachedTable.select(...)
> > >>  val c = a.select(...)
> > >> }
> > >> What is the difference between b and c? At the first glance, I see two
> > >> options:
> > >>
> > >> Semantic 1. b uses cachedTable as user demanded so. c uses original
> DAG
> > as
> > >> user demanded so. In this case, the optimizer has no chance to
> optimize.
> > >> Semantic 2. b uses cachedTable as user demanded so. c leaves the
> > optimizer
> > >> to choose whether the cache or DAG should be used. In this case, user
> > lose
> > >> the option to NOT use cache.
> > >>
> > >> As you can see, neither of the options seem perfect. However, I guess
> > you
> > >> and Till are proposing the third option:
> > >>
> > >> Semantic 3. b leaves the optimizer to choose whether cache or DAG
> should
> > >> be used. c always use the DAG.
> > >>
> > >> This does address all the concerns. It is just that from intuitiveness
> > >> perspective, I found that asking user to explicitly use a CachedTable
> > while
> > >> the optimizer might choose to ignore is a little weird. That was why I
> > did
> > >> not think about that semantic. But given there is material benefit, I
> > think
> > >> this semantic is acceptable.
> > >>
> > >> 1. If we want to let optimiser make decisions whether to use cache or
> > not,
> > >>> then why do we need “void cache()” method at all? Would It
> “increase”
> > the
> > >>> chance of using the cache? That’s sounds strange. What would be the
> > >>> mechanism of deciding whether to use the cache or not? If we want to
> > >>> introduce such kind  automated optimisations of “plan nodes
> > deduplication”
> > >>> I would turn it on globally, not per table, and let the optimiser do
> > all of
> > >>> the work.
> > >>> 2. We do not have statistics at the moment for any use/not use cache
> > >>> decision.
> > >>> 3. Even if we had, I would be veeerryy sceptical whether such cost
> > based
> > >>> optimisations would work properly and I would still insist first on
> > >>> providing explicit caching mechanism (`CachedTable cache()`)
> > >>>
> > >> We are absolutely on the same page here. An explicit cache() method is
> > >> necessary not only because optimizer may not be able to make the right
> > >> decision, but also because of the nature of interactive programming.
> For
> > >> example, if users write the following code in Scala shell:
> > >>  val b = a.select(...)
> > >>  val c = b.select(...)
> > >>  val d = c.select(...).writeToSink(...)
> > >>  tEnv.execute()
> > >> There is no way optimizer will know whether b or c will be used in
> later
> > >> code, unless users hint explicitly.
> > >>
> > >> At the same time I’m not sure if you have responded to our objections
> of
> > >>> `void cache()` being implicit/having side effects, which me, Jark,
> > Fabian,
> > >>> Till and I think also Shaoxuan are supporting.
> > >>
> > >> Is there any other side effects if we use semantic 3 mentioned above?
> > >>
> > >> Thanks,
> > >>
> > >> JIangjie (Becket) Qin
> > >>
> > >>
> > >> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <
> piotr@xxxxxxxxxxxxxxxxx
> > >
> > >> wrote:
> > >>
> > >>> Hi Becket,
> > >>>
> > >>> Sorry for not responding long time.
> > >>>
> > >>> Regarding case1.
> > >>>
> > >>> There wouldn’t be no “a.unCache()” method, but I would expect only
> > >>> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect
> > >>> `cachedTableA2`. Just as in any other database dropping modifying one
> > >>> independent table/materialised view does not affect others.
> > >>>
> > >>>> What I meant is that assuming there is already a cached table,
> ideally
> > >>> users need
> > >>>> not to specify whether the next query should read from the cache or
> > use
> > >>> the
> > >>>> original DAG. This should be decided by the optimizer.
> > >>>
> > >>> 1. If we want to let optimiser make decisions whether to use cache or
> > >>> not, then why do we need “void cache()” method at all? Would It
> > “increase”
> > >>> the chance of using the cache? That’s sounds strange. What would be
> the
> > >>> mechanism of deciding whether to use the cache or not? If we want to
> > >>> introduce such kind  automated optimisations of “plan nodes
> > deduplication”
> > >>> I would turn it on globally, not per table, and let the optimiser do
> > all of
> > >>> the work.
> > >>> 2. We do not have statistics at the moment for any use/not use cache
> > >>> decision.
> > >>> 3. Even if we had, I would be veeerryy sceptical whether such cost
> > based
> > >>> optimisations would work properly and I would still insist first on
> > >>> providing explicit caching mechanism (`CachedTable cache()`)
> > >>> 4. As Till wrote, having explicit `CachedTable cache()` doesn’t
> > >>> contradict future work on automated cost based caching.
> > >>>
> > >>>
> > >>> At the same time I’m not sure if you have responded to our objections
> > of
> > >>> `void cache()` being implicit/having side effects, which me, Jark,
> > Fabian,
> > >>> Till and I think also Shaoxuan are supporting.
> > >>>
> > >>> Piotrek
> > >>>
> > >>>> On 5 Dec 2018, at 12:42, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > >>>>
> > >>>> Hi Till,
> > >>>>
> > >>>> It is true that after the first job submission, there will be no
> > >>> ambiguity
> > >>>> in terms of whether a cached table is used or not. That is the same
> > for
> > >>> the
> > >>>> cache() without returning a CachedTable.
> > >>>>
> > >>>> Conceptually one could think of cache() as introducing a caching
> > >>> operator
> > >>>>> from which you need to consume from if you want to benefit from the
> > >>> caching
> > >>>>> functionality.
> > >>>>
> > >>>> I am thinking a little differently. I think it is a hint (as you
> > >>> mentioned
> > >>>> later) instead of a new operator. I'd like to be careful about the
> > >>> semantic
> > >>>> of the API. A hint is a property set on an existing operator, but is
> > not
> > >>>> itself an operator as it does not really manipulate the data.
> > >>>>
> > >>>> I agree, ideally the optimizer makes this kind of decision which
> > >>>>> intermediate result should be cached. But especially when executing
> > >>> ad-hoc
> > >>>>> queries the user might better know which results need to be cached
> > >>> because
> > >>>>> Flink might not see the full DAG. In that sense, I would consider
> the
> > >>>>> cache() method as a hint for the optimizer. Of course, in the
> future
> > we
> > >>>>> might add functionality which tries to automatically cache results
> > >>> (e.g.
> > >>>>> caching the latest intermediate results until so and so much space
> is
> > >>>>> used). But this should hopefully not contradict with `CachedTable
> > >>> cache()`.
> > >>>>
> > >>>> I agree that cache() method is needed for exactly the reason you
> > >>> mentioned,
> > >>>> i.e. Flink cannot predict what users are going to write later, so
> > users
> > >>>> need to tell Flink explicitly that this table will be used later.
> > What I
> > >>>> meant is that assuming there is already a cached table, ideally
> users
> > >>> need
> > >>>> not to specify whether the next query should read from the cache or
> > use
> > >>> the
> > >>>> original DAG. This should be decided by the optimizer.
> > >>>>
> > >>>> To explain the difference between returning / not returning a
> > >>> CachedTable,
> > >>>> I want compare the following two case:
> > >>>>
> > >>>> *Case 1:  returning a CachedTable*
> > >>>> b = a.map(...)
> > >>>> val cachedTableA1 = a.cache()
> > >>>> val cachedTableA2 = a.cache()
> > >>>> b.print() // Just to make sure a is cached.
> > >>>>
> > >>>> c = a.filter(...) // User specify that the original DAG is used? Or
> > the
> > >>>> optimizer decides whether DAG or cache should be used?
> > >>>> d = cachedTableA1.filter() // User specify that the cached table is
> > >>> used.
> > >>>>
> > >>>> a.unCache() // Can cachedTableA still be used afterwards?
> > >>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?
> > >>>>
> > >>>> *Case 2: not returning a CachedTable*
> > >>>> b = a.map()
> > >>>> a.cache()
> > >>>> a.cache() // no-op
> > >>>> b.print() // Just to make sure a is cached
> > >>>>
> > >>>> c = a.filter(...) // Optimizer decides whether the cache or DAG
> should
> > >>> be
> > >>>> used
> > >>>> d = a.filter(...) // Optimizer decides whether the cache or DAG
> should
> > >>> be
> > >>>> used
> > >>>>
> > >>>> a.unCache()
> > >>>> a.unCache() // no-op
> > >>>>
> > >>>> In case 1, semantic wise, optimizer lose the option to choose
> between
> > >>> DAG
> > >>>> and cache. And the unCache() call becomes tricky.
> > >>>> In case 2, users do not need to worry about whether cache or DAG is
> > >>> used.
> > >>>> And the unCache() semantic is clear. However, the caveat is that
> users
> > >>>> cannot explicitly ignore the cache.
> > >>>>
> > >>>> In order to address the issues mentioned in case 2 and inspired by
> the
> > >>>> discussion so far, I am thinking about using hint to allow user
> > >>> explicitly
> > >>>> ignore cache. Although we do not have hint yet, but we probably
> should
> > >>> have
> > >>>> one. So the code becomes:
> > >>>>
> > >>>> *Case 3: returning this table*
> > >>>> b = a.map()
> > >>>> a.cache()
> > >>>> a.cache() // no-op
> > >>>> b.print() // Just to make sure a is cached
> > >>>>
> > >>>> c = a.filter(...) // Optimizer decides whether the cache or DAG
> should
> > >>> be
> > >>>> used
> > >>>> d = a.hint("ignoreCache").filter(...) // DAG will be used instead of
> > the
> > >>>> cache.
> > >>>>
> > >>>> a.unCache()
> > >>>> a.unCache() // no-op
> > >>>>
> > >>>> We could also let cache() return this table to allow chained method
> > >>> calls.
> > >>>> Do you think this API addresses the concerns?
> > >>>>
> > >>>> Thanks,
> > >>>>
> > >>>> Jiangjie (Becket) Qin
> > >>>>
> > >>>>
> > >>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imjark@xxxxxxxxx> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> All the recent discussions are focused on whether there is a
> problem
> > if
> > >>>>> cache() not return a Table.
> > >>>>> It seems that returning a Table explicitly is more clear (and
> safe?).
> > >>>>>
> > >>>>> So whether there are any problems if cache() returns a Table?
> > @Becket
> > >>>>>
> > >>>>> Best,
> > >>>>> Jark
> > >>>>>
> > >>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <trohrmann@xxxxxxxxxx>
> > >>> wrote:
> > >>>>>
> > >>>>>> It's true that b, c, d and e will all read from the original DAG
> > that
> > >>>>>> generates a. But all subsequent operators (when running multiple
> > >>> queries)
> > >>>>>> which reference cachedTableA should not need to reproduce `a` but
> > >>>>> directly
> > >>>>>> consume the intermediate result.
> > >>>>>>
> > >>>>>> Conceptually one could think of cache() as introducing a caching
> > >>> operator
> > >>>>>> from which you need to consume from if you want to benefit from
> the
> > >>>>> caching
> > >>>>>> functionality.
> > >>>>>>
> > >>>>>> I agree, ideally the optimizer makes this kind of decision which
> > >>>>>> intermediate result should be cached. But especially when
> executing
> > >>>>> ad-hoc
> > >>>>>> queries the user might better know which results need to be cached
> > >>>>> because
> > >>>>>> Flink might not see the full DAG. In that sense, I would consider
> > the
> > >>>>>> cache() method as a hint for the optimizer. Of course, in the
> future
> > >>> we
> > >>>>>> might add functionality which tries to automatically cache results
> > >>> (e.g.
> > >>>>>> caching the latest intermediate results until so and so much space
> > is
> > >>>>>> used). But this should hopefully not contradict with `CachedTable
> > >>>>> cache()`.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Till
> > >>>>>>
> > >>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <becket.qin@xxxxxxxxx>
> > >>> wrote:
> > >>>>>>
> > >>>>>>> Hi Till,
> > >>>>>>>
> > >>>>>>> Thanks for the clarification. I am still a little confused.
> > >>>>>>>
> > >>>>>>> If cache() returns a CachedTable, the example might become:
> > >>>>>>>
> > >>>>>>> b = a.map(...)
> > >>>>>>> c = a.map(...)
> > >>>>>>>
> > >>>>>>> cachedTableA = a.cache()
> > >>>>>>> d = cachedTableA.map(...)
> > >>>>>>> e = a.map()
> > >>>>>>>
> > >>>>>>> In the above case, if cache() is lazily evaluated, b, c, d and e
> > are
> > >>>>> all
> > >>>>>>> going to be reading from the original DAG that generates a. But
> > with
> > >>> a
> > >>>>>>> naive expectation, d should be reading from the cache. This seems
> > not
> > >>>>>>> solving the potential confusion you raised, right?
> > >>>>>>>
> > >>>>>>> Just to be clear, my understanding are all based on the
> assumption
> > >>> that
> > >>>>>> the
> > >>>>>>> tables are immutable. Therefore, after a.cache(), a the
> > >>> c*achedTableA*
> > >>>>>> and
> > >>>>>>> original table *a * should be completely interchangeable.
> > >>>>>>>
> > >>>>>>> That said, I think a valid argument is optimization. There are
> > indeed
> > >>>>>> cases
> > >>>>>>> that reading from the original DAG could be faster than reading
> > from
> > >>>>> the
> > >>>>>>> cache. For example, in the following example:
> > >>>>>>>
> > >>>>>>> a.filter(f1' > 100)
> > >>>>>>> a.cache()
> > >>>>>>> b = a.filter(f1' < 100)
> > >>>>>>>
> > >>>>>>> Ideally the optimizer should be intelligent enough to decide
> which
> > >>> way
> > >>>>> is
> > >>>>>>> faster, without user intervention. In this case, it will identify
> > >>> that
> > >>>>> b
> > >>>>>>> would just be an empty table, thus skip reading from the cache
> > >>>>>> completely.
> > >>>>>>> But I agree that returning a CachedTable would give user the
> > control
> > >>> of
> > >>>>>>> when to use cache, even though I still feel that letting the
> > >>> optimizer
> > >>>>>>> handle this is a better option in long run.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>>
> > >>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <
> trohrmann@xxxxxxxxxx
> > >
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Yes you are right Becket that it still depends on the actual
> > >>>>> execution
> > >>>>>> of
> > >>>>>>>> the job whether a consumer reads from a cached result or not.
> > >>>>>>>>
> > >>>>>>>> My point was actually about the properties of a (cached vs.
> > >>>>> non-cached)
> > >>>>>>> and
> > >>>>>>>> not about the execution. I would not make cache trigger the
> > >>> execution
> > >>>>>> of
> > >>>>>>>> the job because one loses some flexibility by eagerly triggering
> > the
> > >>>>>>>> execution.
> > >>>>>>>>
> > >>>>>>>> I tried to argue for an explicit CachedTable which is returned
> by
> > >>> the
> > >>>>>>>> cache() method like Piotr did in order to make the API more
> > >>> explicit.
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Till
> > >>>>>>>>
> > >>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <becket.qin@xxxxxxxxx
> >
> > >>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Till,
> > >>>>>>>>>
> > >>>>>>>>> That is a good example. Just a minor correction, in this case,
> > b, c
> > >>>>>>> and d
> > >>>>>>>>> will all consume from a non-cached a. This is because cache
> will
> > >>>>> only
> > >>>>>>> be
> > >>>>>>>>> created on the very first job submission that generates the
> table
> > >>>>> to
> > >>>>>> be
> > >>>>>>>>> cached.
> > >>>>>>>>>
> > >>>>>>>>> If I understand correctly, this is example is about whether
> > >>>>> .cache()
> > >>>>>>>> method
> > >>>>>>>>> should be eagerly evaluated or lazily evaluated. In another
> word,
> > >>>>> if
> > >>>>>>>>> cache() method actually triggers a job that creates the cache,
> > >>>>> there
> > >>>>>>> will
> > >>>>>>>>> be no such confusion. Is that right?
> > >>>>>>>>>
> > >>>>>>>>> In the example, although d will not consume from the cached
> Table
> > >>>>>> while
> > >>>>>>>> it
> > >>>>>>>>> looks supposed to, from correctness perspective the code will
> > still
> > >>>>>>>> return
> > >>>>>>>>> correct result, assuming that tables are immutable.
> > >>>>>>>>>
> > >>>>>>>>> Personally I feel it is OK because users probably won't really
> > >>>>> worry
> > >>>>>>>> about
> > >>>>>>>>> whether the table is cached or not. And lazy cache could avoid
> > some
> > >>>>>>>>> unnecessary caching if a cached table is never created in the
> > user
> > >>>>>>>>> application. But I am not opposed to do eager evaluation of
> > cache.
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>>
> > >>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <
> > >>>>> trohrmann@xxxxxxxxxx>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Another argument for Piotr's point is that lazily changing
> > >>>>>> properties
> > >>>>>>>> of
> > >>>>>>>>> a
> > >>>>>>>>>> node affects all down stream consumers but does not
> necessarily
> > >>>>>> have
> > >>>>>>> to
> > >>>>>>>>>> happen before these consumers are defined. From a user's
> > >>>>>> perspective
> > >>>>>>>> this
> > >>>>>>>>>> can be quite confusing:
> > >>>>>>>>>>
> > >>>>>>>>>> b = a.map(...)
> > >>>>>>>>>> c = a.map(...)
> > >>>>>>>>>>
> > >>>>>>>>>> a.cache()
> > >>>>>>>>>> d = a.map(...)
> > >>>>>>>>>>
> > >>>>>>>>>> now b, c and d will consume from a cached operator. In this
> > case,
> > >>>>>> the
> > >>>>>>>>> user
> > >>>>>>>>>> would most likely expect that only d reads from a cached
> result.
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>> Till
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <
> > >>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hey Shaoxuan and Becket,
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Can you explain a bit more one what are the side effects? So
> > >>>>>> far
> > >>>>>>> my
> > >>>>>>>>>>>> understanding is that such side effects only exist if a
> table
> > >>>>>> is
> > >>>>>>>>>> mutable.
> > >>>>>>>>>>>> Is that the case?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Not only that. There are also performance implications and
> > >>>>> those
> > >>>>>>> are
> > >>>>>>>>>>> another implicit side effects of using `void cache()`. As I
> > >>>>> wrote
> > >>>>>>>>> before,
> > >>>>>>>>>>> reading from cache might not always be desirable, thus it can
> > >>>>>> cause
> > >>>>>>>>>>> performance degradation and I’m fine with that - user's or
> > >>>>>>>> optimiser’s
> > >>>>>>>>>>> choice. What I do not like is that this implicit side effect
> > >>>>> can
> > >>>>>>>>> manifest
> > >>>>>>>>>>> in completely different part of code, that wasn’t touched by
> a
> > >>>>>> user
> > >>>>>>>>> while
> > >>>>>>>>>>> he was adding `void cache()` call somewhere else. And even if
> > >>>>>>> caching
> > >>>>>>>>>>> improves performance, it’s still a side effect of `void
> > >>>>> cache()`.
> > >>>>>>>>> Almost
> > >>>>>>>>>>> from the definition `void` methods have only side effects.
> As I
> > >>>>>>> wrote
> > >>>>>>>>>>> before, there are couple of scenarios where this might be
> > >>>>>>> undesirable
> > >>>>>>>>>>> and/or unexpected, for example:
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1.
> > >>>>>>>>>>> Table b = …;
> > >>>>>>>>>>> b.cache()
> > >>>>>>>>>>> x = b.join(…)
> > >>>>>>>>>>> y = b.count()
> > >>>>>>>>>>> // ...
> > >>>>>>>>>>> // 100
> > >>>>>>>>>>> // hundred
> > >>>>>>>>>>> // lines
> > >>>>>>>>>>> // of
> > >>>>>>>>>>> // code
> > >>>>>>>>>>> // later
> > >>>>>>>>>>> z = b.filter(…).groupBy(…) // this might be even hidden in a
> > >>>>>>>> different
> > >>>>>>>>>>> method/file/package/dependency
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Table b = ...
> > >>>>>>>>>>> If (some_condition) {
> > >>>>>>>>>>> foo(b)
> > >>>>>>>>>>> }
> > >>>>>>>>>>> Else {
> > >>>>>>>>>>> bar(b)
> > >>>>>>>>>>> }
> > >>>>>>>>>>> z = b.filter(…).groupBy(…)
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Void foo(Table b) {
> > >>>>>>>>>>> b.cache()
> > >>>>>>>>>>> // do something with b
> > >>>>>>>>>>> }
> > >>>>>>>>>>>
> > >>>>>>>>>>> In both above examples, `b.cache()` will implicitly affect
> > >>>>>>> (semantic
> > >>>>>>>>> of a
> > >>>>>>>>>>> program in case of sources being mutable and performance) `z
> =
> > >>>>>>>>>>> b.filter(…).groupBy(…)` which might be far from obvious.
> > >>>>>>>>>>>
> > >>>>>>>>>>> On top of that, there is still this argument of mine that
> > >>>>> having
> > >>>>>> a
> > >>>>>>>>>>> `MaterializedTable` or `CachedTable` handle is more flexible
> > >>>>> for
> > >>>>>> us
> > >>>>>>>> for
> > >>>>>>>>>> the
> > >>>>>>>>>>> future and for the user (as a manual option to bypass cache
> > >>>>>> reads).
> > >>>>>>>>>>>
> > >>>>>>>>>>>> But Jiangjie is correct,
> > >>>>>>>>>>>> the source table in batching should be immutable. It is the
> > >>>>>>> user’s
> > >>>>>>>>>>>> responsibility to ensure it, otherwise even a regular
> > >>>>> failover
> > >>>>>>> may
> > >>>>>>>>> lead
> > >>>>>>>>>>>> to inconsistent results.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Yes, I agree that’s what perfect world/good deployment should
> > >>>>> be.
> > >>>>>>> But
> > >>>>>>>>> its
> > >>>>>>>>>>> often isn’t and while I’m not trying to fix this (since the
> > >>>>>> proper
> > >>>>>>>> fix
> > >>>>>>>>> is
> > >>>>>>>>>>> to support transactions), I’m just trying to minimise
> confusion
> > >>>>>> for
> > >>>>>>>> the
> > >>>>>>>>>>> users that are not fully aware what’s going on and operate in
> > >>>>>> less
> > >>>>>>>> then
> > >>>>>>>>>>> perfect setup. And if something bites them after adding
> > >>>>>> `b.cache()`
> > >>>>>>>>> call,
> > >>>>>>>>>>> to make sure that they at least know all of the places that
> > >>>>>> adding
> > >>>>>>>> this
> > >>>>>>>>>>> line can affect.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks, Piotrek
> > >>>>>>>>>>>
> > >>>>>>>>>>>> On 1 Dec 2018, at 15:39, Becket Qin <becket.qin@xxxxxxxxx>
> > >>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hi Piotrek,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks again for the clarification. Some more replies are
> > >>>>>>>> following.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> But keep in mind that `.cache()` will/might not only be used
> > >>>>> in
> > >>>>>>>>>>> interactive
> > >>>>>>>>>>>>> programming and not only in batching.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It is true. Actually in stream processing, cache() has the
> > >>>>> same
> > >>>>>>>>>> semantic
> > >>>>>>>>>>> as
> > >>>>>>>>>>>> batch processing. The semantic is following:
> > >>>>>>>>>>>> For a table created via a series of computation, save that
> > >>>>>> table
> > >>>>>>>> for
> > >>>>>>>>>>> later
> > >>>>>>>>>>>> reference to avoid running the computation logic to
> > >>>>> regenerate
> > >>>>>>> the
> > >>>>>>>>>> table.
> > >>>>>>>>>>>> Once the application exits, drop all the cache.
> > >>>>>>>>>>>> This semantic is same for both batch and stream processing.
> > >>>>> The
> > >>>>>>>>>>> difference
> > >>>>>>>>>>>> is that stream applications will only run once as they are
> > >>>>> long
> > >>>>>>>>>> running.
> > >>>>>>>>>>>> And the batch applications may be run multiple times, hence
> > >>>>> the
> > >>>>>>>> cache
> > >>>>>>>>>> may
> > >>>>>>>>>>>> be created and dropped each time the application runs.
> > >>>>>>>>>>>> Admittedly, there will probably be some resource management
> > >>>>>>>>>> requirements
> > >>>>>>>>>>>> for the streaming cached table, such as time based / size
> > >>>>> based
> > >>>>>>>>>>> retention,
> > >>>>>>>>>>>> to address the infinite data issue. But such requirement
> does
> > >>>>>> not
> > >>>>>>>>>> change
> > >>>>>>>>>>>> the semantic.
> > >>>>>>>>>>>> You are right that interactive programming is just one use
> > >>>>> case
> > >>>>>>> of
> > >>>>>>>>>>> cache().
> > >>>>>>>>>>>> It is not the only use case.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For me the more important issue is of not having the `void
> > >>>>>>> cache()`
> > >>>>>>>>>> with
> > >>>>>>>>>>>>> side effects.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> This is indeed the key point. The argument around whether
> > >>>>>> cache()
> > >>>>>>>>>> should
> > >>>>>>>>>>>> return something already indicates that cache() and
> > >>>>>> materialize()
> > >>>>>>>>>> address
> > >>>>>>>>>>>> different issues.
> > >>>>>>>>>>>> Can you explain a bit more one what are the side effects? So
> > >>>>>> far
> > >>>>>>> my
> > >>>>>>>>>>>> understanding is that such side effects only exist if a
> table
> > >>>>>> is
> > >>>>>>>>>> mutable.
> > >>>>>>>>>>>> Is that the case?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I don’t know, probably initially we should make CachedTable
> > >>>>>>>>> read-only.
> > >>>>>>>>>> I
> > >>>>>>>>>>>>> don’t find it more confusing than the fact that user can
> not
> > >>>>>>> write
> > >>>>>>>>> to
> > >>>>>>>>>>> views
> > >>>>>>>>>>>>> or materialised views in SQL or that user currently can not
> > >>>>>>> write
> > >>>>>>>>> to a
> > >>>>>>>>>>>>> Table.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I don't think anyone should insert something to a cache. By
> > >>>>>>>>> definition
> > >>>>>>>>>>> the
> > >>>>>>>>>>>> cache should only be updated when the corresponding original
> > >>>>>>> table
> > >>>>>>>> is
> > >>>>>>>>>>>> updated. What I am wondering is that given the following two
> > >>>>>>> facts:
> > >>>>>>>>>>>> 1. If and only if a table is mutable (with something like
> > >>>>>>>> insert()),
> > >>>>>>>>> a
> > >>>>>>>>>>>> CachedTable may have implicit behavior.
> > >>>>>>>>>>>> 2. A CachedTable extends a Table.
> > >>>>>>>>>>>> We can come to the conclusion that a CachedTable is mutable
> > >>>>> and
> > >>>>>>>> users
> > >>>>>>>>>> can
> > >>>>>>>>>>>> insert into the CachedTable directly. This is where I
> thought
> > >>>>>>>>>> confusing.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <
> > >>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> > >>>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Regarding naming `cache()` vs `materialize()`. One more
> > >>>>>>>> explanation
> > >>>>>>>>>> why
> > >>>>>>>>>>> I
> > >>>>>>>>>>>>> think `materialize()` is more natural to me is that I think
> > >>>>> of
> > >>>>>>> all
> > >>>>>>>>>>> “Table”s
> > >>>>>>>>>>>>> in Table-API as views. They behave the same way as SQL
> > >>>>> views,
> > >>>>>>> the
> > >>>>>>>>> only
> > >>>>>>>>>>>>> difference for me is that their live scope is short -
> > >>>>> current
> > >>>>>>>>> session
> > >>>>>>>>>>> which
> > >>>>>>>>>>>>> is limited by different execution model. That’s why
> > >>>>> “cashing”
> > >>>>>> a
> > >>>>>>>> view
> > >>>>>>>>>>> for me
> > >>>>>>>>>>>>> is just materialising it.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> However I see and I understand your point of view. Coming
> > >>>>> from
> > >>>>>>>>>>>>> DataSet/DataStream and generally speaking non-SQL world,
> > >>>>>>> `cache()`
> > >>>>>>>>> is
> > >>>>>>>>>>> more
> > >>>>>>>>>>>>> natural. But keep in mind that `.cache()` will/might not
> > >>>>> only
> > >>>>>> be
> > >>>>>>>>> used
> > >>>>>>>>>> in
> > >>>>>>>>>>>>> interactive programming and not only in batching. But
> naming
> > >>>>>> is
> > >>>>>>>> one
> > >>>>>>>>>>> issue,
> > >>>>>>>>>>>>> and not that critical to me. Especially that once we
> > >>>>> implement
> > >>>>>>>>> proper
> > >>>>>>>>>>>>> materialised views, we can always deprecate/rename
> `cache()`
> > >>>>>> if
> > >>>>>>> we
> > >>>>>>>>>> deem
> > >>>>>>>>>>> so.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> For me the more important issue is of not having the `void
> > >>>>>>>> cache()`
> > >>>>>>>>>> with
> > >>>>>>>>>>>>> side effects. Exactly for the reasons that you have
> > >>>>> mentioned.
> > >>>>>>>> True:
> > >>>>>>>>>>>>> results might be non deterministic if underlying source
> > >>>>> table
> > >>>>>>> are
> > >>>>>>>>>>> changing.
> > >>>>>>>>>>>>> Problem is that `void cache()` implicitly changes the
> > >>>>> semantic
> > >>>>>>> of
> > >>>>>>>>>>>>> subsequent uses of the cached/materialized Table. It can
> > >>>>> cause
> > >>>>>>>> “wtf”
> > >>>>>>>>>>> moment
> > >>>>>>>>>>>>> for a user if he inserts “b.cache()” call in some place in
> > >>>>> his
> > >>>>>>>> code
> > >>>>>>>>>> and
> > >>>>>>>>>>>>> suddenly some other random places are behaving differently.
> > >>>>> If
> > >>>>>>>>>>>>> `materialize()` or `cache()` returns a Table handle, we
> > >>>>> force
> > >>>>>>> user
> > >>>>>>>>> to
> > >>>>>>>>>>>>> explicitly use the cache which removes the “random” part
> > >>>>> from
> > >>>>>>> the
> > >>>>>>>>>>> "suddenly
> > >>>>>>>>>>>>> some other random places are behaving differently”.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> This argument and others that I’ve raised (greater
> > >>>>>>>>>> flexibility/allowing
> > >>>>>>>>>>>>> user to explicitly bypass the cache) are independent of
> > >>>>>>> `cache()`
> > >>>>>>>> vs
> > >>>>>>>>>>>>> `materialize()` discussion.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Does that mean one can also insert into the CachedTable?
> > >>>>> This
> > >>>>>>>>> sounds
> > >>>>>>>>>>>>> pretty confusing.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I don’t know, probably initially we should make CachedTable
> > >>>>>>>>>> read-only. I
> > >>>>>>>>>>>>> don’t find it more confusing than the fact that user can
> not
> > >>>>>>> write
> > >>>>>>>>> to
> > >>>>>>>>>>> views
> > >>>>>>>>>>>>> or materialised views in SQL or that user currently can not
> > >>>>>>> write
> > >>>>>>>>> to a
> > >>>>>>>>>>>>> Table.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On 30 Nov 2018, at 17:38, Xingcan Cui <xingcanc@xxxxxxxxx
> >
> > >>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I agree with @Becket that `cache()` and `materialize()`
> > >>>>>> should
> > >>>>>>> be
> > >>>>>>>>>>>>> considered as two different methods where the later one is
> > >>>>>> more
> > >>>>>>>>>>>>> sophisticated.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> According to my understanding, the initial idea is just to
> > >>>>>>>>> introduce
> > >>>>>>>>>> a
> > >>>>>>>>>>>>> simple cache or persist mechanism, but as the TableAPI is a
> > >>>>>>>>> high-level
> > >>>>>>>>>>> API,
> > >>>>>>>>>>>>> it’s naturally for as to think in a SQL way.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Maybe we can add the `cache()` method to the DataSet API
> > >>>>> and
> > >>>>>>>> force
> > >>>>>>>>>>> users
> > >>>>>>>>>>>>> to translate a Table to a Dataset before caching it. Then
> > >>>>> the
> > >>>>>>>> users
> > >>>>>>>>>>> should
> > >>>>>>>>>>>>> manually register the cached dataset to a table again (we
> > >>>>> may
> > >>>>>>> need
> > >>>>>>>>>> some
> > >>>>>>>>>>>>> table replacement mechanisms for datasets with an identical
> > >>>>>>> schema
> > >>>>>>>>> but
> > >>>>>>>>>>>>> different contents here). After all, it’s the dataset
> rather
> > >>>>>>> than
> > >>>>>>>>> the
> > >>>>>>>>>>>>> dynamic table that need to be cached, right?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>> Xingcan
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <
> > >>>>>>> becket.qin@xxxxxxxxx>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Piotrek and Jark,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the feedback and explanation. Those are good
> > >>>>>>>> arguments.
> > >>>>>>>>>>> But I
> > >>>>>>>>>>>>>>> think those arguments are mostly about materialized view.
> > >>>>>> Let
> > >>>>>>> me
> > >>>>>>>>> try
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>>> explain the reason I believe cache() and materialize()
> are
> > >>>>>>>>>> different.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I think cache() and materialize() have quite different
> > >>>>>>>>> implications.
> > >>>>>>>>>>> An
> > >>>>>>>>>>>>>>> analogy I can think of is save()/publish(). When users
> > >>>>> call
> > >>>>>>>>> cache(),
> > >>>>>>>>>>> it
> > >>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>> just like they are saving an intermediate result as a
> > >>>>> draft
> > >>>>>> of
> > >>>>>>>>> their
> > >>>>>>>>>>>>> work,
> > >>>>>>>>>>>>>>> this intermediate result may not have any realistic
> > >>>>> meaning.
> > >>>>>>>>> Calling
> > >>>>>>>>>>>>>>> cache() does not mean users want to publish the cached
> > >>>>> table
> > >>>>>>> in
> > >>>>>>>>> any
> > >>>>>>>>>>>>> manner.
> > >>>>>>>>>>>>>>> But when users call materialize(), that means "I have
> > >>>>>>> something
> > >>>>>>>>>>>>> meaningful
> > >>>>>>>>>>>>>>> to be reused by others", now users need to think about
> the
> > >>>>>>>>>> validation,
> > >>>>>>>>>>>>>>> update & versioning, lifecycle of the result, etc.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Piotrek's suggestions on variations of the materialize()
> > >>>>>>> methods
> > >>>>>>>>> are
> > >>>>>>>>>>>>> very
> > >>>>>>>>>>>>>>> useful. It would be great if Flink have them. The concept
> > >>>>> of
> > >>>>>>>>>>>>> materialized
> > >>>>>>>>>>>>>>> view is actually a pretty big feature, not to say the
> > >>>>>> related
> > >>>>>>>>> stuff
> > >>>>>>>>>>> like
> > >>>>>>>>>>>>>>> triggers/hooks you mentioned earlier. I think the
> > >>>>>> materialized
> > >>>>>>>>> view
> > >>>>>>>>>>>>> itself
> > >>>>>>>>>>>>>>> should be discussed in a more thorough and systematic
> > >>>>>> manner.
> > >>>>>>>> And
> > >>>>>>>>> I
> > >>>>>>>>>>>>> found
> > >>>>>>>>>>>>>>> that discussion is kind of orthogonal and way beyond
> > >>>>>>> interactive
> > >>>>>>>>>>>>>>> programming experience.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The example you gave was interesting. I still have some
> > >>>>>>>> questions,
> > >>>>>>>>>>>>> though.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Table source = … // some source that scans files from a
> > >>>>>>>> directory
> > >>>>>>>>>>>>>>>> “/foo/bar/“
> > >>>>>>>>>>>>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> > >>>>>>>>>>>>>>>> Table t2 = t1.materialize() // (or `cache()`)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> t2.count() // initialise cache (if it’s lazily
> > >>>>> initialised)
> > >>>>>>>>>>>>>>>> int a1 = t1.count()
> > >>>>>>>>>>>>>>>> int b1 = t2.count()
> > >>>>>>>>>>>>>>>> // something in the background (or we trigger it) writes
> > >>>>>> new
> > >>>>>>>>> files
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>> /foo/bar
> > >>>>>>>>>>>>>>>> int a2 = t1.count()
> > >>>>>>>>>>>>>>>> int b2 = t2.count()
> > >>>>>>>>>>>>>>>> t2.refresh() // possible future extension, not to be
> > >>>>>>>> implemented
> > >>>>>>>>> in
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> initial version
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> what if someone else added some more files to /foo/bar at
> > >>>>>> this
> > >>>>>>>>>> point?
> > >>>>>>>>>>> In
> > >>>>>>>>>>>>>>> that case, a3 won't equals to b3, and the result become
> > >>>>>>>>>>>>> non-deterministic,
> > >>>>>>>>>>>>>>> right?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> int a3 = t1.count()
> > >>>>>>>>>>>>>>>> int b3 = t2.count()
> > >>>>>>>>>>>>>>>> t2.drop() // another possible future extension, manual
> > >>>>>>> “cache”
> > >>>>>>>>>>> dropping
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> When we talk about interactive programming, in most
> cases,
> > >>>>>> we
> > >>>>>>>> are
> > >>>>>>>>>>>>> talking
> > >>>>>>>>>>>>>>> about batch applications. A fundamental assumption of
> such
> > >>>>>>> case
> > >>>>>>>> is
> > >>>>>>>>>>> that
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> source data is complete before the data processing
> begins,
> > >>>>>> and
> > >>>>>>>> the
> > >>>>>>>>>>> data
> > >>>>>>>>>>>>>>> will not change during the data processing. IMO, if
> > >>>>>> additional
> > >>>>>>>>> rows
> > >>>>>>>>>>>>> needs
> > >>>>>>>>>>>>>>> to be added to some source during the processing, it
> > >>>>> should
> > >>>>>> be
> > >>>>>>>>> done
> > >>>>>>>>>> in
> > >>>>>>>>>>>>> ways
> > >>>>>>>>>>>>>>> like union the source with another table containing the
> > >>>>> rows
> > >>>>>>> to
> > >>>>>>>> be
> > >>>>>>>>>>>>> added.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> There are a few cases that computations are executed
> > >>>>>>> repeatedly
> > >>>>>>>> on
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>> changing data source.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> For example, people may run a ML training job every hour
> > >>>>>> with
> > >>>>>>>> the
> > >>>>>>>>>>>>> samples
> > >>>>>>>>>>>>>>> newly added in the past hour. In that case, the source
> > >>>>> data
> > >>>>>>>>> between
> > >>>>>>>>>>> will
> > >>>>>>>>>>>>>>> indeed change. But still, the data remain unchanged
> within
> > >>>>>> one
> > >>>>>>>>> run.
> > >>>>>>>>>>> And
> > >>>>>>>>>>>>>>> usually in that case, the result will need versioning,
> > >>>>> i.e.
> > >>>>>>> for
> > >>>>>>>> a
> > >>>>>>>>>>> given
> > >>>>>>>>>>>>>>> result, it tells that the result is a result from the
> > >>>>> source
> > >>>>>>>> data
> > >>>>>>>>>> by a
> > >>>>>>>>>>>>>>> certain timestamp.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Another example is something like data warehouse. In this
> > >>>>>>> case,
> > >>>>>>>>>> there
> > >>>>>>>>>>>>> are a
> > >>>>>>>>>>>>>>> few source of original/raw data. On top of those sources,
> > >>>>>> many
> > >>>>>>>>>>>>> materialized
> > >>>>>>>>>>>>>>> view / queries / reports / dashboards can be created to
> > >>>>>>> generate
> > >>>>>>>>>>> derived
> > >>>>>>>>>>>>>>> data. Those derived data needs to be updated when the
> > >>>>>>> underlying
> > >>>>>>>>>>>>> original
> > >>>>>>>>>>>>>>> data changes. In that case, the processing logic that
> > >>>>>> derives
> > >>>>>>>> the
> > >>>>>>>>>>>>> original
> > >>>>>>>>>>>>>>> data needs to be executed repeatedly to update those
> > >>>>>>>>> reports/views.
> > >>>>>>>>>>>>> Again,
> > >>>>>>>>>>>>>>> all those derived data also need to have version
> > >>>>> management,
> > >>>>>>>> such
> > >>>>>>>>> as
> > >>>>>>>>>>>>>>> timestamp.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> In any of the above two cases, during a single run of the
> > >>>>>>>>> processing
> > >>>>>>>>>>>>> logic,
> > >>>>>>>>>>>>>>> the data cannot change. Otherwise the behavior of the
> > >>>>>>> processing
> > >>>>>>>>>> logic
> > >>>>>>>>>>>>> may
> > >>>>>>>>>>>>>>> be undefined. In the above two examples, when writing the
> > >>>>>>>>> processing
> > >>>>>>>>>>>>> logic,
> > >>>>>>>>>>>>>>> Users can use .cache() to hint Flink that those results
> > >>>>>> should
> > >>>>>>>> be
> > >>>>>>>>>>> saved
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>> avoid repeated computation. And then for the result of my
> > >>>>>>>>>> application
> > >>>>>>>>>>>>>>> logic, I'll call materialize(), so that these results
> > >>>>> could
> > >>>>>> be
> > >>>>>>>>>> managed
> > >>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>> the system with versioning, metadata management,
> lifecycle
> > >>>>>>>>>> management,
> > >>>>>>>>>>>>>>> ACLs, etc.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> It is true we can use materialize() to do the cache()
> job,
> > >>>>>>> but I
> > >>>>>>>>> am
> > >>>>>>>>>>>>> really
> > >>>>>>>>>>>>>>> reluctant to shoehorn cache() into materialize() and
> force
> > >>>>>>> users
> > >>>>>>>>> to
> > >>>>>>>>>>>>> worry
> > >>>>>>>>>>>>>>> about a bunch of implications that they needn't have to.
> I
> > >>>>>> am
> > >>>>>>>>>>>>> absolutely on
> > >>>>>>>>>>>>>>> your side that redundant API is bad. But it is equally
> > >>>>>>>>> frustrating,
> > >>>>>>>>>> if
> > >>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>> more, that the same API does different things.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <
> > >>>>>>>>> wshaoxuan@xxxxxxxxx
> > >>>>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks Piotrek,
> > >>>>>>>>>>>>>>>> You provided a very good example, it explains all the
> > >>>>>>>> confusions
> > >>>>>>>>> I
> > >>>>>>>>>>>>> have.
> > >>>>>>>>>>>>>>>> It is clear that there is something we have not
> > >>>>> considered
> > >>>>>> in
> > >>>>>>>> the
> > >>>>>>>>>>>>> initial
> > >>>>>>>>>>>>>>>> proposal. We intend to force the user to reuse the
> > >>>>>>>>>>> cached/materialized
> > >>>>>>>>>>>>>>>> table, if its cache() method is executed. We did not
> > >>>>> expect
> > >>>>>>>> that
> > >>>>>>>>>> user
> > >>>>>>>>>>>>> may
> > >>>>>>>>>>>>>>>> want to re-executed the plan from the source table. Let
> > >>>>> me
> > >>>>>>>>> re-think
> > >>>>>>>>>>>>> about
> > >>>>>>>>>>>>>>>> it and get back to you later.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> In the meanwhile, this example/observation also infers
> > >>>>> that
> > >>>>>>> we
> > >>>>>>>>>> cannot
> > >>>>>>>>>>>>> fully
> > >>>>>>>>>>>>>>>> involve the optimizer to decide the plan if a
> > >>>>>>> cache/materialize
> > >>>>>>>>> is
> > >>>>>>>>>>>>>>>> explicitly used, because weather to reuse the cache data
> > >>>>> or
> > >>>>>>>>>>> re-execute
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> query from source data may lead to different results.
> > >>>>> (But
> > >>>>>> I
> > >>>>>>>>> guess
> > >>>>>>>>>>>>>>>> optimizer can still help in some cases ---- as long as
> it
> > >>>>>>> does
> > >>>>>>>>> not
> > >>>>>>>>>>>>>>>> re-execute from the varied source, we should be safe).
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>> Shaoxuan
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski <
> > >>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Shaoxuan,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Re 2:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is
> > >>>>>> modified
> > >>>>>>>>> to->
> > >>>>>>>>>>> t1’
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> What do you mean that “ t1 is modified to-> t1’ ” ?
> That
> > >>>>>>>>>>>>>>>>> `methodThatAppliesOperators()` method has changed it’s
> > >>>>>> plan?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I was thinking more about something like this:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Table source = … // some source that scans files from a
> > >>>>>>>>> directory
> > >>>>>>>>>>>>>>>>> “/foo/bar/“
> > >>>>>>>>>>>>>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> > >>>>>>>>>>>>>>>>> Table t2 = t1.materialize() // (or `cache()`)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> t2.count() // initialise cache (if it’s lazily
> > >>>>>> initialised)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> int a1 = t1.count()
> > >>>>>>>>>>>>>>>>> int b1 = t2.count()
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> // something in the background (or we trigger it)
> writes
> > >>>>>> new
> > >>>>>>>>> files
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> /foo/bar
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> int a2 = t1.count()
> > >>>>>>>>>>>>>>>>> int b2 = t2.count()
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> t2.refresh() // possible future extension, not to be
> > >>>>>>>> implemented
> > >>>>>>>>>> in
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> initial version
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> int a3 = t1.count()
> > >>>>>>>>>>>>>>>>> int b3 = t2.count()
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> t2.drop() // another possible future extension, manual
> > >>>>>>> “cache”
> > >>>>>>>>>>>>> dropping
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> assertTrue(a1 == b1) // same results, but b1 comes from
> > >>>>>> the
> > >>>>>>>>>> “cache"
> > >>>>>>>>>>>>>>>>> assertTrue(b1 == b2) // both values come from the same
> > >>>>>> cache
> > >>>>>>>>>>>>>>>>> assertTrue(a2 > b2) // b2 comes from cache, a2
> > >>>>> re-executed
> > >>>>>>>> full
> > >>>>>>>>>>> table
> > >>>>>>>>>>>>>>>> scan
> > >>>>>>>>>>>>>>>>> and has more data
> > >>>>>>>>>>>>>>>>> assertTrue(b3 > b2) // b3 comes from refreshed cache
> > >>>>>>>>>>>>>>>>> assertTrue(b3 == a2 == a3)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On 30 Nov 2018, at 10:22, Jark Wu <imjark@xxxxxxxxx>
> > >>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> It is an very interesting and useful design!
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Here I want to share some of my thoughts:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> 1. Agree with that cache() method should return some
> > >>>>>> Table
> > >>>>>>> to
> > >>>>>>>>>> avoid
> > >>>>>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>>> unexpected problems because of the mutable object.
> > >>>>>>>>>>>>>>>>>> All the existing methods of Table are returning a new
> > >>>>>> Table
> > >>>>>>>>>>> instance.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> 2. I think materialize() would be more consistent with
> > >>>>>> SQL,
> > >>>>>>>>> this
> > >>>>>>>>>>>>> makes
> > >>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>> possible to support the same feature for SQL
> > >>>>> (materialize
> > >>>>>>>> view)
> > >>>>>>>>>> and
> > >>>>>>>>>>>>>>>> keep
> > >>>>>>>>>>>>>>>>>> the same API for users in the future.
> > >>>>>>>>>>>>>>>>>> But I'm also fine if we choose cache().
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> 3. In the proposal, a TableService (or FlinkService?)
> > >>>>> is
> > >>>>>>> used
> > >>>>>>>>> to
> > >>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> result of the (intermediate) table.
> > >>>>>>>>>>>>>>>>>> But the name of TableService may be a bit general
> which
> > >>>>>> is
> > >>>>>>>> not
> > >>>>>>>>>>> quite
> > >>>>>>>>>>>>>>>>>> understanding correctly in the first glance (a
> > >>>>> metastore
> > >>>>>>> for
> > >>>>>>>>>>>>> tables?).
> > >>>>>>>>>>>>>>>>>> Maybe a more specific name would be better, such as
> > >>>>>>>>>>> TableCacheSerive
> > >>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>> TableMaterializeSerivce or something else.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <
> > >>>>>>>> fhueske@xxxxxxxxx
> > >>>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks for the clarification Becket!
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I have a few thoughts to share / questions:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 1) I'd like to know how you plan to implement the
> > >>>>>> feature
> > >>>>>>>> on a
> > >>>>>>>>>>> plan
> > >>>>>>>>>>>>> /
> > >>>>>>>>>>>>>>>>>>> planner level.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I would imaging the following to happen when
> > >>>>>> Table.cache()
> > >>>>>>>> is
> > >>>>>>>>>>>>> called:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 1) immediately optimize the Table and internally
> > >>>>> convert
> > >>>>>>> it
> > >>>>>>>>>> into a
> > >>>>>>>>>>>>>>>>>>> DataSet/DataStream. This is necessary, to avoid that
> > >>>>>>>> operators
> > >>>>>>>>>> of
> > >>>>>>>>>>>>>>>> later
> > >>>>>>>>>>>>>>>>>>> queries on top of the Table are pushed down.
> > >>>>>>>>>>>>>>>>>>> 2) register the DataSet/DataStream as a
> > >>>>>>>>>> DataSet/DataStream-backed
> > >>>>>>>>>>>>>>>> Table
> > >>>>>>>>>>>>>>>>> X
> > >>>>>>>>>>>>>>>>>>> 3) add a sink to the DataSet/DataStream. This is the
> > >>>>>>>>>>> materialization
> > >>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> Table X
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Based on your proposal the following would happen:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Table t1 = ....
> > >>>>>>>>>>>>>>>>>>> t1.cache(); // cache() returns void. The logical plan
> > >>>>> of
> > >>>>>>> t1
> > >>>>>>>> is
> > >>>>>>>>>>>>>>>> replaced
> > >>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>> a scan of X. There is also a reference to the
> > >>>>>>>> materialization
> > >>>>>>>>> of
> > >>>>>>>>>>> X.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> t1.count(); // this executes the program, including
> > >>>>> the
> > >>>>>>>>>>>>>>>>> DataSet/DataStream
> > >>>>>>>>>>>>>>>>>>> that backs X and the sink that writes the
> > >>>>>> materialization
> > >>>>>>>> of X
> > >>>>>>>>>>>>>>>>>>> t1.count(); // this executes the program, but reads X
> > >>>>>> from
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> materialization.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> My question is, how do you determine when whether the
> > >>>>>> scan
> > >>>>>>>> of
> > >>>>>>>>> t1
> > >>>>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>>> go
> > >>>>>>>>>>>>>>>>>>> against the DataSet/DataStream program and when
> > >>>>> against
> > >>>>>>> the
> > >>>>>>>>>>>>>>>>>>> materialization?
> > >>>>>>>>>>>>>>>>>>> AFAIK, there is no hook that will tell you that a
> part
> > >>>>>> of
> > >>>>>>>> the
> > >>>>>>>>>>>>> program
> > >>>>>>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>>> executed. Flipping a switch during optimization or
> > >>>>> plan
> > >>>>>>>>>> generation
> > >>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>> sufficient as there is no guarantee that the plan is
> > >>>>>> also
> > >>>>>>>>>>> executed.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Overall, this behavior is somewhat similar to what I
> > >>>>>>>> proposed
> > >>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>> FLINK-8950, which does not include persisting the
> > >>>>> table,
> > >>>>>>> but
> > >>>>>>>>>> just
> > >>>>>>>>>>>>>>>>>>> optimizing and reregistering it as DataSet/DataStream
> > >>>>>>> scan.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 2) I think Piotr has a point about the implicit
> > >>>>> behavior
> > >>>>>>> and
> > >>>>>>>>>> side
> > >>>>>>>>>>>>>>>>> effects
> > >>>>>>>>>>>>>>>>>>> of the cache() method if it does not return anything.
> > >>>>>>>>>>>>>>>>>>> Consider the following example:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Table t1 = ???
> > >>>>>>>>>>>>>>>>>>> Table t2 = methodThatAppliesOperators(t1);
> > >>>>>>>>>>>>>>>>>>> Table t3 = methodThatAppliesOtherOperators(t1);
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> In this case, the behavior/performance of the plan
> > >>>>> that
> > >>>>>>>>> results
> > >>>>>>>>>>> from
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> second method call depends on whether t1 was modified
> > >>>>> by
> > >>>>>>> the
> > >>>>>>>>>> first
> > >>>>>>>>>>>>>>>>> method
> > >>>>>>>>>>>>>>>>>>> or not.
> > >>>>>>>>>>>>>>>>>>> This is the classic issue of mutable vs. immutable
> > >>>>>>> objects.
> > >>>>>>>>>>>>>>>>>>> Also, as Piotr pointed out, it might also be good to
> > >>>>>> have
> > >>>>>>>> the
> > >>>>>>>>>>>>> original
> > >>>>>>>>>>>>>>>>> plan
> > >>>>>>>>>>>>>>>>>>> of t1, because in some cases it is possible to push
> > >>>>>>> filters
> > >>>>>>>>> down
> > >>>>>>>>>>>>> such
> > >>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>> evaluating the query from scratch might be more
> > >>>>>> efficient
> > >>>>>>>> than
> > >>>>>>>>>>>>>>>> accessing
> > >>>>>>>>>>>>>>>>>>> the cache.
> > >>>>>>>>>>>>>>>>>>> Moreover, a CachedTable could extend Table() and
> > >>>>> offer a
> > >>>>>>>>> method
> > >>>>>>>>>>>>>>>>> refresh().
> > >>>>>>>>>>>>>>>>>>> This sounds quite useful in an interactive session
> > >>>>> mode.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> 3) Regarding the name, I can see both arguments. IMO,
> > >>>>>>>>>>> materialize()
> > >>>>>>>>>>>>>>>>> seems
> > >>>>>>>>>>>>>>>>>>> to be more future proof.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Best, Fabian
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan
> > >>>>>> Wang <
> > >>>>>>>>>>>>>>>>>>> wshaoxuan@xxxxxxxxx>:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi Piotr,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks for sharing your ideas on the method naming.
> > >>>>> We
> > >>>>>>> will
> > >>>>>>>>>> think
> > >>>>>>>>>>>>>>>> about
> > >>>>>>>>>>>>>>>>>>>> your suggestions. But I don't understand why we need
> > >>>>> to
> > >>>>>>>>> change
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> return
> > >>>>>>>>>>>>>>>>>>>> type of cache().
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Cache() is a physical operation, it does not change
> > >>>>> the
> > >>>>>>>> logic
> > >>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>> the `Table`. On the tableAPI layer, we should not
> > >>>>>>>> introduce a
> > >>>>>>>>>> new
> > >>>>>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>> type unless the logic of table has been changed. If
> > >>>>> we
> > >>>>>>>>>> introduce
> > >>>>>>>>>>> a
> > >>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>>> table type `CachedTable`, we need create the same
> set
> > >>>>>> of
> > >>>>>>>>>> methods
> > >>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>> `Table`
> > >>>>>>>>>>>>>>>>>>>> for it. I don't think it is worth doing this. Or can
> > >>>>>> you
> > >>>>>>>>> please
> > >>>>>>>>>>>>>>>>> elaborate
> > >>>>>>>>>>>>>>>>>>>> more on what could be the "implicit behaviours/side
> > >>>>>>>> effects"
> > >>>>>>>>>> you
> > >>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>> thinking about?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>> Shaoxuan
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski <
> > >>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi Becket,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Thanks for the response.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 1. I wasn’t saying that materialised view must be
> > >>>>>>> mutable
> > >>>>>>>> or
> > >>>>>>>>>>> not.
> > >>>>>>>>>>>>>>>> The
> > >>>>>>>>>>>>>>>>>>>> same
> > >>>>>>>>>>>>>>>>>>>>> thing applies to caches as well. To the contrary, I
> > >>>>>>> would
> > >>>>>>>>>> expect
> > >>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>> consistency and updates from something that is
> > >>>>> called
> > >>>>>>>>> “cache”
> > >>>>>>>>>> vs
> > >>>>>>>>>>>>>>>>>>>> something
> > >>>>>>>>>>>>>>>>>>>>> that’s a “materialised view”. In other words, IMO
> > >>>>> most
> > >>>>>>>>> caches
> > >>>>>>>>>> do
> > >>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>> serve
> > >>>>>>>>>>>>>>>>>>>>> you invalid/outdated data and they handle updates
> on
> > >>>>>>> their
> > >>>>>>>>>> own.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 2. I don’t think that having in the future two very
> > >>>>>>>> similar
> > >>>>>>>>>>>>> concepts
> > >>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>> `materialized` view and `cache` is a good idea. It
> > >>>>>> would
> > >>>>>>>> be
> > >>>>>>>>>>>>>>>> confusing
> > >>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>> the users. I think it could be handled by
> > >>>>>>>>>> variations/overloading
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>> materialised view concept. We could start with:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materialize()` - immutable,
> > >>>>> session
> > >>>>>>>> life
> > >>>>>>>>>>> scope
> > >>>>>>>>>>>>>>>>>>>>> (basically the same semantic as you are proposing
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> And then in the future (if ever) build on top of
> > >>>>>>>> that/expand
> > >>>>>>>>>> it
> > >>>>>>>>>>>>>>>> with:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materialize(refreshTime=…)` or
> > >>>>>>>>>>>>> `MaterializedTable
> > >>>>>>>>>>>>>>>>>>>>> materialize(refreshHook=…)`
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Or with cross session support:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materializeInto(connector=…)` or
> > >>>>>>>>>>>>>>>> `MaterializedTable
> > >>>>>>>>>>>>>>>>>>>>> materializeInto(tableFactory=…)`
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I’m not saying that we should implement cross
> > >>>>>>>>>> session/refreshing
> > >>>>>>>>>>>>> now
> > >>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>> even in the near future. I’m just arguing that
> > >>>>> naming
> > >>>>>>>>> current
> > >>>>>>>>>>>>>>>>> immutable
> > >>>>>>>>>>>>>>>>>>>>> session life scope method `materialize()` is more
> > >>>>>> future
> > >>>>>>>>> proof
> > >>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>> consistent with SQL (on which after all table-api
> is
> > >>>>>>>> heavily
> > >>>>>>>>>>>>> basing
> > >>>>>>>>>>>>>>>>>>> on).
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 3. Even if we agree on naming it `cache()`, I would
> > >>>>>>> still
> > >>>>>>>>>> insist
> > >>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>>> `cache()` returning `CachedTable` handle to avoid
> > >>>>>>> implicit
> > >>>>>>>>>>>>>>>>>>>> behaviours/side
> > >>>>>>>>>>>>>>>>>>>>> effects and to give both us & users more
> > >>>>> flexibility.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin <
> > >>>>>>>> becket.qin@xxxxxxxxx
> > >>>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Just to add a little bit, the materialized view is
> > >>>>>>>> probably
> > >>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>> similar
> > >>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> the persistent() brought up earlier in the thread.
> > >>>>> So
> > >>>>>>> it
> > >>>>>>>> is
> > >>>>>>>>>>>>> usually
> > >>>>>>>>>>>>>>>>>>>> cross
> > >>>>>>>>>>>>>>>>>>>>>> session and could be used in a larger scope. For
> > >>>>>>>> example, a
> > >>>>>>>>>>>>>>>>>>>> materialized
> > >>>>>>>>>>>>>>>>>>>>>> view created by user A may be visible to user B.
> It
> > >>>>>> is
> > >>>>>>>>>> probably
> > >>>>>>>>>>>>>>>>>>>> something
> > >>>>>>>>>>>>>>>>>>>>>> we want to have in the future. I'll put it in the
> > >>>>>>> future
> > >>>>>>>>> work
> > >>>>>>>>>>>>>>>>>>> section.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin <
> > >>>>>>>>>>> becket.qin@xxxxxxxxx
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Right now we are mostly thinking of the cached
> > >>>>> table
> > >>>>>>> as
> > >>>>>>>>>>>>>>>> immutable. I
> > >>>>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>> see the Materialized view would be useful in the
> > >>>>>>> future.
> > >>>>>>>>>> That
> > >>>>>>>>>>>>>>>> said,
> > >>>>>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>>>>>>>>> a simple cache mechanism is probably still
> needed.
> > >>>>>> So
> > >>>>>>> to
> > >>>>>>>>> me,
> > >>>>>>>>>>>>>>>> cache()
> > >>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>> materialize() should be two separate method as
> > >>>>> they
> > >>>>>>>>> address
> > >>>>>>>>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>>>>>>>>>> needs. Materialize() is a higher level concept
> > >>>>>> usually
> > >>>>>>>>>>> implying
> > >>>>>>>>>>>>>>>>>>>>> periodical
> > >>>>>>>>>>>>>>>>>>>>>>> update, while cache() has much simpler semantic.
> > >>>>> For
> > >>>>>>>>>> example,
> > >>>>>>>>>>>>> one
> > >>>>>>>>>>>>>>>>>>> may
> > >>>>>>>>>>>>>>>>>>>>>>> create a materialized view and use cache() method
> > >>>>> in
> > >>>>>>> the
> > >>>>>>>>>>>>>>>>>>> materialized
> > >>>>>>>>>>>>>>>>>>>>> view
> > >>>>>>>>>>>>>>>>>>>>>>> creation logic. So that during the materialized
> > >>>>> view
> > >>>>>>>>> update,
> > >>>>>>>>>>>>> they
> > >>>>>>>>>>>>>>>> do
> > >>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>> need to worry about the case that the cached
> table
> > >>>>>> is
> > >>>>>>>> also
> > >>>>>>>>>>>>>>>> changed.
> > >>>>>>>>>>>>>>>>>>>>> Maybe
> > >>>>>>>>>>>>>>>>>>>>>>> under the hood, materialized() and cache() could
> > >>>>>> share
> > >>>>>>>>> some
> > >>>>>>>>>>>>>>>>>>> mechanism,
> > >>>>>>>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>>>>>> I think a simple cache() method would be handy in
> > >>>>> a
> > >>>>>>> lot
> > >>>>>>>> of
> > >>>>>>>>>>>>> cases.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski <
> > >>>>>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Becket,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Is there any extra thing user can do on a
> > >>>>>>>>>> MaterializedTable
> > >>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>> they
> > >>>>>>>>>>>>>>>>>>>>>>>> cannot do on a Table?
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Maybe not in the initial implementation, but
> > >>>>>> various
> > >>>>>>>> DBs
> > >>>>>>>>>>> offer
> > >>>>>>>>>>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>>>>>>>>>>> ways to “refresh” the materialised view. Hooks,
> > >>>>>>>> triggers,
> > >>>>>>>>>>>>> timers,
> > >>>>>>>>>>>>>>>>>>>>> manually
> > >>>>>>>>>>>>>>>>>>>>>>>> etc. Having `MaterializedTable` would help us to
> > >>>>>>> handle
> > >>>>>>>>>> that
> > >>>>>>>>>>> in
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> future.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> After users call *table.cache(), *users can
> just
> > >>>>>> use
> > >>>>>>>>> that
> > >>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> do
> > >>>>>>>>>>>>>>>>>>>>>>>> anything that is supported on a Table, including
> > >>>>>> SQL.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> This is some implicit behaviour with side
> > >>>>> effects.
> > >>>>>>>>> Imagine
> > >>>>>>>>>> if
> > >>>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>>> has
> > >>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>>> long and complicated program, that touches table
> > >>>>>> `b`
> > >>>>>>>>>> multiple
> > >>>>>>>>>>>>>>>>>>> times,
> > >>>>>>>>>>>>>>>>>>>>> maybe
> > >>>>>>>>>>>>>>>>>>>>>>>> scattered around different methods. If he
> > >>>>> modifies
> > >>>>>>> his
> > >>>>>>>>>>> program
> > >>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>> inserting
> > >>>>>>>>>>>>>>>>>>>>>>>> in one place
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> b.cache()
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> This implicitly alters the semantic and
> behaviour
> > >>>>>> of
> > >>>>>>>> his
> > >>>>>>>>>> code
> > >>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>>>>>> over
> > >>>>>>>>>>>>>>>>>>>>>>>> the place, maybe in a ways that might cause
> > >>>>>> problems.
> > >>>>>>>> For
> > >>>>>>>>>>>>> example
> > >>>>>>>>>>>>>>>>>>>> what
> > >>>>>>>>>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>>>>>>>>> underlying data is changing?
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Having invisible side effects is also not very
> > >>>>>> clean,
> > >>>>>>>> for
> > >>>>>>>>>>>>> example
> > >>>>>>>>>>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>>>>>>>>>> about something like this (but more
> complicated):
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Table b = ...;
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> If (some_condition) {
> > >>>>>>>>>>>>>>>>>>>>>>>> processTable1(b)
> > >>>>>>>>>>>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>>>>>>>>>> else {
> > >>>>>>>>>>>>>>>>>>>>>>>> processTable2(b)
> > >>>>>>>>>>>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> // do more stuff with b
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> And user adds `b.cache()` call to only one of
> the
> > >>>>>>>>>>>>> `processTable1`
> > >>>>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>>>>> `processTable2` methods.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> On the other hand
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Table materialisedB = b.materialize()
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Avoids (at least some of) the side effect issues
> > >>>>>> and
> > >>>>>>>>> forces
> > >>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> explicitly use `materialisedB` where it’s
> > >>>>>> appropriate
> > >>>>>>>> and
> > >>>>>>>>>>>>> forces
> > >>>>>>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> think what does it actually mean. And if
> > >>>>> something
> > >>>>>>>>> doesn’t
> > >>>>>>>>>>> work
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> end
> > >>>>>>>>>>>>>>>>>>>>>>>> for the user, he will know what has he changed
> > >>>>>>> instead
> > >>>>>>>> of
> > >>>>>>>>>>>>> blaming
> > >>>>>>>>>>>>>>>>>>>>> Flink for
> > >>>>>>>>>>>>>>>>>>>>>>>> some “magic” underneath. In the above example,
> > >>>>>> after
> > >>>>>>>>>>>>>>>> materialising
> > >>>>>>>>>>>>>>>>>>> b
> > >>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>> only one of the methods, he should/would realise
> > >>>>>>> about
> > >>>>>>>>> the
> > >>>>>>>>>>>>> issue
> > >>>>>>>>>>>>>>>>>>> when
> > >>>>>>>>>>>>>>>>>>>>>>>> handling the return value `MaterializedTable` of
> > >>>>>> that
> > >>>>>>>>>> method.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I guess it comes down to personal preferences if
> > >>>>>> you
> > >>>>>>>> like
> > >>>>>>>>>>>>> things
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>> implicit or not. The more power is the user,
> > >>>>>> probably
> > >>>>>>>> the
> > >>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>> likely
> > >>>>>>>>>>>>>>>>>>>>> he is
> > >>>>>>>>>>>>>>>>>>>>>>>> to like/understand implicit behaviour. And we as
> > >>>>>>> Table
> > >>>>>>>>> API
> > >>>>>>>>>>>>>>>>>>> designers
> > >>>>>>>>>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>>>>>> the most power users out there, so I would
> > >>>>> proceed
> > >>>>>>> with
> > >>>>>>>>>>> caution
> > >>>>>>>>>>>>>>>> (so
> > >>>>>>>>>>>>>>>>>>>>> that we
> > >>>>>>>>>>>>>>>>>>>>>>>> do not end up in the crazy perl realm with it’s
> > >>>>>>> lovely
> > >>>>>>>>>>> implicit
> > >>>>>>>>>>>>>>>>>>>> method
> > >>>>>>>>>>>>>>>>>>>>>>>> arguments ;)  <
> > >>>>>>>>>> https://stackoverflow.com/a/14922656/8149051
> > >>>>>>>>>>>> )
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Table API to also support non-relational
> > >>>>>> processing
> > >>>>>>>>> cases,
> > >>>>>>>>>>>>>>>> cache()
> > >>>>>>>>>>>>>>>>>>>>>>>> might be slightly better.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> I think even such extended Table API could
> > >>>>> benefit
> > >>>>>>> from
> > >>>>>>>>>>>>> sticking
> > >>>>>>>>>>>>>>>>>>>>> to/being
> > >>>>>>>>>>>>>>>>>>>>>>>> consistent with SQL where both SQL and Table API
> > >>>>>> are
> > >>>>>>>>>>> basically
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> same.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> One more thing. `MaterializedTable
> materialize()`
> > >>>>>>> could
> > >>>>>>>>> be
> > >>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>>>>> powerful/flexible allowing the user to operate
> > >>>>> both
> > >>>>>>> on
> > >>>>>>>>>>>>>>>> materialised
> > >>>>>>>>>>>>>>>>>>>>> and not
> > >>>>>>>>>>>>>>>>>>>>>>>> materialised view at the same time for whatever
> > >>>>>>> reasons
> > >>>>>>>>>>>>>>>> (underlying
> > >>>>>>>>>>>>>>>>>>>>> data
> > >>>>>>>>>>>>>>>>>>>>>>>> changing/better optimisation opportunities after
> > >>>>>>>> pushing
> > >>>>>>>>>> down
> > >>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>> filters
> > >>>>>>>>>>>>>>>>>>>>>>>> etc). For example:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Table b = …;
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> MaterlizedTable mb = b.materialize();
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Val min = mb.min();
> > >>>>>>>>>>>>>>>>>>>>>>>> Val max = mb.max();
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Val user42 = b.filter(‘userId = 42);
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Could be more efficient compared to `b.cache()`
> > >>>>> if
> > >>>>>>>>>>>>>>>> `filter(‘userId
> > >>>>>>>>>>>>>>>>>>> =
> > >>>>>>>>>>>>>>>>>>>>>>>> 42);` allows for much more aggressive
> > >>>>>> optimisations.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske <
> > >>>>>>>>>> fhueske@xxxxxxxxx>
> > >>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> I'm not suggesting to add support for Ignite.
> > >>>>> This
> > >>>>>>> was
> > >>>>>>>>>> just
> > >>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>> example.
> > >>>>>>>>>>>>>>>>>>>>>>>>> Plasma and Arrow sound interesting, too.
> > >>>>>>>>>>>>>>>>>>>>>>>>> For the sake of this proposal, it would be up
> to
> > >>>>>> the
> > >>>>>>>>> user
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>> implement a
> > >>>>>>>>>>>>>>>>>>>>>>>>> TableFactory and corresponding TableSource /
> > >>>>>>> TableSink
> > >>>>>>>>>>> classes
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> persist
> > >>>>>>>>>>>>>>>>>>>>>>>>> and read the data.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 12:06 Uhr schrieb
> > >>>>> Flavio
> > >>>>>>>>>>> Pompermaier
> > >>>>>>>>>>>>> <
> > >>>>>>>>>>>>>>>>>>>>>>>>> pompermaier@xxxxxxxx>:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> What about to add also Apache Plasma + Arrow
> as
> > >>>>>> an
> > >>>>>>>>>>>>> alternative
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> Apache
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Ignite?
> > >>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>>
> > https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske
> > >>>>> <
> > >>>>>>>>>>>>>>>>>>> fhueske@xxxxxxxxx>
> > >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the proposal!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> To summarize, you propose a new method
> > >>>>>>>> Table.cache():
> > >>>>>>>>>>> Table
> > >>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> trigger a job and write the result into some
> > >>>>>>>> temporary
> > >>>>>>>>>>>>> storage
> > >>>>>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>>>>>> defined
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> by a TableFactory.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> The cache() call blocks while the job is
> > >>>>> running
> > >>>>>>> and
> > >>>>>>>>>>>>>>>> eventually
> > >>>>>>>>>>>>>>>>>>>>>>>> returns a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Table object that represents a scan of the
> > >>>>>>> temporary
> > >>>>>>>>>>> table.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> When the "session" is closed (closing to be
> > >>>>>>>> defined?),
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> temporary
> > >>>>>>>>>>>>>>>>>>>>>>>>>> tables
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> are all dropped.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I think this behavior makes sense and is a
> > >>>>> good
> > >>>>>>>> first
> > >>>>>>>>>> step
> > >>>>>>>>>>>>>>>>>>> towards
> > >>>>>>>>>>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> interactive workloads.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> However, its performance suffers from writing
> > >>>>> to
> > >>>>>>> and
> > >>>>>>>>>>> reading
> > >>>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>>>>>> external
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> systems.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I think this is OK for now. Changes that
> would
> > >>>>>>>>>>> significantly
> > >>>>>>>>>>>>>>>>>>>> improve
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> situation (i.e., pinning data in-memory
> across
> > >>>>>>> jobs)
> > >>>>>>>>>> would
> > >>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>> large
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> impacts on many components of Flink.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Users could use in-memory filesystems or
> > >>>>> storage
> > >>>>>>>> grids
> > >>>>>>>>>>>>> (Apache
> > >>>>>>>>>>>>>>>>>>>>>>>> Ignite) to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> mitigate some of the performance effects.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb
> > >>>>>> Becket
> > >>>>>>>> Qin
> > >>>>>>>>> <
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> becket.qin@xxxxxxxxx
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> :
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation, Piotrek.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there any extra thing user can do on a
> > >>>>>>>>>>> MaterializedTable
> > >>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>> they
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> cannot do on a Table? After users call
> > >>>>>>>>> *table.cache(),
> > >>>>>>>>>>>>> *users
> > >>>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>>> just
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> that table and do anything that is supported
> > >>>>>> on a
> > >>>>>>>>>> Table,
> > >>>>>>>>>>>>>>>>>>>> including
> > >>>>>>>>>>>>>>>>>>>>>>>> SQL.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Naming wise, either cache() or materialize()
> > >>>>>>> sounds
> > >>>>>>>>>> fine
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> me.
> > >>>>>>>>>>>>>>>>>>>>>>>> cache()
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> a bit more general than materialize(). Given
> > >>>>>> that
> > >>>>>>>> we
> > >>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>> enhancing
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Table API to also support non-relational
> > >>>>>>> processing
> > >>>>>>>>>>> cases,
> > >>>>>>>>>>>>>>>>>>>> cache()
> > >>>>>>>>>>>>>>>>>>>>>>>>>> might
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> slightly better.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:25 PM Piotr
> > >>>>>> Nowojski <
> > >>>>>>>>>>>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Becket,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ops, sorry I didn’t notice that you intend
> > >>>>> to
> > >>>>>>>> reuse
> > >>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> `TableFactory`. I don’t know why, but I
> > >>>>>> assumed
> > >>>>>>>> that
> > >>>>>>>>>> you
> > >>>>>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> provide
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternate way of writing the data.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Now that I hopefully understand the
> > >>>>> proposal,
> > >>>>>>>> maybe
> > >>>>>>>>> we
> > >>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>>>>> rename
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> `cache()` to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> void materialize()
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> or going step further
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> MaterializedTable materialize()
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> MaterializedTable createMaterializedView()
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> The second option with returning a handle I
> > >>>>>>> think
> > >>>>>>>> is
> > >>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>> flexible
> > >>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> could provide features such as
> > >>>>>>> “refresh”/“delete”
> > >>>>>>>> or
> > >>>>>>>>>>>>>>>> generally
> > >>>>>>>>>>>>>>>>>>>>>>>>>> speaking
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> manage the the view. In the future we could
> > >>>>>> also
> > >>>>>>>>> think
> > >>>>>>>>>>>>> about
> > >>>>>>>>>>>>>>>>>>>>> adding
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> hooks
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to automatically refresh view etc. It is
> > >>>>> also
> > >>>>>>> more
> > >>>>>>>>>>>>> explicit
> > >>>>>>>>>>>>>>>> -
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialization returning a new table
> handle
> > >>>>>>> will
> > >>>>>>>>> not
> > >>>>>>>>>>> have
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> same
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> implicit side effects as adding a simple
> > >>>>> line
> > >>>>>> of
> > >>>>>>>>> code
> > >>>>>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>>>>>> `b.cache()`
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> would have.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> It would also be more SQL like, making it
> > >>>>> more
> > >>>>>>>>>> intuitive
> > >>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>> users
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> already
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> familiar with the SQL.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 14:53, Becket Qin <
> > >>>>>>>>>>>>> becket.qin@xxxxxxxxx
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the cache() method itself, yes, it is
> > >>>>>>>>> equivalent
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> creating
> > >>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> BUILT-IN
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialized view with a lifecycle. That
> > >>>>>>>>>> functionality
> > >>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>> missing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> today,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> though. Not sure if I understand your
> > >>>>>> question.
> > >>>>>>>> Do
> > >>>>>>>>>> you
> > >>>>>>>>>>>>> mean
> > >>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> already
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the functionality and just need a syntax
> > >>>>>> sugar?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What's more interesting in the proposal is
> > >>>>> do
> > >>>>>>> we
> > >>>>>>>>> want
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> stop
> > >>>>>>>>>>>>>>>>>>>> at
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> creating
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the materialized view? Or do we want to
> > >>>>>> extend
> > >>>>>>>> that
> > >>>>>>>>>> in
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> future
> > >>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> useful unified data store distributed with
> > >>>>>>> Flink?
> > >>>>>>>>> And
> > >>>>>>>>>>> do
> > >>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mechanism allow more flexible user job
> > >>>>>> pattern
> > >>>>>>>> with
> > >>>>>>>>>>> their
> > >>>>>>>>>>>>>>>> own
> > >>>>>>>>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> defined
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> services. These considerations are much
> > >>>>> more
> > >>>>>>>>>>>>> architectural.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 6:01 PM Piotr
> > >>>>>> Nowojski
> > >>>>>>> <
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Interesting idea. I’m trying to
> understand
> > >>>>>> the
> > >>>>>>>>>>> problem.
> > >>>>>>>>>>>>>>>>>>> Isn’t
> > >>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `cache()` call an equivalent of writing
> > >>>>> data
> > >>>>>>> to
> > >>>>>>>> a
> > >>>>>>>>>> sink
> > >>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> later
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> reading
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from it? Where this sink has a limited
> > >>>>> live
> > >>>>>>>>>> scope/live
> > >>>>>>>>>>>>>>>> time?
> > >>>>>>>>>>>>>>>>>>>> And
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> sink
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> could be implemented as in memory or a
> > >>>>> file
> > >>>>>>>> sink?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If so, what’s the problem with creating a
> > >>>>>>>>>> materialised
> > >>>>>>>>>>>>>>>> view
> > >>>>>>>>>>>>>>>>>>>>> from a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> “b” (from your document’s example) and
> > >>>>>> reusing
> > >>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>> materialised
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> view
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> later? Maybe we are lacking mechanisms to
> > >>>>>>> clean
> > >>>>>>>> up
> > >>>>>>>>>>>>>>>>>>>> materialised
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> views
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> example when current session finishes)?
> > >>>>>> Maybe
> > >>>>>>> we
> > >>>>>>>>>> need
> > >>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>>>>>>>>>>> syntactic
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> sugar
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on top of it?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 07:21, Becket Qin <
> > >>>>>>>>>>>>>>>> becket.qin@xxxxxxxxx
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, Jincheng.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I think it makes sense to have a
> > >>>>>>> persist()
> > >>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> lifecycle/defined
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> scope. I just added a section in the
> > >>>>> future
> > >>>>>>>> work
> > >>>>>>>>>> for
> > >>>>>>>>>>>>>>>> this.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 1:55 PM jincheng
> > >>>>>> sun
> > >>>>>>> <
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thank you for the explanation about the
> > >>>>>> name
> > >>>>>>>> of
> > >>>>>>>>>>>>>>>>>>> `cache()`, I
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> understand
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> why
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you designed this way!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another idea is whether we can specify
> a
> > >>>>>>>>> lifecycle
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>> data
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> persistence?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, persist
> > >>>>> (LifeCycle.SESSION),
> > >>>>>> so
> > >>>>>>>>> that
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> worried
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> about data loss, and will clearly
> > >>>>> specify
> > >>>>>>> the
> > >>>>>>>>> time
> > >>>>>>>>>>>>> range
> > >>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> keeping
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> time.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> At the same time, if we want to expand,
> > >>>>> we
> > >>>>>>> can
> > >>>>>>>>>> also
> > >>>>>>>>>>>>>>>> share
> > >>>>>>>>>>>>>>>>>>>> in a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> certain
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> group of session, for example:
> > >>>>>>>>>>>>>>>>>>>> LifeCycle.SESSION_GROUP(...), I
> > >>>>>>>>>>>>>>>>>>>>>>>>>> am
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sure,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> just an immature suggestion, for
> > >>>>> reference
> > >>>>>>>> only!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx>
> > >>>>>>>>> 于2018年11月23日周五
> > >>>>>>>>>>>>>>>>>>> 下午1:33写道:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Re: Jincheng,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. Regarding
> > >>>>>> cache()
> > >>>>>>>> v.s.
> > >>>>>>>>>>>>>>>>>>> persist(),
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> personally I
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> find cache() to be more accurately
> > >>>>>>> describing
> > >>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> behavior,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> i.e.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is cached for the session, but will be
> > >>>>>>>> deleted
> > >>>>>>>>>>> after
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> session
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> closed.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> persist() seems a little misleading as
> > >>>>>>> people
> > >>>>>>>>>> might
> > >>>>>>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still be there even after the session
> > >>>>> is
> > >>>>>>>> gone.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Great point about mixing the batch and
> > >>>>>>> stream
> > >>>>>>>>>>>>>>>> processing
> > >>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> same
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> job.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We should absolutely move towards that
> > >>>>>>> goal.
> > >>>>>>>> I
> > >>>>>>>>>>>>> imagine
> > >>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> huge
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> change across the board, including
> > >>>>>> sources,
> > >>>>>>>>>>> operators
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimizations,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> name some. Likely we will need several
> > >>>>>>>> separate
> > >>>>>>>>>>>>>>>> in-depth
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> discussions.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 5:14 AM
> Xingcan
> > >>>>>>> Cui <
> > >>>>>>>>>>>>>>>>>>>>>>>>>> xingcanc@xxxxxxxxx>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Shaoxuan, I think the lifecycle or
> > >>>>>> access
> > >>>>>>>>>> domain
> > >>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>> both
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> orthogonal
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the cache problem. Essentially, this
> > >>>>> may
> > >>>>>>> be
> > >>>>>>>>> the
> > >>>>>>>>>>>>> first
> > >>>>>>>>>>>>>>>>>>> time
> > >>>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> plan
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> introduce another storage mechanism
> > >>>>>> other
> > >>>>>>>> than
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> state.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Maybe
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> it’s
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> better
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to first draw a big picture and then
> > >>>>>>>>> concentrate
> > >>>>>>>>>>> on
> > >>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>> specific
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> part?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Becket, yes, actually I am more
> > >>>>>> concerned
> > >>>>>>>>> with
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> underlying
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> service.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This seems to be quite a major change
> > >>>>> to
> > >>>>>>> the
> > >>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>>>>>>>>>>> codebase.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> As
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> claimed, the service should be
> > >>>>>> extendible
> > >>>>>>> to
> > >>>>>>>>>>> support
> > >>>>>>>>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> components
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we’d better discussed it in another
> > >>>>>>> thread.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> All in all, I also eager to enjoy the
> > >>>>>> more
> > >>>>>>>>>>>>> interactive
> > >>>>>>>>>>>>>>>>>>>> Table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> API,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> case
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of a general and flexible enough
> > >>>>> service
> > >>>>>>>>>>> mechanism.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xingcan
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Nov 22, 2018, at 10:16 AM,
> Xiaowei
> > >>>>>>>> Jiang <
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> xiaoweij@xxxxxxxxx>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Relying on a callback for the temp
> > >>>>>> table
> > >>>>>>>> for
> > >>>>>>>>>>> clean
> > >>>>>>>>>>>>> up
> > >>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> very
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reliable.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> There is no guarantee that it will
> be
> > >>>>>>>>> executed
> > >>>>>>>>>>>>>>>>>>>>> successfully.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> We
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> may
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> risk
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> leaks when that happens. I think
> that
> > >>>>>>> it's
> > >>>>>>>>>> safer
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> association
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between temp table and session id.
> So
> > >>>>>> we
> > >>>>>>>> can
> > >>>>>>>>>>> always
> > >>>>>>>>>>>>>>>>>>> clean
> > >>>>>>>>>>>>>>>>>>>>> up
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> temp
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tables
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which are no longer associated with
> > >>>>> any
> > >>>>>>>>> active
> > >>>>>>>>>>>>>>>>>>> sessions.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xiaowei
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 22, 2018 at 12:55 PM
> > >>>>>> jincheng
> > >>>>>>>>> sun <
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie&Shaoxuan,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for initiating this great
> > >>>>>>> proposal!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Interactive Programming is very
> > >>>>> useful
> > >>>>>>> and
> > >>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>> friendly
> > >>>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> case
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> examples.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Moreover, especially when a
> business
> > >>>>>> has
> > >>>>>>>> to
> > >>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>> executed
> > >>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> several
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stages
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with dependencies,such as the
> > >>>>> pipeline
> > >>>>>>> of
> > >>>>>>>>>> Flink
> > >>>>>>>>>>>>> ML,
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>> order
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> utilize
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> intermediate calculation results we
> > >>>>>> have
> > >>>>>>>> to
> > >>>>>>>>>>>>> submit a
> > >>>>>>>>>>>>>>>>>>> job
> > >>>>>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> env.execute().
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the `cache()`  , I think is
> > >>>>>> better
> > >>>>>>>> to
> > >>>>>>>>>>> named
> > >>>>>>>>>>>>>>>>>>>>>>>>>> `persist()`,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> And
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink framework determines whether
> > >>>>> we
> > >>>>>>>>>> internally
> > >>>>>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> memory
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> persist
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to the storage system,Maybe save
> the
> > >>>>>>> data
> > >>>>>>>>> into
> > >>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>>>>> backend
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (MemoryStateBackend or
> > >>>>>>> RocksDBStateBackend
> > >>>>>>>>>> etc.)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BTW, from the points of my view in
> > >>>>> the
> > >>>>>>>>> future,
> > >>>>>>>>>>>>>>>> support
> > >>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> batch mode switching in the same
> job
> > >>>>>>> will
> > >>>>>>>>> also
> > >>>>>>>>>>>>>>>> benefit
> > >>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "Interactive
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Programming",  I am looking forward
> > >>>>> to
> > >>>>>>>> your
> > >>>>>>>>>>> JIRAs
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>> FLIP!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx>
> > >>>>>>>>>>> 于2018年11月20日周二
> > >>>>>>>>>>>>>>>>>>>>> 下午9:56写道:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As a few recent email threads have
> > >>>>>>>> pointed
> > >>>>>>>>>> out,
> > >>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>> is a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> promising
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> opportunity to enhance Flink Table
> > >>>>>> API
> > >>>>>>> in
> > >>>>>>>>>>> various
> > >>>>>>>>>>>>>>>>>>>>> aspects,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> including
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> functionality and ease of use
> among
> > >>>>>>>> others.
> > >>>>>>>>>> One
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> scenarios
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> where
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feel Flink could improve is
> > >>>>>> interactive
> > >>>>>>>>>>>>>>>> programming.
> > >>>>>>>>>>>>>>>>>>> To
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> explain
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> issues
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and facilitate the discussion on
> > >>>>> the
> > >>>>>>>>>> solution,
> > >>>>>>>>>>> we
> > >>>>>>>>>>>>>>>> put
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> together
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> following document with our
> > >>>>> proposal.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Feedback and comments are very
> > >>>>>> welcome!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>
> > >>>
> >
> >
>