osdir.com



Re: [DISCUSS] Support Interactive Programming in Flink Table API


What about also adding Apache Arrow's Plasma in-memory object store [1]
(Plasma + Arrow) as an alternative to Apache Ignite?
[1] https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/

On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske <fhueske@xxxxxxxxx> wrote:

> Hi,
>
> Thanks for the proposal!
>
> To summarize, you propose a new method Table.cache(): Table that will
> trigger a job and write the result into some temporary storage as defined
> by a TableFactory.
> The cache() call blocks while the job is running and eventually returns a
> Table object that represents a scan of the temporary table.
> When the "session" is closed (closing to be defined?), the temporary tables
> are all dropped.
>
> I think this behavior makes sense and is a good first step towards more
> interactive workloads.
> However, its performance suffers from writing to and reading from external
> systems.
> I think this is OK for now. Changes that would significantly improve the
> situation (i.e., pinning data in-memory across jobs) would have large
> impacts on many components of Flink.
> Users could use in-memory filesystems or storage grids (Apache Ignite) to
> mitigate some of the performance effects.
>
> Best, Fabian
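A minimal Java sketch of the semantics Fabian summarizes above, assuming an in-memory map in place of the TableFactory-defined temporary storage and plain string rows in place of a Flink Table. InteractiveSession, TempStorage and CachedTable are hypothetical names used only for illustration; they are not Flink classes. cache() runs the "job" synchronously, stores its result under a generated temp-table name and returns a handle that scans it; closing the session drops every temp table the session created.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the temporary storage a TableFactory would provide.
class TempStorage {
    private final Map<String, List<String>> tables = new HashMap<>();

    void write(String name, List<String> rows) {
        tables.put(name, new ArrayList<>(rows));
    }

    List<String> read(String name) {
        List<String> rows = tables.get(name);
        if (rows == null) {
            throw new IllegalStateException("temp table was dropped: " + name);
        }
        return Collections.unmodifiableList(rows);
    }

    void drop(String name) {
        tables.remove(name);
    }

    int size() {
        return tables.size();
    }
}

// Hypothetical handle representing a scan of one cached temp table.
class CachedTable {
    private final TempStorage storage;
    private final String name;

    CachedTable(TempStorage storage, String name) {
        this.storage = storage;
        this.name = name;
    }

    List<String> scan() {
        return storage.read(name);
    }
}

// Hypothetical session: cache() blocks while the "job" runs, writes the result
// to temp storage and returns a scan handle; close() drops all its temp tables.
class InteractiveSession implements AutoCloseable {
    private final TempStorage storage;
    private final List<String> created = new ArrayList<>();
    private int counter = 0;

    InteractiveSession(TempStorage storage) {
        this.storage = storage;
    }

    CachedTable cache(Supplier<List<String>> job) {
        String name = "__cache_" + counter++;
        storage.write(name, job.get()); // runs the job synchronously
        created.add(name);
        return new CachedTable(storage, name);
    }

    @Override
    public void close() {
        for (String name : created) {
            storage.drop(name);
        }
        created.clear();
    }
}
```

Used with try-with-resources, closing the session is what drops the cached data. How a "session" is closed in Flink itself was still an open question in the thread, so the sketch simply ties it to close().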
>
>
>
> On Mon, Nov 26, 2018 at 3:38 AM Becket Qin <becket.qin@xxxxxxxxx> wrote:
>
> > Thanks for the explanation, Piotrek.
> >
> > Is there anything extra users can do on a MaterializedTable that they
> > cannot do on a Table? After users call `table.cache()`, they can just
> > use that table and do anything that is supported on a Table, including
> > SQL.
> >
> > Naming-wise, either cache() or materialize() sounds fine to me. cache()
> > is a bit more general than materialize(). Given that we are enhancing
> > the Table API to also support non-relational processing cases, cache()
> > might be slightly better.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
> > wrote:
> >
> > > Hi Becket,
> > >
> > > Oops, sorry, I didn’t notice that you intend to reuse the existing
> > > `TableFactory`. I don’t know why, but I assumed that you wanted to
> > > provide an alternate way of writing the data.
> > >
> > > Now that I hopefully understand the proposal, maybe we could rename
> > > `cache()` to
> > >
> > > void materialize()
> > >
> > > or, going a step further,
> > >
> > > MaterializedTable materialize()
> > > MaterializedTable createMaterializedView()
> > >
> > > ?
> > >
> > > I think the second option, returning a handle, is more flexible and
> > > could provide features such as “refresh”/“delete” or, generally
> > > speaking, managing the view. In the future we could also think about
> > > adding hooks to automatically refresh the view, etc. It is also more
> > > explicit: materialization returning a new table handle will not have
> > > the same implicit side effects as adding a simple line of code like
> > > `b.cache()` would have.
> > >
> > > It would also be more SQL-like, making it more intuitive for users
> > > already familiar with SQL.
> > >
> > > Piotrek
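Piotr's suggestion of returning a handle can be sketched as follows, assuming a Supplier stands in for the defining query and a list of strings for the materialized data. MaterializedTable here is a hypothetical class, not an existing Flink API; it only illustrates how a handle turns refresh() and delete() into explicit operations instead of implicit side effects of a bare `b.cache()` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical handle returned by materialize(): it keeps the defining query
// so the materialized data can be refreshed or explicitly deleted.
class MaterializedTable {
    private final Supplier<List<String>> query;
    private List<String> snapshot;
    private boolean deleted = false;

    private MaterializedTable(Supplier<List<String>> query) {
        this.query = query;
        this.snapshot = new ArrayList<>(query.get()); // materialize once up front
    }

    static MaterializedTable materialize(Supplier<List<String>> query) {
        return new MaterializedTable(query);
    }

    // Reads the current snapshot; it does not re-run the query.
    List<String> scan() {
        checkAlive();
        return new ArrayList<>(snapshot);
    }

    // Re-runs the defining query and replaces the snapshot.
    void refresh() {
        checkAlive();
        snapshot = new ArrayList<>(query.get());
    }

    // Explicitly releases the materialized data; the handle becomes unusable.
    void delete() {
        deleted = true;
        snapshot = null;
    }

    private void checkAlive() {
        if (deleted) {
            throw new IllegalStateException("materialized table was deleted");
        }
    }
}
```

In this sketch scan() keeps returning the old snapshot until refresh() is called; the automatic refresh hooks Piotr mentions as possible future work are not modeled.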
> > >
> > > > On 23 Nov 2018, at 14:53, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > > >
> > > > Hi Piotrek,
> > > >
> > > > For the cache() method itself, yes, it is equivalent to creating a
> > > > BUILT-IN materialized view with a lifecycle. That functionality is
> > > > missing today, though. I'm not sure I understand your question. Do
> > > > you mean we already have the functionality and just need syntactic
> > > > sugar?
> > > >
> > > > What's more interesting in the proposal is whether we want to stop at
> > > > creating the materialized view, or whether we want to extend it in
> > > > the future into a more useful unified data store distributed with
> > > > Flink. And do we want a mechanism that allows more flexible user job
> > > > patterns with their own user-defined services? These considerations
> > > > are much more architectural.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> Interesting idea. I’m trying to understand the problem. Isn’t the
> > > >> `cache()` call equivalent to writing data to a sink and later reading
> > > >> from it, where this sink has a limited scope/lifetime? And the sink
> > > >> could be implemented as an in-memory or a file sink?
> > > >>
> > > >> If so, what’s the problem with creating a materialised view from a
> > > >> table “b” (from your document’s example) and reusing this
> > > >> materialised view later? Maybe we are lacking mechanisms to clean up
> > > >> materialised views (for example, when the current session finishes)?
> > > >> Maybe we need some syntactic sugar on top of it?
> > > >>
> > > >> Piotrek
> > > >>
> > > >>> On 23 Nov 2018, at 07:21, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > > >>>
> > > >>> Thanks for the suggestion, Jincheng.
> > > >>>
> > > >>> Yes, I think it makes sense to have a persist() with a
> > > >>> lifecycle/defined scope. I just added a section on this to the
> > > >>> future work.
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> Jiangjie (Becket) Qin
> > > >>>
> > > >>> On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <sunjincheng121@xxxxxxxxx>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Jiangjie,
> > > >>>>
> > > >>>> Thank you for explaining the name `cache()`; I understand why you
> > > >>>> designed it this way!
> > > >>>>
> > > >>>> Another idea: could we specify a lifecycle for data persistence?
> > > >>>> For example, persist(LifeCycle.SESSION), so that users need not
> > > >>>> worry about data loss and can clearly specify how long the data is
> > > >>>> kept. If we want to expand on this later, the data could also be
> > > >>>> shared within a group of sessions, for example
> > > >>>> LifeCycle.SESSION_GROUP(...). I am not sure; this is just an
> > > >>>> immature suggestion, for reference only!
> > > >>>>
> > > >>>> Bests,
> > > >>>> Jincheng
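Jincheng's lifecycle idea can be illustrated with a small Java model, assuming that a lifecycle simply names the scope (one session, or a group of sessions) that may see a persisted table and whose end removes it. LifeCycle, Session and PersistedTables are hypothetical names, not Flink APIs; because SESSION_GROUP(...) takes an argument, the sketch uses factory methods instead of plain enum constants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical lifecycle descriptor: the kind of scope plus the ID of that scope.
final class LifeCycle {
    enum Kind { SESSION, SESSION_GROUP }

    final Kind kind;
    final String scopeId; // a session ID or a group ID, depending on kind

    private LifeCycle(Kind kind, String scopeId) {
        this.kind = kind;
        this.scopeId = scopeId;
    }

    static LifeCycle session(String sessionId) {
        return new LifeCycle(Kind.SESSION, sessionId);
    }

    static LifeCycle sessionGroup(String groupId) {
        return new LifeCycle(Kind.SESSION_GROUP, groupId);
    }
}

// Hypothetical session identified by its own ID and the group it belongs to.
final class Session {
    final String id;
    final String groupId;

    Session(String id, String groupId) {
        this.id = id;
        this.groupId = groupId;
    }
}

// Hypothetical registry of persisted tables and the lifecycle each was persisted with.
final class PersistedTables {
    private final Map<String, LifeCycle> tables = new HashMap<>();

    void persist(String table, LifeCycle lifeCycle) {
        tables.put(table, lifeCycle);
    }

    // A table is visible if the session matches the table's scope.
    boolean isVisible(String table, Session s) {
        LifeCycle lc = tables.get(table);
        if (lc == null) {
            return false;
        }
        String scope = lc.kind == LifeCycle.Kind.SESSION ? s.id : s.groupId;
        return Objects.equals(lc.scopeId, scope);
    }

    // Drops every table persisted with the given scope, e.g. when that session
    // (or the last session of that group) ends.
    void endScope(LifeCycle.Kind kind, String scopeId) {
        tables.values().removeIf(lc -> lc.kind == kind && lc.scopeId.equals(scopeId));
    }

    int size() {
        return tables.size();
    }
}
```

The point of the explicit scope is that users can see, from the persist call alone, how long the data will live, which is the concern Jincheng raises about data loss.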
> > > >>>>
> > > >>>> On Fri, Nov 23, 2018 at 1:33 PM Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > > >>>>
> > > >>>>> Re: Jincheng,
> > > >>>>>
> > > >>>>> Thanks for the feedback. Regarding cache() vs. persist(),
> > > >>>>> personally I find that cache() describes the behavior more
> > > >>>>> accurately, i.e. the Table is cached for the session but will be
> > > >>>>> deleted after the session is closed. persist() seems a little
> > > >>>>> misleading, as people might think the table will still be there
> > > >>>>> even after the session is gone.
> > > >>>>>
> > > >>>>> Great point about mixing batch and stream processing in the same
> > > >>>>> job. We should absolutely move towards that goal. I imagine that
> > > >>>>> would be a huge change across the board, including sources,
> > > >>>>> operators and optimizations, to name a few. We will likely need
> > > >>>>> several separate in-depth discussions.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>>
> > > >>>>> Jiangjie (Becket) Qin
> > > >>>>>
> > > >>>>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <xingcanc@xxxxxxxxx>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> @Shaoxuan, I think the lifecycle and the access domain are both
> > > >>>>>> orthogonal to the cache problem. Essentially, this may be the
> > > >>>>>> first time we plan to introduce a storage mechanism other than
> > > >>>>>> state. Maybe it’s better to first draw the big picture and then
> > > >>>>>> concentrate on a specific part?
> > > >>>>>>
> > > >>>>>> @Becket, yes, actually I am more concerned with the underlying
> > > >>>>>> service. This seems to be quite a major change to the existing
> > > >>>>>> codebase. As you said, the service should be extensible to
> > > >>>>>> support other components, and we’d better discuss it in another
> > > >>>>>> thread.
> > > >>>>>>
> > > >>>>>> All in all, I am also eager to enjoy a more interactive Table API,
> > > >>>>>> provided the service mechanism is general and flexible enough.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Xingcan
> > > >>>>>>
> > > >>>>>>> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <xiaoweij@xxxxxxxxx>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Relying on a callback on the temp table for cleanup is not very
> > > >>>>>>> reliable. There is no guarantee that it will be executed
> > > >>>>>>> successfully, and we risk leaks when it is not. I think it is
> > > >>>>>>> safer to associate each temp table with a session ID, so that we
> > > >>>>>>> can always clean up temp tables that are no longer associated
> > > >>>>>>> with any active session.
> > > >>>>>>>
> > > >>>>>>> Regards,
> > > >>>>>>> Xiaowei
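The session-association approach Xiaowei describes can be sketched as a periodic sweep, assuming the cluster has some way to tell which sessions are still active (for example heartbeats; that detection mechanism is an assumption and is not shown). TempTableCatalog is a hypothetical class, not part of Flink. Because the sweep only compares each table's owner against the set of active sessions, a session that dies without running any cleanup callback still has its temp tables dropped on the next sweep.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical catalog: every temp table records the session that owns it,
// so cleanup does not depend on a per-table callback actually running.
class TempTableCatalog {
    private final Map<String, String> ownerByTable = new HashMap<>();
    private final Set<String> activeSessions = new HashSet<>();

    void openSession(String sessionId) {
        activeSessions.add(sessionId);
    }

    // Called when a session ends normally or is detected as dead
    // (the detection mechanism itself is assumed, not modeled).
    void markSessionInactive(String sessionId) {
        activeSessions.remove(sessionId);
    }

    void createTempTable(String table, String sessionId) {
        if (!activeSessions.contains(sessionId)) {
            throw new IllegalStateException("unknown or inactive session: " + sessionId);
        }
        ownerByTable.put(table, sessionId);
    }

    // Periodic sweep: drops every temp table whose owning session is no longer
    // active and returns how many tables were dropped.
    int sweep() {
        int before = ownerByTable.size();
        ownerByTable.values().removeIf(owner -> !activeSessions.contains(owner));
        return before - ownerByTable.size();
    }

    boolean exists(String table) {
        return ownerByTable.containsKey(table);
    }
}
```

The sweep is idempotent, so running it again after a crash or restart is safe; a real system would also have to persist the table-to-session mapping so that it survives the component doing the sweeping.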
> > > >>>>>>>
> > > >>>>>>> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <sunjincheng121@xxxxxxxxx>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Jiangjie & Shaoxuan,
> > > >>>>>>>>
> > > >>>>>>>> Thanks for initiating this great proposal!
> > > >>>>>>>>
> > > >>>>>>>> Interactive programming is very useful and user-friendly for the
> > > >>>>>>>> cases in your examples. This is especially true when a business
> > > >>>>>>>> process has to be executed in several stages with dependencies,
> > > >>>>>>>> such as a Flink ML pipeline: to use the intermediate results, we
> > > >>>>>>>> currently have to submit a job via env.execute().
> > > >>>>>>>>
> > > >>>>>>>> About `cache()`, I think it would be better to name it
> > > >>>>>>>> `persist()` and let the Flink framework decide whether to cache
> > > >>>>>>>> the data internally in memory or persist it to a storage system,
> > > >>>>>>>> for example by saving it into the state backend
> > > >>>>>>>> (MemoryStateBackend, RocksDBStateBackend, etc.).
> > > >>>>>>>>
> > > >>>>>>>> BTW, in my view, future support for switching between streaming
> > > >>>>>>>> and batch mode in the same job will also benefit "Interactive
> > > >>>>>>>> Programming". I am looking forward to your JIRAs and FLIP!
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Jincheng
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Tue, Nov 20, 2018 at 9:56 PM Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi all,
> > > >>>>>>>>>
> > > >>>>>>>>> As a few recent email threads have pointed out, there is a
> > > >>>>>>>>> promising opportunity to enhance the Flink Table API in various
> > > >>>>>>>>> aspects, including functionality and ease of use, among others.
> > > >>>>>>>>> One of the scenarios where we feel Flink could improve is
> > > >>>>>>>>> interactive programming. To explain the issues and facilitate
> > > >>>>>>>>> the discussion on the solution, we put together the following
> > > >>>>>>>>> document with our proposal.
> > > >>>>>>>>>
> > > >>>>>>>>> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> > > >>>>>>>>>
> > > >>>>>>>>> Feedback and comments are very welcome!
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks,
> > > >>>>>>>>>
> > > >>>>>>>>> Jiangjie (Becket) Qin
> > > >>>>>>>>>