Re: [DISCUSS] Support Interactive Programming in Flink Table API


It's true that b, c, d and e will all read from the original DAG that
generates a. But all subsequent operators (when running multiple queries)
which reference cachedTableA should not need to reproduce `a` but can
directly consume the intermediate result.

Conceptually, one could think of cache() as introducing a caching operator
from which you need to consume if you want to benefit from the caching
functionality.
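To make that distinction concrete, here is a hypothetical sketch in plain Java (not Flink code; `scanSource`, the Supplier handles and the counts are illustrative assumptions only) of how consuming the CachedTable handle would differ from consuming the original table:

```java
import java.util.function.Supplier;

// Hypothetical sketch, NOT Flink code: `scanSource` stands in for
// re-running the DAG that produces table `a`; the Suppliers stand in
// for the Table and CachedTable handles.
public class CacheSketch {
    static int executions = 0;

    // Simulates re-executing the DAG that generates `a`.
    static int scanSource() {
        executions++;
        return 42;
    }

    public static void main(String[] args) {
        Supplier<Integer> a = CacheSketch::scanSource; // original table: always re-executes

        // cache(): run the DAG once and keep the intermediate result around
        Integer intermediate = a.get();
        Supplier<Integer> cachedTableA = () -> intermediate; // CachedTable handle

        int d = cachedTableA.get(); // consumes the cached result, no re-execution
        int e = a.get();            // bypasses the cache, re-executes the DAG

        System.out.println(executions); // 2: one run to fill the cache, one for `e`
    }
}
```

With a `void cache()` instead, whether `d` and `e` hit the cache would be decided implicitly, which is exactly the ambiguity the explicit handle avoids.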

I agree that, ideally, the optimizer would decide which intermediate
results should be cached. But especially when executing ad-hoc queries, the
user might know better which results need to be cached, because Flink might
not see the full DAG. In that sense, I would consider the cache() method a
hint for the optimizer. Of course, in the future we might add functionality
which tries to automatically cache results (e.g. caching the latest
intermediate results until a certain amount of space is used). But this
should hopefully not conflict with `CachedTable cache()`.
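The space-bounded auto-caching idea could be approximated, as one hypothetical sketch (plain Java, not Flink; the budget, keys and values are made up), by an LRU map that evicts the least recently used intermediate result once the budget is exceeded:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, NOT Flink code: an access-ordered LinkedHashMap
// acting as a size-bounded auto-cache for intermediate results.
public class AutoCacheSketch {

    static Map<String, String> buildCache(int budget) {
        // accessOrder = true makes iteration order reflect recency of use
        return new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > budget; // evict once the budget is exceeded
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> autoCache = buildCache(2); // keep at most two results

        autoCache.put("tableA", "resultA");
        autoCache.put("tableB", "resultB");
        autoCache.get("tableA");            // touch A, so B becomes the eldest entry
        autoCache.put("tableC", "resultC"); // exceeds the budget: B is evicted

        System.out.println(autoCache.keySet()); // [tableA, tableC]
    }
}
```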

Cheers,
Till

On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <becket.qin@xxxxxxxxx> wrote:

> Hi Till,
>
> Thanks for the clarification. I am still a little confused.
>
> If cache() returns a CachedTable, the example might become:
>
> b = a.map(...)
> c = a.map(...)
>
> cachedTableA = a.cache()
> d = cachedTableA.map(...)
> e = a.map(...)
>
> In the above case, if cache() is lazily evaluated, b, c, d and e are all
> going to read from the original DAG that generates a. But with a naive
> expectation, d should be reading from the cache. This does not seem to
> solve the potential confusion you raised, right?
>
> Just to be clear, my understanding is based on the assumption that the
> tables are immutable. Therefore, after a.cache(), the *cachedTableA* and
> the original table *a* should be completely interchangeable.
>
> That said, I think a valid argument is optimization. There are indeed
> cases where reading from the original DAG could be faster than reading
> from the cache, as in the following example:
>
> a.filter(f1' > 100)
> a.cache()
> b = a.filter(f1' < 100)
>
> Ideally the optimizer should be intelligent enough to decide which way is
> faster, without user intervention. In this case, it will identify that b
> would just be an empty table, and thus skip reading from the cache
> completely. But I agree that returning a CachedTable would give the user
> control over when to use the cache, even though I still feel that letting
> the optimizer handle this is a better option in the long run.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <trohrmann@xxxxxxxxxx> wrote:
>
> > Yes you are right Becket that it still depends on the actual execution of
> > the job whether a consumer reads from a cached result or not.
> >
> > My point was actually about the properties of a (cached vs. non-cached)
> and
> > not about the execution. I would not make cache trigger the execution of
> > the job because one loses some flexibility by eagerly triggering the
> > execution.
> >
> > I tried to argue for an explicit CachedTable which is returned by the
> > cache() method like Piotr did in order to make the API more explicit.
> >
> > Cheers,
> > Till
> >
> > On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <becket.qin@xxxxxxxxx> wrote:
> >
> > > Hi Till,
> > >
> > > That is a good example. Just a minor correction: in this case, b, c
> > > and d will all consume from a non-cached a. This is because the cache
> > > will only be created on the very first job submission that generates
> > > the table to be cached.
> > >
> > > If I understand correctly, this example is about whether the .cache()
> > > method should be eagerly evaluated or lazily evaluated. In other
> > > words, if the cache() method actually triggers a job that creates the
> > > cache, there will be no such confusion. Is that right?
> > >
> > > In the example, although d will not consume from the cached Table as
> > > it appears it should, from a correctness perspective the code will
> > > still return the correct result, assuming that tables are immutable.
> > >
> > > Personally I feel it is OK because users probably won't really worry
> > > about whether the table is cached or not. And a lazy cache could
> > > avoid some unnecessary caching if a cached table is never used in the
> > > user application. But I am not opposed to eager evaluation of cache.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > > On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <trohrmann@xxxxxxxxxx>
> > > wrote:
> > >
> > > > Another argument for Piotr's point is that lazily changing
> > > > properties of a node affects all downstream consumers but does not
> > > > necessarily have to happen before these consumers are defined. From
> > > > a user's perspective this can be quite confusing:
> > > >
> > > > b = a.map(...)
> > > > c = a.map(...)
> > > >
> > > > a.cache()
> > > > d = a.map(...)
> > > >
> > > > now b, c and d will consume from a cached operator. In this case,
> > > > the user would most likely expect that only d reads from a cached
> > > > result.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <
> > piotr@xxxxxxxxxxxxxxxxx>
> > > > wrote:
> > > >
> > > > > Hey Shaoxuan and Becket,
> > > > >
> > > > > > Can you explain a bit more on what the side effects are? So
> > > > > > far my understanding is that such side effects only exist if a
> > > > > > table is mutable. Is that the case?
> > > > >
> > > > > Not only that. There are also performance implications, and
> > > > > those are another implicit side effect of using `void cache()`.
> > > > > As I wrote before, reading from the cache might not always be
> > > > > desirable, thus it can cause performance degradation and I’m fine
> > > > > with that - user's or optimiser’s choice. What I do not like is
> > > > > that this implicit side effect can manifest in a completely
> > > > > different part of the code that wasn’t touched by the user while
> > > > > he was adding the `void cache()` call somewhere else. And even if
> > > > > caching improves performance, it’s still a side effect of `void
> > > > > cache()`. Almost by definition, `void` methods have only side
> > > > > effects. As I wrote before, there are a couple of scenarios where
> > > > > this might be undesirable and/or unexpected, for example:
> > > > >
> > > > > 1.
> > > > > Table b = …;
> > > > > b.cache()
> > > > > x = b.join(…)
> > > > > y = b.count()
> > > > > // ...
> > > > > // one
> > > > > // hundred
> > > > > // lines
> > > > > // of
> > > > > // code
> > > > > // later
> > > > > z = b.filter(…).groupBy(…) // this might even be hidden in a
> > > > > // different method/file/package/dependency
> > > > >
> > > > > 2.
> > > > >
> > > > > Table b = ...
> > > > > if (some_condition) {
> > > > >   foo(b)
> > > > > }
> > > > > else {
> > > > >   bar(b)
> > > > > }
> > > > > z = b.filter(…).groupBy(…)
> > > > >
> > > > >
> > > > > void foo(Table b) {
> > > > >   b.cache()
> > > > >   // do something with b
> > > > > }
> > > > >
> > > > > In both of the above examples, `b.cache()` will implicitly
> > > > > affect `z = b.filter(…).groupBy(…)` (both the semantics of the
> > > > > program, in case of sources being mutable, and its performance),
> > > > > which might be far from obvious.
> > > > >
> > > > > On top of that, there is still this argument of mine that having a
> > > > > `MaterializedTable` or `CachedTable` handle is more flexible for us
> > for
> > > > the
> > > > > future and for the user (as a manual option to bypass cache reads).
> > > > >
> > > > > >  But Jiangjie is correct,
> > > > > > the source table in batching should be immutable. It is the
> user’s
> > > > > > responsibility to ensure it, otherwise even a regular failover
> may
> > > lead
> > > > > > to inconsistent results.
> > > > >
> > > > > Yes, I agree that’s what a perfect world / good deployment
> > > > > should be. But it often isn’t, and while I’m not trying to fix
> > > > > this (since the proper fix is to support transactions), I’m just
> > > > > trying to minimise confusion for the users that are not fully
> > > > > aware of what’s going on and operate in a less than perfect
> > > > > setup. And if something bites them after adding a `b.cache()`
> > > > > call, I want to make sure that they at least know all of the
> > > > > places that adding this line can affect.
> > > > >
> > > > > Thanks, Piotrek
> > > > >
> > > > > > On 1 Dec 2018, at 15:39, Becket Qin <becket.qin@xxxxxxxxx>
> wrote:
> > > > > >
> > > > > > Hi Piotrek,
> > > > > >
> > > > > > Thanks again for the clarification. Some more replies are
> > following.
> > > > > >
> > > > > >> But keep in mind that `.cache()` will/might not only be used
> > > > > >> in interactive programming and not only in batching.
> > > > > >
> > > > > > It is true. Actually in stream processing, cache() has the
> > > > > > same semantics as in batch processing, namely: for a table
> > > > > > created via a series of computations, save that table for
> > > > > > later reference to avoid re-running the computation logic to
> > > > > > regenerate the table. Once the application exits, drop all the
> > > > > > caches.
> > > > > > These semantics are the same for both batch and stream
> > > > > > processing. The difference is that stream applications will
> > > > > > only run once, as they are long running. And batch
> > > > > > applications may be run multiple times, hence the cache may be
> > > > > > created and dropped each time the application runs.
> > > > > > Admittedly, there will probably be some resource management
> > > > > > requirements for the streaming cached table, such as time
> > > > > > based / size based retention, to address the infinite data
> > > > > > issue. But such requirements do not change the semantics.
> > > > > > You are right that interactive programming is just one use
> > > > > > case of cache(). It is not the only use case.
> > > > > >
> > > > > >> For me the more important issue is that of not having the
> > > > > >> `void cache()` with side effects.
> > > > > >
> > > > > > This is indeed the key point. The argument around whether
> > > > > > cache() should return something already indicates that
> > > > > > cache() and materialize() address different issues.
> > > > > > Can you explain a bit more on what the side effects are? So
> > > > > > far my understanding is that such side effects only exist if
> > > > > > a table is mutable. Is that the case?
> > > > > >
> > > > > >> I don’t know, probably initially we should make CachedTable
> > > > > >> read-only. I don’t find it more confusing than the fact that
> > > > > >> the user can not write to views or materialised views in
> > > > > >> SQL, or that the user currently can not write to a Table.
> > > > > >
> > > > > > I don't think anyone should insert something into a cache.
> > > > > > By definition, the cache should only be updated when the
> > > > > > corresponding original table is updated. What I am wondering
> > > > > > about is that, given the following two facts:
> > > > > > 1. If and only if a table is mutable (with something like
> > > > > > insert()), a CachedTable may have implicit behavior.
> > > > > > 2. A CachedTable extends a Table.
> > > > > > We can come to the conclusion that a CachedTable is mutable
> > > > > > and users can insert into the CachedTable directly. This is
> > > > > > what I found confusing.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <
> > > piotr@xxxxxxxxxxxxxxxxx
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Hi all,
> > > > > >>
> > > > > >> Regarding naming `cache()` vs `materialize()`. One more
> > > > > >> explanation why `materialize()` is more natural to me is that
> > > > > >> I think of all “Table”s in the Table API as views. They
> > > > > >> behave the same way as SQL views; the only difference for me
> > > > > >> is that their life scope is short - the current session,
> > > > > >> which is limited by the different execution model. That’s why
> > > > > >> “caching” a view for me is just materialising it.
> > > > > >>
> > > > > >> However, I see and I understand your point of view. Coming
> > > > > >> from DataSet/DataStream and the generally speaking non-SQL
> > > > > >> world, `cache()` is more natural. But keep in mind that
> > > > > >> `.cache()` will/might not only be used in interactive
> > > > > >> programming and not only in batching. But naming is one
> > > > > >> issue, and not that critical to me. Especially since once we
> > > > > >> implement proper materialised views, we can always
> > > > > >> deprecate/rename `cache()` if we deem so.
> > > > > >>
> > > > > >>
> > > > > >> For me the more important issue is that of not having the
> > > > > >> `void cache()` with side effects. Exactly for the reasons
> > > > > >> that you have mentioned. True: results might be
> > > > > >> non-deterministic if the underlying source tables are
> > > > > >> changing. The problem is that `void cache()` implicitly
> > > > > >> changes the semantics of subsequent uses of the
> > > > > >> cached/materialized Table. It can cause a “wtf” moment for a
> > > > > >> user if he inserts a “b.cache()” call in some place in his
> > > > > >> code and suddenly some other random places are behaving
> > > > > >> differently. If `materialize()` or `cache()` returns a Table
> > > > > >> handle, we force the user to explicitly use the cache, which
> > > > > >> removes the “random” part from the “suddenly some other
> > > > > >> random places are behaving differently”.
> > > > > >>
> > > > > >> This argument and others that I’ve raised (greater
> > > > flexibility/allowing
> > > > > >> user to explicitly bypass the cache) are independent of
> `cache()`
> > vs
> > > > > >> `materialize()` discussion.
> > > > > >>
> > > > > >>> Does that mean one can also insert into the CachedTable? This
> > > sounds
> > > > > >> pretty confusing.
> > > > > >>
> > > > > >> I don’t know, probably initially we should make CachedTable
> > > > > >> read-only. I don’t find it more confusing than the fact that
> > > > > >> the user can not write to views or materialised views in
> > > > > >> SQL, or that the user currently can not write to a Table.
> > > > > >>
> > > > > >> Piotrek
> > > > > >>
> > > > > >>> On 30 Nov 2018, at 17:38, Xingcan Cui <xingcanc@xxxxxxxxx>
> > wrote:
> > > > > >>>
> > > > > >>> Hi all,
> > > > > >>>
> > > > > >>> I agree with @Becket that `cache()` and `materialize()`
> > > > > >>> should be considered as two different methods, where the
> > > > > >>> latter is more sophisticated.
> > > > > >>>
> > > > > >>> According to my understanding, the initial idea is just to
> > > > > >>> introduce a simple cache or persist mechanism, but as the
> > > > > >>> Table API is a high-level API, it’s natural for us to think
> > > > > >>> in a SQL way.
> > > > > >>>
> > > > > >>> Maybe we can add the `cache()` method to the DataSet API
> > > > > >>> and force users to translate a Table to a DataSet before
> > > > > >>> caching it. Then the users should manually register the
> > > > > >>> cached dataset as a table again (we may need some table
> > > > > >>> replacement mechanisms for datasets with an identical
> > > > > >>> schema but different contents here). After all, it’s the
> > > > > >>> dataset rather than the dynamic table that needs to be
> > > > > >>> cached, right?
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Xingcan
> > > > > >>>
> > > > > >>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <
> becket.qin@xxxxxxxxx>
> > > > > wrote:
> > > > > >>>>
> > > > > >>>> Hi Piotrek and Jark,
> > > > > >>>>
> > > > > >>>> Thanks for the feedback and explanation. Those are good
> > arguments.
> > > > > But I
> > > > > >>>> think those arguments are mostly about materialized view. Let
> me
> > > try
> > > > > to
> > > > > >>>> explain the reason I believe cache() and materialize() are
> > > > different.
> > > > > >>>>
> > > > > >>>> I think cache() and materialize() have quite different
> > > implications.
> > > > > An
> > > > > >>>> analogy I can think of is save()/publish(). When users call
> > > cache(),
> > > > > it
> > > > > >> is
> > > > > >>>> just like they are saving an intermediate result as a draft of
> > > their
> > > > > >> work,
> > > > > >>>> this intermediate result may not have any realistic meaning.
> > > Calling
> > > > > >>>> cache() does not mean users want to publish the cached table
> in
> > > any
> > > > > >> manner.
> > > > > >>>> But when users call materialize(), that means "I have
> something
> > > > > >> meaningful
> > > > > >>>> to be reused by others", now users need to think about the
> > > > validation,
> > > > > >>>> update & versioning, lifecycle of the result, etc.
> > > > > >>>>
> > > > > >>>> Piotrek's suggestions on variations of the materialize()
> methods
> > > are
> > > > > >> very
> > > > > >>>> useful. It would be great if Flink have them. The concept of
> > > > > >> materialized
> > > > > >>>> view is actually a pretty big feature, not to say the related
> > > stuff
> > > > > like
> > > > > >>>> triggers/hooks you mentioned earlier. I think the materialized
> > > view
> > > > > >> itself
> > > > > >>>> should be discussed in a more thorough and systematic manner.
> > And
> > > I
> > > > > >> found
> > > > > >>>> that discussion is kind of orthogonal and way beyond
> interactive
> > > > > >>>> programming experience.
> > > > > >>>>
> > > > > >>>> The example you gave was interesting. I still have some
> > questions,
> > > > > >> though.
> > > > > >>>>
> > > > > >>>> Table source = … // some source that scans files from a
> > directory
> > > > > >>>>> “/foo/bar/“
> > > > > >>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> > > > > >>>>> Table t2 = t1.materialize() // (or `cache()`)
> > > > > >>>>
> > > > > >>>> t2.count() // initialise cache (if it’s lazily initialised)
> > > > > >>>>> int a1 = t1.count()
> > > > > >>>>> int b1 = t2.count()
> > > > > >>>>> // something in the background (or we trigger it) writes new
> > > files
> > > > to
> > > > > >>>>> /foo/bar
> > > > > >>>>> int a2 = t1.count()
> > > > > >>>>> int b2 = t2.count()
> > > > > >>>>> t2.refresh() // possible future extension, not to be
> > implemented
> > > in
> > > > > the
> > > > > >>>>> initial version
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>> what if someone else added some more files to /foo/bar at
> > > > > >>>> this point? In that case, a3 won't equal b3, and the result
> > > > > >>>> becomes non-deterministic, right?
> > > > > >>>>
> > > > > >>>> int a3 = t1.count()
> > > > > >>>>> int b3 = t2.count()
> > > > > >>>>> t2.drop() // another possible future extension, manual
> “cache”
> > > > > dropping
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> When we talk about interactive programming, in most cases,
> > > > > >>>> we are talking about batch applications. A fundamental
> > > > > >>>> assumption of such cases is that the source data is
> > > > > >>>> complete before the data processing begins, and the data
> > > > > >>>> will not change during the data processing. IMO, if
> > > > > >>>> additional rows need to be added to some source during the
> > > > > >>>> processing, it should be done in ways like unioning the
> > > > > >>>> source with another table containing the rows to be added.
> > > > > >>>>
> > > > > >>>> There are a few cases where computations are executed
> > > > > >>>> repeatedly on a changing data source.
> > > > > >>>>
> > > > > >>>> For example, people may run a ML training job every hour with
> > the
> > > > > >> samples
> > > > > >>>> newly added in the past hour. In that case, the source
> > > > > >>>> data between runs will indeed change. But still, the data
> > > > > >>>> remains unchanged within one run.
> > > > > >>>> And usually in that case, the result will need versioning,
> > > > > >>>> i.e. for a given result, it indicates that the result was
> > > > > >>>> derived from the source data as of a certain timestamp.
> > > > > >>>>
> > > > > >>>> Another example is something like a data warehouse. In
> > > > > >>>> this case, there are a few sources of original/raw data.
> > > > > >>>> On top of those sources, many materialized views / queries
> > > > > >>>> / reports / dashboards can be created to generate derived
> > > > > >>>> data. That derived data needs to be updated when the
> > > > > >>>> underlying original data changes. In that case, the
> > > > > >>>> processing logic that derives data from the original data
> > > > > >>>> needs to be executed repeatedly to update those
> > > > > >>>> reports/views. Again, all that derived data also needs to
> > > > > >>>> have version management, such as a timestamp.
> > > > > >>>>
> > > > > >>>> In any of the above two cases, during a single run of the
> > > processing
> > > > > >> logic,
> > > > > >>>> the data cannot change. Otherwise the behavior of the
> processing
> > > > logic
> > > > > >> may
> > > > > >>>> be undefined. In the above two examples, when writing the
> > > processing
> > > > > >> logic,
> > > > > >>>> Users can use .cache() to hint Flink that those results should
> > be
> > > > > saved
> > > > > >> to
> > > > > >>>> avoid repeated computation. And then for the result of my
> > > > application
> > > > > >>>> logic, I'll call materialize(), so that these results could be
> > > > managed
> > > > > >> by
> > > > > >>>> the system with versioning, metadata management, lifecycle
> > > > management,
> > > > > >>>> ACLs, etc.
> > > > > >>>>
> > > > > >>>> It is true we can use materialize() to do the cache()
> > > > > >>>> job, but I am really reluctant to shoehorn cache() into
> > > > > >>>> materialize() and force users to worry about a bunch of
> > > > > >>>> implications that they shouldn't have to. I am absolutely
> > > > > >>>> on your side that a redundant API is bad. But it is
> > > > > >>>> equally frustrating, if not more so, that the same API
> > > > > >>>> does different things.
> > > > > >>>>
> > > > > >>>> Thanks,
> > > > > >>>>
> > > > > >>>> Jiangjie (Becket) Qin
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <
> > > wshaoxuan@xxxxxxxxx
> > > > >
> > > > > >> wrote:
> > > > > >>>>
> > > > > >>>>> Thanks Piotrek,
> > > > > >>>>> You provided a very good example, it explains all the
> > confusions
> > > I
> > > > > >> have.
> > > > > >>>>> It is clear that there is something we have not considered in
> > the
> > > > > >> initial
> > > > > >>>>> proposal. We intend to force the user to reuse the
> > > > > cached/materialized
> > > > > >>>>> table, if its cache() method is executed. We did not
> > > > > >>>>> expect that the user may want to re-execute the plan from
> > > > > >>>>> the source table. Let me re-think about it and get back
> > > > > >>>>> to you later.
> > > > > >>>>>
> > > > > >>>>> In the meanwhile, this example/observation also implies
> > > > > >>>>> that we cannot fully involve the optimizer in deciding
> > > > > >>>>> the plan if a cache/materialize is explicitly used,
> > > > > >>>>> because whether to reuse the cached data or re-execute
> > > > > >>>>> the query from the source data may lead to different
> > > > > >>>>> results. (But I guess the optimizer can still help in
> > > > > >>>>> some cases: as long as it does not re-execute from the
> > > > > >>>>> varied source, we should be safe.)
> > > > > >>>>>
> > > > > >>>>> Regards,
> > > > > >>>>> Shaoxuan
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski <
> > > > > >> piotr@xxxxxxxxxxxxxxxxx>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi Shaoxuan,
> > > > > >>>>>>
> > > > > >>>>>> Re 2:
> > > > > >>>>>>
> > > > > >>>>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is modified
> > > to->
> > > > > t1’
> > > > > >>>>>>
> > > > > >>>>>> What do you mean that “ t1 is modified to-> t1’ ” ? That
> > > > > >>>>>> `methodThatAppliesOperators()` method has changed its plan?
> > > > > >>>>>>
> > > > > >>>>>> I was thinking more about something like this:
> > > > > >>>>>>
> > > > > >>>>>> Table source = … // some source that scans files from a
> > > directory
> > > > > >>>>>> “/foo/bar/“
> > > > > >>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
> > > > > >>>>>> Table t2 = t1.materialize() // (or `cache()`)
> > > > > >>>>>>
> > > > > >>>>>> t2.count() // initialise cache (if it’s lazily initialised)
> > > > > >>>>>>
> > > > > >>>>>> int a1 = t1.count()
> > > > > >>>>>> int b1 = t2.count()
> > > > > >>>>>>
> > > > > >>>>>> // something in the background (or we trigger it) writes new
> > > files
> > > > > to
> > > > > >>>>>> /foo/bar
> > > > > >>>>>>
> > > > > >>>>>> int a2 = t1.count()
> > > > > >>>>>> int b2 = t2.count()
> > > > > >>>>>>
> > > > > >>>>>> t2.refresh() // possible future extension, not to be
> > implemented
> > > > in
> > > > > >> the
> > > > > >>>>>> initial version
> > > > > >>>>>>
> > > > > >>>>>> int a3 = t1.count()
> > > > > >>>>>> int b3 = t2.count()
> > > > > >>>>>>
> > > > > >>>>>> t2.drop() // another possible future extension, manual
> “cache”
> > > > > >> dropping
> > > > > >>>>>>
> > > > > >>>>>> assertTrue(a1 == b1) // same results, but b1 comes from the
> > > > “cache"
> > > > > >>>>>> assertTrue(b1 == b2) // both values come from the same cache
> > > > > >>>>>> assertTrue(a2 > b2) // b2 comes from cache, a2 re-executed
> > full
> > > > > table
> > > > > >>>>> scan
> > > > > >>>>>> and has more data
> > > > > >>>>>> assertTrue(b3 > b2) // b3 comes from refreshed cache
> > > > > >>>>>> assertTrue(b3 == a2 == a3)
> > > > > >>>>>>
> > > > > >>>>>> Piotrek
> > > > > >>>>>>
> > > > > >>>>>>> On 30 Nov 2018, at 10:22, Jark Wu <imjark@xxxxxxxxx>
> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>> Hi,
> > > > > >>>>>>>
> > > > > >>>>>>> It is a very interesting and useful design!
> > > > > >>>>>>>
> > > > > >>>>>>> Here I want to share some of my thoughts:
> > > > > >>>>>>>
> > > > > >>>>>>> 1. Agree that the cache() method should return some
> > > > > >>>>>>> Table to avoid unexpected problems caused by mutable
> > > > > >>>>>>> objects. All the existing methods of Table return a new
> > > > > >>>>>>> Table instance.
> > > > > >>>>>>>
> > > > > >>>>>>> 2. I think materialize() would be more consistent with
> > > > > >>>>>>> SQL; this makes it possible to support the same feature
> > > > > >>>>>>> for SQL (materialized views) and keep the same API for
> > > > > >>>>>>> users in the future.
> > > > > >>>>>>> But I'm also fine if we choose cache().
> > > > > >>>>>>>
> > > > > >>>>>>> 3. In the proposal, a TableService (or FlinkService?) is
> used
> > > to
> > > > > >> cache
> > > > > >>>>>> the
> > > > > >>>>>>> result of the (intermediate) table.
> > > > > >>>>>>> But the name TableService may be a bit too general and
> > > > > >>>>>>> not easy to understand correctly at first glance (a
> > > > > >>>>>>> metastore for tables?). Maybe a more specific name would
> > > > > >>>>>>> be better, such as TableCacheService or
> > > > > >>>>>>> TableMaterializeService or something else.
> > > > > >>>>>>>
> > > > > >>>>>>> Best,
> > > > > >>>>>>> Jark
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <
> > fhueske@xxxxxxxxx
> > > >
> > > > > >> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>>> Hi,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks for the clarification Becket!
> > > > > >>>>>>>>
> > > > > >>>>>>>> I have a few thoughts to share / questions:
> > > > > >>>>>>>>
> > > > > >>>>>>>> 1) I'd like to know how you plan to implement the feature
> > on a
> > > > > plan
> > > > > >> /
> > > > > >>>>>>>> planner level.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I would imagine the following to happen when
> > > > > >>>>>>>> Table.cache() is called:
> > > > > >>>>>>>>
> > > > > >>>>>>>> 1) immediately optimize the Table and internally convert
> it
> > > > into a
> > > > > >>>>>>>> DataSet/DataStream. This is necessary, to avoid that
> > operators
> > > > of
> > > > > >>>>> later
> > > > > >>>>>>>> queries on top of the Table are pushed down.
> > > > > >>>>>>>> 2) register the DataSet/DataStream as a
> > > > DataSet/DataStream-backed
> > > > > >>>>> Table
> > > > > >>>>>> X
> > > > > >>>>>>>> 3) add a sink to the DataSet/DataStream. This is the
> > > > > materialization
> > > > > >>>>> of
> > > > > >>>>>> the
> > > > > >>>>>>>> Table X
> > > > > >>>>>>>>
> > > > > >>>>>>>> Based on your proposal the following would happen:
> > > > > >>>>>>>>
> > > > > >>>>>>>> Table t1 = ....
> > > > > >>>>>>>> t1.cache(); // cache() returns void. The logical plan of
> t1
> > is
> > > > > >>>>> replaced
> > > > > >>>>>> by
> > > > > >>>>>>>> a scan of X. There is also a reference to the
> > materialization
> > > of
> > > > > X.
> > > > > >>>>>>>>
> > > > > >>>>>>>> t1.count(); // this executes the program, including the
> > > > > >>>>>> DataSet/DataStream
> > > > > >>>>>>>> that backs X and the sink that writes the materialization
> > of X
> > > > > >>>>>>>> t1.count(); // this executes the program, but reads X from
> > the
> > > > > >>>>>>>> materialization.
> > > > > >>>>>>>>
> > > > > >>>>>>>> My question is, how do you determine whether the scan
> > > > > >>>>>>>> of t1 should go against the DataSet/DataStream program
> > > > > >>>>>>>> and when against the materialization?
> > > > > >>>>>>>> AFAIK, there is no hook that will tell you that a part of
> > the
> > > > > >> program
> > > > > >>>>>> was
> > > > > >>>>>>>> executed. Flipping a switch during optimization or plan
> > > > generation
> > > > > >> is
> > > > > >>>>>> not
> > > > > >>>>>>>> sufficient as there is no guarantee that the plan is also
> > > > > executed.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Overall, this behavior is somewhat similar to what I
> > proposed
> > > in
> > > > > >>>>>>>> FLINK-8950, which does not include persisting the table,
> but
> > > > just
> > > > > >>>>>>>> optimizing and reregistering it as DataSet/DataStream
> scan.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 2) I think Piotr has a point about the implicit behavior
> and
> > > > side
> > > > > >>>>>> effects
> > > > > >>>>>>>> of the cache() method if it does not return anything.
> > > > > >>>>>>>> Consider the following example:
> > > > > >>>>>>>>
> > > > > >>>>>>>> Table t1 = ???
> > > > > >>>>>>>> Table t2 = methodThatAppliesOperators(t1);
> > > > > >>>>>>>> Table t3 = methodThatAppliesOtherOperators(t1);
> > > > > >>>>>>>>
> > > > > >>>>>>>> In this case, the behavior/performance of the plan that
> > > results
> > > > > from
> > > > > >>>>> the
> > > > > >>>>>>>> second method call depends on whether t1 was modified by
> the
> > > > first
> > > > > >>>>>> method
> > > > > >>>>>>>> or not.
> > > > > >>>>>>>> This is the classic issue of mutable vs. immutable
> objects.
> > > > > >>>>>>>> Also, as Piotr pointed out, it might also be good to have
> > the
> > > > > >> original
> > > > > >>>>>> plan
> > > > > >>>>>>>> of t1, because in some cases it is possible to push
> filters
> > > down
> > > > > >> such
> > > > > >>>>>> that
> > > > > >>>>>>>> evaluating the query from scratch might be more efficient
> > than
> > > > > >>>>> accessing
> > > > > >>>>>>>> the cache.
> > > > > >>>>>>>> Moreover, a CachedTable could extend Table and offer a
> > > > > >>>>>>>> refresh() method.
> > > > > >>>>>>>> This sounds quite useful in an interactive session mode.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 3) Regarding the name, I can see both arguments. IMO,
> > > > > materialize()
> > > > > >>>>>> seems
> > > > > >>>>>>>> to be more future proof.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Best, Fabian
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Thu, Nov 29, 2018 at 12:56 PM Shaoxuan Wang <
> > > > > >>>>>>>> wshaoxuan@xxxxxxxxx>:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Hi Piotr,
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Thanks for sharing your ideas on the method naming. We
> will
> > > > think
> > > > > >>>>> about
> > > > > >>>>>>>>> your suggestions. But I don't understand why we need to
> > > change
> > > > > the
> > > > > >>>>>> return
> > > > > >>>>>>>>> type of cache().
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Cache() is a physical operation; it does not change the
> > > > > >>>>>>>>> logic of the `Table`. On the Table API layer, we should not
> > > > > >>>>>>>>> introduce a new table type unless the logic of the table has
> > > > > >>>>>>>>> been changed. If we introduce a new table type `CachedTable`,
> > > > > >>>>>>>>> we need to create the same set of methods of `Table` for it.
> > > > > >>>>>>>>> I don't think it is worth doing this. Or can you please
> > > > > >>>>>>>>> elaborate more on what the "implicit behaviours/side effects"
> > > > > >>>>>>>>> you are thinking about could be?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Regards,
> > > > > >>>>>>>>> Shaoxuan
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski <
> > > > > >>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > > > > >>>>>>>>> wrote:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> Hi Becket,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks for the response.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 1. I wasn’t saying that materialised view must be
> mutable
> > or
> > > > > not.
> > > > > >>>>> The
> > > > > >>>>>>>>> same
> > > > > >>>>>>>>>> thing applies to caches as well. To the contrary, I
> would
> > > > expect
> > > > > >>>>> more
> > > > > >>>>>>>>>> consistency and updates from something that is called
> > > “cache”
> > > > vs
> > > > > >>>>>>>>> something
> > > > > >>>>>>>>>> that’s a “materialised view”. In other words, IMO most
> > > caches
> > > > do
> > > > > >> not
> > > > > >>>>>>>>> serve
> > > > > >>>>>>>>>> you invalid/outdated data and they handle updates on
> their
> > > > own.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 2. I don’t think that having in the future two very
> > similar
> > > > > >> concepts
> > > > > >>>>>> of
> > > > > >>>>>>>>>> `materialized` view and `cache` is a good idea. It would
> > be
> > > > > >>>>> confusing
> > > > > >>>>>>>> for
> > > > > >>>>>>>>>> the users. I think it could be handled by
> > > > variations/overloading
> > > > > >> of
> > > > > >>>>>>>>>> materialised view concept. We could start with:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> `MaterializedTable materialize()` - immutable, session
> > life
> > > > > scope
> > > > > >>>>>>>>>> (basically the same semantics as you are proposing).
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> And then in the future (if ever) build on top of
> > that/expand
> > > > it
> > > > > >>>>> with:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> `MaterializedTable materialize(refreshTime=…)` or
> > > > > >> `MaterializedTable
> > > > > >>>>>>>>>> materialize(refreshHook=…)`
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Or with cross session support:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> `MaterializedTable materializeInto(connector=…)` or
> > > > > >>>>> `MaterializedTable
> > > > > >>>>>>>>>> materializeInto(tableFactory=…)`
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I’m not saying that we should implement cross
> > > > session/refreshing
> > > > > >> now
> > > > > >>>>>> or
> > > > > >>>>>>>>>> even in the near future. I'm just arguing that naming the
> > > > > >>>>>>>>>> current immutable, session-scoped method `materialize()` is
> > > > > >>>>>>>>>> more future proof and more consistent with SQL (on which,
> > > > > >>>>>>>>>> after all, the Table API is heavily based).
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 3. Even if we agree on naming it `cache()`, I would
> still
> > > > insist
> > > > > >> on
> > > > > >>>>>>>>>> `cache()` returning `CachedTable` handle to avoid
> implicit
> > > > > >>>>>>>>> behaviours/side
> > > > > >>>>>>>>>> effects and to give both us & users more flexibility.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Piotrek
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin <
> > becket.qin@xxxxxxxxx
> > > >
> > > > > >> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Just to add a little bit, the materialized view is
> > probably
> > > > > more
> > > > > >>>>>>>>> similar
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>>> the persistent() brought up earlier in the thread. So
> it
> > is
> > > > > >> usually
> > > > > >>>>>>>>> cross
> > > > > >>>>>>>>>>> session and could be used in a larger scope. For
> > example, a
> > > > > >>>>>>>>> materialized
> > > > > >>>>>>>>>>> view created by user A may be visible to user B. It is
> > > > probably
> > > > > >>>>>>>>> something
> > > > > >>>>>>>>>>> we want to have in the future. I'll put it in the
> future
> > > work
> > > > > >>>>>>>> section.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Jiangjie (Becket) Qin
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin <
> > > > > becket.qin@xxxxxxxxx
> > > > > >>>
> > > > > >>>>>>>>> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> Hi Piotrek,
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Thanks for the explanation.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Right now we are mostly thinking of the cached table
> as
> > > > > >>>>> immutable. I
> > > > > >>>>>>>>> can
> > > > > >>>>>>>>>>>> see the Materialized view would be useful in the
> future.
> > > > That
> > > > > >>>>> said,
> > > > > >>>>>>>> I
> > > > > >>>>>>>>>> think
> > > > > >>>>>>>>>>>> a simple cache mechanism is probably still needed. So
> to
> > > me,
> > > > > >>>>> cache()
> > > > > >>>>>>>>> and
> > > > > >>>>>>>>>>>> materialize() should be two separate method as they
> > > address
> > > > > >>>>>>>> different
> > > > > >>>>>>>>>>>> needs. Materialize() is a higher level concept usually
> > > > > implying
> > > > > >>>>>>>>>> periodical
> > > > > >>>>>>>>>>>> update, while cache() has much simpler semantic. For
> > > > example,
> > > > > >> one
> > > > > >>>>>>>> may
> > > > > >>>>>>>>>>>> create a materialized view and use cache() method in
> the
> > > > > >>>>>>>> materialized
> > > > > >>>>>>>>>> view
> > > > > >>>>>>>>>>>> creation logic. So that during the materialized view
> > > update,
> > > > > >> they
> > > > > >>>>> do
> > > > > >>>>>>>>> not
> > > > > >>>>>>>>>>>> need to worry about the case that the cached table is
> > also
> > > > > >>>>> changed.
> > > > > >>>>>>>>>> Maybe
> > > > > >>>>>>>>>>>> under the hood, materialized() and cache() could share
> > > some
> > > > > >>>>>>>> mechanism,
> > > > > >>>>>>>>>> but
> > > > > >>>>>>>>>>>> I think a simple cache() method would be handy in a
> lot
> > of
> > > > > >> cases.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Jiangjie (Becket) Qin
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski <
> > > > > >>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Hi Becket,
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Is there any extra thing user can do on a
> > > > MaterializedTable
> > > > > >> that
> > > > > >>>>>>>>> they
> > > > > >>>>>>>>>>>>> cannot do on a Table?
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Maybe not in the initial implementation, but various
> > DBs
> > > > > offer
> > > > > >>>>>>>>>> different
> > > > > >>>>>>>>>>>>> ways to “refresh” the materialised view. Hooks,
> > triggers,
> > > > > >> timers,
> > > > > >>>>>>>>>> manually
> > > > > >>>>>>>>>>>>> etc. Having `MaterializedTable` would help us to
> handle
> > > > that
> > > > > in
> > > > > >>>>> the
> > > > > >>>>>>>>>> future.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> After users call *table.cache(), *users can just use
> > > that
> > > > > >> table
> > > > > >>>>>>>> and
> > > > > >>>>>>>>> do
> > > > > >>>>>>>>>>>>> anything that is supported on a Table, including SQL.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> This is some implicit behaviour with side effects.
> > > Imagine
> > > > if
> > > > > >>>>> user
> > > > > >>>>>>>>> has
> > > > > >>>>>>>>>> a
> > > > > >>>>>>>>>>>>> long and complicated program, that touches table `b`
> > > > multiple
> > > > > >>>>>>>> times,
> > > > > >>>>>>>>>> maybe
> > > > > >>>>>>>>>>>>> scattered around different methods. If he modifies
> his
> > > > > program
> > > > > >> by
> > > > > >>>>>>>>>> inserting
> > > > > >>>>>>>>>>>>> in one place
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> b.cache()
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> This implicitly alters the semantic and behaviour of
> > his
> > > > code
> > > > > >> all
> > > > > >>>>>>>>> over
> > > > > >>>>>>>>>>>>> the place, maybe in a ways that might cause problems.
> > For
> > > > > >> example
> > > > > >>>>>>>>> what
> > > > > >>>>>>>>>> if
> > > > > >>>>>>>>>>>>> underlying data is changing?
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Having invisible side effects is also not very clean,
> > for
> > > > > >> example
> > > > > >>>>>>>>> think
> > > > > >>>>>>>>>>>>> about something like this (but more complicated):
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Table b = ...;
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> If (some_condition) {
> > > > > >>>>>>>>>>>>> processTable1(b)
> > > > > >>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>> else {
> > > > > >>>>>>>>>>>>> processTable2(b)
> > > > > >>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> // do more stuff with b
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> And user adds `b.cache()` call to only one of the
> > > > > >> `processTable1`
> > > > > >>>>>>>> or
> > > > > >>>>>>>>>>>>> `processTable2` methods.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On the other hand
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Table materialisedB = b.materialize()
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Avoids (at least some of) the side effect issues and
> > > forces
> > > > > >> user
> > > > > >>>>> to
> > > > > >>>>>>>>>>>>> explicitly use `materialisedB` where it’s appropriate
> > and
> > > > > >> forces
> > > > > >>>>>>>> user
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>>>>> think what does it actually mean. And if something
> > > doesn’t
> > > > > work
> > > > > >>>>> in
> > > > > >>>>>>>>> the
> > > > > >>>>>>>>>> end
> > > > > >>>>>>>>>>>>> for the user, he will know what he has changed instead
> > > > > >>>>>>>>>>>>> of blaming Flink for
> > > > > >>>>>>>>>>>>> some “magic” underneath. In the above example, after
> > > > > >>>>> materialising
> > > > > >>>>>>>> b
> > > > > >>>>>>>>> in
> > > > > >>>>>>>>>>>>> only one of the methods, he should/would realise
> about
> > > the
> > > > > >> issue
> > > > > >>>>>>>> when
> > > > > >>>>>>>>>>>>> handling the return value `MaterializedTable` of that
> > > > method.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> I guess it comes down to personal preferences if you
> > like
> > > > > >> things
> > > > > >>>>> to
> > > > > >>>>>>>>> be
> > > > > >>>>>>>>>>>>> implicit or not. The more powerful the user, the more
> > > > > >>>>>>>>>>>>> likely he is to like/understand implicit behaviour. And
> > > > > >>>>>>>>>>>>> we as Table API designers are among the biggest power
> > > > > >>>>>>>>>>>>> users out there, so I would proceed with caution (so that
> > > > > >>>>>>>>>>>>> we do not end up in the crazy Perl realm with its lovely
> > > > > >>>>>>>>>>>>> implicit method arguments ;)
> > > > > >>>>>>>>>>>>> <https://stackoverflow.com/a/14922656/8149051>)
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Table API to also support non-relational processing
> > > cases,
> > > > > >>>>> cache()
> > > > > >>>>>>>>>>>>> might be slightly better.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> I think even such extended Table API could benefit
> from
> > > > > >> sticking
> > > > > >>>>>>>>>> to/being
> > > > > >>>>>>>>>>>>> consistent with SQL where both SQL and Table API are
> > > > > basically
> > > > > >>>>> the
> > > > > >>>>>>>>>> same.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> One more thing. `MaterializedTable materialize()`
> could
> > > be
> > > > > more
> > > > > >>>>>>>>>>>>> powerful/flexible allowing the user to operate both
> on
> > > > > >>>>> materialised
> > > > > >>>>>>>>>> and not
> > > > > >>>>>>>>>>>>> materialised view at the same time for whatever
> reasons
> > > > > >>>>> (underlying
> > > > > >>>>>>>>>> data
> > > > > >>>>>>>>>>>>> changing/better optimisation opportunities after
> > pushing
> > > > down
> > > > > >>>>> more
> > > > > >>>>>>>>>> filters
> > > > > >>>>>>>>>>>>> etc). For example:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Table b = …;
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> MaterializedTable mb = b.materialize();
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> val min = mb.min();
> > > > > >>>>>>>>>>>>> val max = mb.max();
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> val user42 = b.filter('userId === 42);
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Could be more efficient compared to `b.cache()` if
> > > > > >>>>>>>>>>>>> `filter('userId === 42)` allows for much more aggressive
> > > > > >>>>>>>>>>>>> optimisations.
> > > > > >>>>>>>>>>>>>
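A back-of-the-envelope sketch of that optimisation argument (the numbers and cost model are made up for illustration): with both the original plan and the materialization available, a planner can compare a full scan of the cached result against re-reading the source with the filter pushed down.

```java
// Hypothetical cost comparison: a highly selective filter pushed into the
// source can touch far fewer rows than scanning the whole cached result.
public class CostSketch {

    static long costOfCacheScan(long cachedRows) {
        return cachedRows; // must read every cached row
    }

    static long costOfSourceWithPushdown(long sourceRows, double selectivity) {
        return (long) (sourceRows * selectivity); // filter applied at the source
    }

    public static void main(String[] args) {
        long cachedRows = 1_000_000L;
        long sourceRows = 10_000_000L;
        double selectivity = 0.0001; // 'userId = 42' matches very few rows

        long viaCache = costOfCacheScan(cachedRows);
        long viaSource = costOfSourceWithPushdown(sourceRows, selectivity);
        // Here recomputing from scratch beats reading the cache.
        System.out.println(viaSource < viaCache); // true
    }
}
```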
> > > > > >>>>>>>>>>>>> Piotrek
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske <
> > > > fhueske@xxxxxxxxx>
> > > > > >>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I'm not suggesting to add support for Ignite. This
> was
> > > > just
> > > > > an
> > > > > >>>>>>>>>> example.
> > > > > >>>>>>>>>>>>>> Plasma and Arrow sound interesting, too.
> > > > > >>>>>>>>>>>>>> For the sake of this proposal, it would be up to the
> > > user
> > > > to
> > > > > >>>>>>>>>> implement a
> > > > > >>>>>>>>>>>>>> TableFactory and corresponding TableSource /
> TableSink
> > > > > classes
> > > > > >>>>> to
> > > > > >>>>>>>>>>>>> persist
> > > > > >>>>>>>>>>>>>> and read the data.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 12:06 PM Flavio Pompermaier <
> > > > > >>>>>>>>>>>>>> pompermaier@xxxxxxxx>:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> What about to add also Apache Plasma + Arrow as an
> > > > > >> alternative
> > > > > >>>>> to
> > > > > >>>>>>>>>>>>> Apache
> > > > > >>>>>>>>>>>>>>> Ignite?
> > > > > >>>>>>>>>>>>>>> [1]
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>
> > > > > >>
> > > >
> > https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske <
> > > > > >>>>>>>> fhueske@xxxxxxxxx>
> > > > > >>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Hi,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Thanks for the proposal!
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> To summarize, you propose a new method
> > Table.cache():
> > > > > Table
> > > > > >>>>> that
> > > > > >>>>>>>>>> will
> > > > > >>>>>>>>>>>>>>>> trigger a job and write the result into some
> > temporary
> > > > > >> storage
> > > > > >>>>>>>> as
> > > > > >>>>>>>>>>>>> defined
> > > > > >>>>>>>>>>>>>>>> by a TableFactory.
> > > > > >>>>>>>>>>>>>>>> The cache() call blocks while the job is running
> and
> > > > > >>>>> eventually
> > > > > >>>>>>>>>>>>> returns a
> > > > > >>>>>>>>>>>>>>>> Table object that represents a scan of the
> temporary
> > > > > table.
> > > > > >>>>>>>>>>>>>>>> When the "session" is closed (closing to be
> > defined?),
> > > > the
> > > > > >>>>>>>>> temporary
> > > > > >>>>>>>>>>>>>>> tables
> > > > > >>>>>>>>>>>>>>>> are all dropped.
> > > > > >>>>>>>>>>>>>>>>
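The summarized semantics could be sketched with throwaway mock classes (none of this is real Flink API): cache() blocks, registers a temporary table via something TableFactory-like, and returns a scan of it; closing the session drops everything it registered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mock of the summarized cache() semantics: a blocking cache()
// "runs a job", registers a temporary table, and hands back a scan over it;
// closing the session drops every temp table it created.
public class SessionSketch implements AutoCloseable {
    private final List<String> tempTables = new ArrayList<>();

    String cache(String plan) {
        String name = "tmp_" + tempTables.size();
        // ... submit the job here and block until 'plan' is written to 'name',
        // e.g. through a user-provided TableFactory / TableSink ...
        tempTables.add(name);
        return "scan(" + name + ")";
    }

    int liveTempTables() {
        return tempTables.size();
    }

    @Override
    public void close() {            // "session closed": drop all temp tables
        tempTables.clear();
    }

    public static void main(String[] args) {
        try (SessionSketch session = new SessionSketch()) {
            System.out.println(session.cache("a -> groupBy -> agg")); // scan(tmp_0)
            System.out.println(session.liveTempTables());            // 1
        } // temp tables dropped here
    }
}
```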
> > > > > >>>>>>>>>>>>>>>> I think this behavior makes sense and is a good
> > first
> > > > step
> > > > > >>>>>>>> towards
> > > > > >>>>>>>>>>>>> more
> > > > > >>>>>>>>>>>>>>>> interactive workloads.
> > > > > >>>>>>>>>>>>>>>> However, its performance suffers from writing to
> and
> > > > > reading
> > > > > >>>>>>>> from
> > > > > >>>>>>>>>>>>>>> external
> > > > > >>>>>>>>>>>>>>>> systems.
> > > > > >>>>>>>>>>>>>>>> I think this is OK for now. Changes that would
> > > > > significantly
> > > > > >>>>>>>>> improve
> > > > > >>>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>> situation (i.e., pinning data in-memory across
> jobs)
> > > > would
> > > > > >>>>> have
> > > > > >>>>>>>>>> large
> > > > > >>>>>>>>>>>>>>>> impacts on many components of Flink.
> > > > > >>>>>>>>>>>>>>>> Users could use in-memory filesystems or storage
> > grids
> > > > > >> (Apache
> > > > > >>>>>>>>>>>>> Ignite) to
> > > > > >>>>>>>>>>>>>>>> mitigate some of the performance effects.
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Best, Fabian
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 3:38 AM Becket Qin <
> > > > > >>>>>>>>>>>>>>>> becket.qin@xxxxxxxxx>:
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Thanks for the explanation, Piotrek.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Is there any extra thing user can do on a
> > > > > MaterializedTable
> > > > > >>>>>>>> that
> > > > > >>>>>>>>>> they
> > > > > >>>>>>>>>>>>>>>>> cannot do on a Table? After users call
> > > *table.cache(),
> > > > > >> *users
> > > > > >>>>>>>> can
> > > > > >>>>>>>>>>>>> just
> > > > > >>>>>>>>>>>>>>>> use
> > > > > >>>>>>>>>>>>>>>>> that table and do anything that is supported on a
> > > > Table,
> > > > > >>>>>>>>> including
> > > > > >>>>>>>>>>>>> SQL.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Naming wise, either cache() or materialize()
> sounds
> > > > fine
> > > > > to
> > > > > >>>>> me.
> > > > > >>>>>>>>>>>>> cache()
> > > > > >>>>>>>>>>>>>>>> is
> > > > > >>>>>>>>>>>>>>>>> a bit more general than materialize(). Given that
> > we
> > > > are
> > > > > >>>>>>>>> enhancing
> > > > > >>>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>>> Table API to also support non-relational
> processing
> > > > > cases,
> > > > > >>>>>>>>> cache()
> > > > > >>>>>>>>>>>>>>> might
> > > > > >>>>>>>>>>>>>>>> be
> > > > > >>>>>>>>>>>>>>>>> slightly better.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski <
> > > > > >>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Hi Becket,
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Ops, sorry I didn’t notice that you intend to
> > reuse
> > > > > >> existing
> > > > > >>>>>>>>>>>>>>>>>> `TableFactory`. I don’t know why, but I assumed
> > that
> > > > you
> > > > > >>>>> want
> > > > > >>>>>>>> to
> > > > > >>>>>>>>>>>>>>>> provide
> > > > > >>>>>>>>>>>>>>>>> an
> > > > > >>>>>>>>>>>>>>>>>> alternate way of writing the data.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Now that I hopefully understand the proposal,
> > maybe
> > > we
> > > > > >> could
> > > > > >>>>>>>>>> rename
> > > > > >>>>>>>>>>>>>>>>>> `cache()` to
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> void materialize()
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> or going step further
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> MaterializedTable materialize()
> > > > > >>>>>>>>>>>>>>>>>> MaterializedTable createMaterializedView()
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> ?
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> The second option, returning a handle, is I think
> > > > > >>>>>>>>>>>>>>>>>> more flexible and
> > > > > >>>>>>>>>>>>>>>>>> could provide features such as “refresh”/“delete”
> > > > > >>>>>>>>>>>>>>>>>> or, generally speaking, manage the view. In the
> > > > > >>>>>>>>>>>>>>>>>> future we could also
> > > think
> > > > > >> about
> > > > > >>>>>>>>>> adding
> > > > > >>>>>>>>>>>>>>>> hooks
> > > > > >>>>>>>>>>>>>>>>>> to automatically refresh view etc. It is also
> more
> > > > > >> explicit
> > > > > >>>>> -
> > > > > >>>>>>>>>>>>>>>>>> materialization returning a new table handle
> will
> > > not
> > > > > have
> > > > > >>>>> the
> > > > > >>>>>>>>>> same
> > > > > >>>>>>>>>>>>>>>>>> implicit side effects as adding a simple line of
> > > code
> > > > > like
> > > > > >>>>>>>>>>>>>>> `b.cache()`
> > > > > >>>>>>>>>>>>>>>>>> would have.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> It would also be more SQL like, making it more
> > > > intuitive
> > > > > >> for
> > > > > >>>>>>>>> users
> > > > > >>>>>>>>>>>>>>>>> already
> > > > > >>>>>>>>>>>>>>>>>> familiar with the SQL.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Piotrek
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 14:53, Becket Qin <
> > > > > >> becket.qin@xxxxxxxxx
> > > > > >>>>>>
> > > > > >>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Hi Piotrek,
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> For the cache() method itself, yes, it is
> > > equivalent
> > > > to
> > > > > >>>>>>>>> creating
> > > > > >>>>>>>>>> a
> > > > > >>>>>>>>>>>>>>>>>> BUILT-IN
> > > > > >>>>>>>>>>>>>>>>>>> materialized view with a lifecycle. That
> > > > functionality
> > > > > is
> > > > > >>>>>>>>> missing
> > > > > >>>>>>>>>>>>>>>>> today,
> > > > > >>>>>>>>>>>>>>>>>>> though. Not sure if I understand your question.
> > Do
> > > > you
> > > > > >> mean
> > > > > >>>>>>>> we
> > > > > >>>>>>>>>>>>>>>> already
> > > > > >>>>>>>>>>>>>>>>>> have
> > > > > >>>>>>>>>>>>>>>>>>> the functionality and just need syntactic sugar?
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> What's more interesting in the proposal is whether
> > > > > >>>>>>>>>>>>>>>>>>> we want to stop at creating
> > > > > >>>>>>>>>>>>>>>>>>> the materialized view? Or do we want to extend
> > that
> > > > in
> > > > > >> the
> > > > > >>>>>>>>> future
> > > > > >>>>>>>>>>>>>>> to
> > > > > >>>>>>>>>>>>>>>> a
> > > > > >>>>>>>>>>>>>>>>>> more
> > > > > >>>>>>>>>>>>>>>>>>> useful unified data store distributed with
> Flink?
> > > And
> > > > > do
> > > > > >> we
> > > > > >>>>>>>>> want
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>>>>>>>>> have
> > > > > >>>>>>>>>>>>>>>>>> a
> > > > > >>>>>>>>>>>>>>>>>>> mechanism that allows more flexible user job
> > > > > >>>>>>>>>>>>>>>>>>> patterns with their own user-defined
> > > > > >>>>>>>>>>>>>>>>>>> services. These considerations are much more
> > > > > >> architectural.
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski
> <
> > > > > >>>>>>>>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
> > > > > >>>>>>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>> Hi,
> > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>> Interesting idea. I’m trying to understand the
> > > > > problem.
> > > > > >>>>>>>> Isn’t
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>>>>>> `cache()` call an equivalent of writing data
> to
> > a
> > > > sink
> > > > > >> and
> > > > > >>>>>>>>> later
> > > > > >>>>>>>>>>>>>>>>> reading
> > > > > >>>>>>>>>>>>>>>>>>>> from it? Where this sink has a limited live
> > > > scope/live
> > > > > >>>>> time?
> > > > > >>>>>>>>> And
> > > > > >>>>>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>>>> sink
> > > > > >>>>>>>>>>>>>>>>>>>> could be implemented as in memory or a file
> > sink?
> > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>> If so, what’s the problem with creating a
> > > > materialised
> > > > > >>>>> view
> > > > > >>>>>>>>>> from a
> > > > > >>>>>>>>>>>>>>>>> table
> > > > > >>>>>>>>>>>>>>>>>>>> “b” (from your document’s example) and reusing
> > > this
> > > > > >>>>>>>>> materialised
> > > > > >>>>>>>>>>>>>>>> view
> > > > > >>>>>>>>>>>>>>>>>>>> later? Maybe we are lacking mechanisms to
> clean
> > up
> > > > > >>>>>>>>> materialised
> > > > > >>>>>>>>>>>>>>>> views
> > > > > >>>>>>>>>>>>>>>>>> (for
> > > > > >>>>>>>>>>>>>>>>>>>> example when current session finishes)? Maybe
> we
> > > > need
> > > > > >> some
> > > > > >>>>>>>>>>>>>>> syntactic
> > > > > >>>>>>>>>>>>>>>>>> sugar
> > > > > >>>>>>>>>>>>>>>>>>>> on top of it?
> > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>> Piotrek
> > > > > >>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 07:21, Becket Qin <
> > > > > >>>>> becket.qin@xxxxxxxxx
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, Jincheng.
> > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>> Yes, I think it makes sense to have a
> persist()
> > > > with
> > > > > >>>>>>>>>>>>>>>>> lifecycle/defined
> > > > > >>>>>>>>>>>>>>>>>>>>> scope. I just added a section in the future
> > work
> > > > for
> > > > > >>>>> this.
> > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 1:55 PM jincheng sun
> <
> > > > > >>>>>>>>>>>>>>>>> sunjincheng121@xxxxxxxxx
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie,
> > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>> Thank you for the explanation about the name
> > of
> > > > > >>>>>>>> `cache()`, I
> > > > > >>>>>>>>>>>>>>>>>> understand
> > > > > >>>>>>>>>>>>>>>>>>>> why
> > > > > >>>>>>>>>>>>>>>>>>>>>> you designed this way!
> > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>> Another idea is whether we can specify a
> > > lifecycle
> > > > > for
> > > > > >>>>>>>> data
> > > > > >>>>>>>>>>>>>>>>>> persistence?
> > > > > >>>>>>>>>>>>>>>>>>>>>> For example, persist (LifeCycle.SESSION), so
> > > that
> > > > > the
> > > > > >>>>> user
> > > > > >>>>>>>>> is
> > > > > >>>>>>>>>>>>>>> not
> > > > > >>>>>>>>>>>>>>>>>>>> worried
> > > > > >>>>>>>>>>>>>>>>>>>>>> about data loss, and will clearly specify
> the
> > > time
> > > > > >> range
> > > > > >>>>>>>> for
> > > > > >>>>>>>>>>>>>>>> keeping
> > > > > >>>>>>>>>>>>>>>>>>>> time.
> > > > > >>>>>>>>>>>>>>>>>>>>>> At the same time, if we want to expand, we
> can
> > > > also
> > > > > >>>>> share
> > > > > >>>>>>>>> in a
> > > > > >>>>>>>>>>>>>>>>> certain
> > > > > >>>>>>>>>>>>>>>>>>>>>> group of session, for example:
> > > > > >>>>>>>>> LifeCycle.SESSION_GROUP(...), I
> > > > > >>>>>>>>>>>>>>> am
> > > > > >>>>>>>>>>>>>>>>> not
> > > > > >>>>>>>>>>>>>>>>>>>> sure,
> > > > > >>>>>>>>>>>>>>>>>>>>>> just an immature suggestion, for reference
> > only!
> > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>> Bests,
> > > > > >>>>>>>>>>>>>>>>>>>>>> Jincheng
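The lifecycle suggestion above could look roughly like this (purely illustrative names; nothing here exists in Flink):

```java
// Hypothetical sketch of persist() taking an explicit lifecycle, so the scope
// of the stored data is visible at the call site instead of being implicit.
enum LifeCycle {
    SESSION,        // dropped when this session closes
    SESSION_GROUP   // shared within a group of sessions
}

public class LifecycleSketch {

    static String persist(String table, LifeCycle scope) {
        return table + " kept until end of " + scope;
    }

    public static void main(String[] args) {
        System.out.println(persist("b", LifeCycle.SESSION));
        System.out.println(persist("b", LifeCycle.SESSION_GROUP));
    }
}
```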
> > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket.qin@xxxxxxxxx> wrote on
> > > > > >>>>>>>>>>>>>>>>>>>>>> Friday, Nov 23, 2018 at 1:33 PM:
> > > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>>> Re: Jincheng,
> > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. Regarding cache()
> > v.s.
> > > > > >>>>>>>> persist(),
> > > > > >>>>>>>>>>>>>>>>>> personally I
> > > > > >>>>>>>>>>>>>>>>>>>>>>> find cache() to be more accurately
> describing
> > > the
> > > > > >>>>>>>> behavior,
> > > > > >>>>>>>>>>>>>>> i.e.
> > > > > >>>>>>>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>>>>>>>> Table
> > > > > >>>>>>>>>>>>>>>>>>>>>>> is cached for the session, but will be
> > deleted
> > > > > after
> > > > > >>>>> the
> > > > > >>>>>>>>>>>>>>> session
> > > > > >>>>>>>>>>>>>>>> is
> > > > > >>>>>>>>>>>>>>>>>>>>>> closed.
> > > > > >>>>>>>>>>>>>>>>>>>>>>> persist() seems a little misleading as
> people
> > > > might
> > > > > >>>>> think
> > > > > >>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>> table
> > > > > >>>>>>>>>>>>>>>>>>>> will
> > > > > >>>>>>>>>>>>>>>>>>>>>>> still be there even after the session is
> > gone.
> > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>>> Great point about mixing the batch and
> stream
> > > > > >>>>> processing
> > > > > >>>>>>>> in
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>>> same
> > > > > >>>>>>>>>>>>>>>>>>>> job.
> > > > > >>>>>>>>>>>>>>>>>>>>>>> We should absolutely move towards that
> goal.
> > I
> > > > > >> imagine
> > > > > >>>>>>>> that
> > > > > >>>>>>>>>>>>>>> would
> > > > > >>>>>>>>>>>>>>>>> be
> > > > > >>>>>>>>>>>>>>>>>> a
> > > > > >>>>>>>>>>>>>>>>>>>>>> huge
> > > > > >>>>>>>>>>>>>>>>>>>>>>> change across the board, including sources,
> > > > > operators
> > > > > >>>>> and
> > > > > >>>>>>>>>>>>>>>>>>>> optimizations,
> > > > > >>>>>>>>>>>>>>>>>>>>>> to
> > > > > >>>>>>>>>>>>>>>>>>>>>>> name some. Likely we will need several
> > separate
> > > > > >>>>> in-depth
> > > > > >>>>>>>>>>>>>>>>> discussions.
> > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan
> Cui <
> > > > > >>>>>>>>>>>>>>> xingcanc@xxxxxxxxx>
> > > > > >>>>>>>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>>>>>
> > Hi all,
> >
> > @Shaoxuan, I think the lifecycle and the access domain are both
> > orthogonal to the cache problem. Essentially, this may be the first
> > time we plan to introduce a storage mechanism other than the state.
> > Maybe it's better to first draw a big picture and then concentrate on
> > a specific part?
> >
> > @Becket, yes, actually I am more concerned with the underlying
> > service. This seems to be quite a major change to the existing
> > codebase. As you claimed, the service should be extensible to support
> > other components, so we'd better discuss it in another thread.
> >
> > All in all, I am also eager to see a more interactive Table API,
> > provided the service mechanism is general and flexible enough.
> >
> > Best,
> > Xingcan
> > On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <xiaoweij@xxxxxxxxx> wrote:
> > > Relying on a callback on the temp table for cleanup is not very
> > > reliable. There is no guarantee that it will be executed
> > > successfully, and we risk leaks when that happens. I think it's
> > > safer to have an association between temp tables and session ids,
> > > so we can always clean up temp tables that are no longer associated
> > > with any active session.
> > >
> > > Regards,
> > > Xiaowei
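The session-scoped cleanup Xiaowei describes could be sketched roughly as follows: a registry associates each temp table with the session id that created it, and a janitor drops every table whose session is no longer active. All names here are hypothetical illustrations, not actual Flink APIs.

```python
# Sketch only: session-scoped temp-table cleanup. Instead of relying on
# a per-table cleanup callback that might never run, temp tables are
# registered against a session id, and a periodic sweep drops every
# table whose owning session has gone away.

class TempTableRegistry:
    def __init__(self):
        self._tables_by_session = {}   # session_id -> set of table names
        self._active_sessions = set()

    def open_session(self, session_id):
        self._active_sessions.add(session_id)
        self._tables_by_session.setdefault(session_id, set())

    def register(self, session_id, table_name):
        # Record that this temp table belongs to the given session.
        self._tables_by_session[session_id].add(table_name)

    def close_session(self, session_id):
        # No eager cleanup needed; the sweep below handles it.
        self._active_sessions.discard(session_id)

    def collect_garbage(self):
        """Drop all temp tables not owned by any active session."""
        dropped = []
        for session_id in list(self._tables_by_session):
            if session_id not in self._active_sessions:
                dropped.extend(sorted(self._tables_by_session.pop(session_id)))
        return dropped
```

The appeal of this scheme is that correctness only depends on the set of active sessions being accurate: a crashed client or a missed callback cannot leak a table forever, because the next sweep reclaims it.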
> > > On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <sunjincheng121@xxxxxxxxx> wrote:
> > > > Hi Jiangjie & Shaoxuan,
> > > >
> > > > Thanks for initiating this great proposal!
> > > >
> > > > Interactive programming is very useful and user friendly in cases
> > > > like your examples. Moreover, when a business process has to be
> > > > executed in several stages with dependencies, such as a Flink ML
> > > > pipeline, we currently have to submit a job via env.execute() in
> > > > order to utilize the intermediate calculation results.
> > > >
> > > > About `cache()`: I think it is better named `persist()`, with the
> > > > Flink framework determining whether to cache internally in memory
> > > > or persist to a storage system. Maybe the data could be saved
> > > > into a state backend (MemoryStateBackend or RocksDBStateBackend,
> > > > etc.).
> > > >
> > > > BTW, from my point of view, future support for switching between
> > > > streaming and batch mode in the same job will also benefit
> > > > "Interactive Programming". I am looking forward to your JIRAs and
> > > > FLIP!
> > > >
> > > > Best,
> > > > Jincheng
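Jincheng's `persist()` idea, where the user only marks a table as reusable and the framework chooses the storage tier, might look roughly like this sketch. The names (`persist`, the backend strings, the size/budget parameters) are purely illustrative assumptions, not actual Flink APIs.

```python
# Sketch only: a persist() call that hides the storage decision from
# the user. The framework, not the caller, decides whether the result
# stays in memory or spills to a durable backend.

class Table:
    def __init__(self, name, estimated_size_bytes):
        self.name = name
        self.estimated_size_bytes = estimated_size_bytes

    def persist(self, memory_budget_bytes=64 * 1024 * 1024):
        # Framework-side policy: small results stay in memory, large
        # ones go to a durable backend (e.g. RocksDB). The policy can
        # change later without touching any call site.
        if self.estimated_size_bytes <= memory_budget_bytes:
            backend = "MemoryStateBackend"
        else:
            backend = "RocksDBStateBackend"
        return PersistedTable(self.name, backend)

class PersistedTable:
    def __init__(self, name, backend):
        self.name = name
        self.backend = backend
```

The point of the design is that `t.persist()` stays the same at every call site, while the framework remains free to evolve the memory-versus-storage decision behind it.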
> > > > On Tue, Nov 20, 2018 at 9:56 PM, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > > > > Hi all,
> > > > >
> > > > > As a few recent email threads have pointed out, it is a
> > > > > promising opportunity to enhance the Flink Table API in various
> > > > > aspects, including functionality and ease of use, among others.
> > > > > One of the scenarios where we feel Flink could improve is
> > > > > interactive programming. To explain the issues and facilitate
> > > > > the discussion on the solution, we put together the following
> > > > > document with our proposal.
> > > > >
> > > > > https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> > > > >
> > > > > Feedback and comments are very welcome!
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin