Re: [DISCUSS] FLIP-27: Refactor Source Interface


Regarding the naming style.

The advantage of the `poll()` style is that the name `poll` itself implies a
non-blocking operation, just as it does for `Queue` in the Java API. That is
easy to understand, and we don't need to write much in the docs to make clear
that the implementation should not do anything heavy.
However, `poll` also implies that it returns the thing we want. In our
scenario there are currently three kinds of values: record, timestamp and
watermark. So the return type of `poll` would have to be a tuple3 or
something like that, which looks a little hacky IMO.
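
To make the concern concrete, here is a minimal sketch of what a `poll()`-style reader would have to hand back. The names `SourceElement` and `PollingReader` are made up for illustration and are not part of the FLIP or of Flink.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical holder bundling the three values one poll() call must return. */
final class SourceElement<T> {
    final T record;
    final long timestamp;
    final long watermark;

    SourceElement(T record, long timestamp, long watermark) {
        this.record = record;
        this.timestamp = timestamp;
        this.watermark = watermark;
    }
}

/** Hypothetical poll()-style reader: never blocks, returns empty if nothing is buffered. */
final class PollingReader<T> {
    private final Queue<SourceElement<T>> buffer = new ArrayDeque<>();

    /** Stand-in for whatever fills the reader; a real source would fetch from outside. */
    void enqueue(T record, long timestamp, long watermark) {
        buffer.add(new SourceElement<>(record, timestamp, watermark));
    }

    /** Non-blocking, like Queue.poll(), but forced to wrap all three values together. */
    Optional<SourceElement<T>> poll() {
        return Optional.ofNullable(buffer.poll());
    }
}
```

Every call allocates a wrapper object just to carry the timestamp and watermark next to the record, which is the "tuple3" awkwardness described above.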

The `advance()` style is more like RecordReader
<https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/RecordReader.html>
of MapReduce, or ISpout
<https://storm.apache.org/releases/1.1.2/javadocs/org/apache/storm/spout/ISpout.html>
of Storm. It really does mean moving the offset forward, which makes sense to
me. To be honest, I like the `advance()` style more.
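
For comparison, a rough sketch of the `advance()` style over an in-memory list. The interface name `AdvancingReader`, the class `ListReader` and the timestamp stand-in are assumptions for illustration only; the actual `SplitReader` in the FLIP may look different.

```java
import java.util.List;

/** Hypothetical advance()-style reader; method names follow the discussion, not a real API. */
interface AdvancingReader<T> {
    boolean advance();          // move to the next record if one is available, without blocking
    T getCurrent();             // record at the current position
    long getCurrentTimestamp(); // metadata comes from its own call, no tuple needed
    boolean isDone();           // true once no further record will ever arrive
}

/** Toy implementation backed by a fixed list. */
final class ListReader implements AdvancingReader<String> {
    private final List<String> records;
    private int position = -1;

    ListReader(List<String> records) {
        this.records = records;
    }

    @Override
    public boolean advance() {
        if (position + 1 < records.size()) {
            position++;
            return true;
        }
        return false;
    }

    @Override
    public String getCurrent() {
        if (position < 0) {
            throw new IllegalStateException("advance() has not succeeded yet");
        }
        return records.get(position);
    }

    @Override
    public long getCurrentTimestamp() {
        // Stand-in: the list index serves as the timestamp in this sketch.
        return position;
    }

    @Override
    public boolean isDone() {
        return position + 1 >= records.size();
    }
}
```

In this sketch, calling `getCurrent()` twice without an `advance()` in between returns the same record. That is a choice made here for illustration, not something the FLIP has settled.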

There is also another small point I don't understand.

Why use `start()` and `close()` in `SplitReader`? `start()` makes me think
of "starting a thread" or something like that, and we should not assume that
there is a thread. I prefer `open()`, which also pairs better with `close()`.


On Tue, Nov 6, 2018 at 11:04 AM Becket Qin <becket.qin@xxxxxxxxx> wrote:

> Thanks for updating the wiki, Aljoscha.
>
> The isDone()/advance()/getCurrent() API looks similar to
> hasNext()/isNextReady()/getNext(), but implies somewhat different behavior.
>
> If users call getCurrent() twice without calling advance() in between, will
> they get the same record back? From the API itself, users might think
> advance() is the API that moves the offset forward, and getCurrent() just
> returns the record at the current offset.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Nov 5, 2018 at 10:41 PM Aljoscha Krettek <aljoscha@xxxxxxxxxx>
> wrote:
>
> > I updated the FLIP [1] with some Javadoc for the SplitReader to outline
> > what I had in mind with the interface. Sorry for not doing that earlier,
> > it's not quite clear how the methods should work from the name alone.
> >
> > The gist of it is that advance() should be non-blocking, so
> > isDone()/advance()/getCurrent() are very similar to isDone()/poll()/take()
> > that I have seen mentioned.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > > On 5. Nov 2018, at 11:05, Biao Liu <mmyy1110@xxxxxxxxx> wrote:
> > >
> > > Thanks Aljoscha for bringing us this discussion!
> > >
> > > 1. I think one of the reasons for separating `advance()` and
> > > `getCurrent()` is that the source returns several different types:
> > > not just the record, but also the timestamp of the record and the
> > > watermark. If we don't separate these into different methods, the
> > > source has to return a tuple3, which is not very user friendly.
> > > Aljoscha's prototype is acceptable to me. Regarding the specific
> > > method names, I'm not sure which is better. Both are reasonable to me.
> > >
> > > 2. As Thomas and Becket mentioned before, I think a non-blocking API is
> > > necessary. Moreover, IMO we should not offer a blocking API. It doesn't
> > > help but makes things more complicated.
> > >
> > > 3. About the thread model.
> > > I agree with Thomas about the thread-less IO model. A standard workflow
> > > should look like this:
> > >  - If data is available, Flink reads it.
> > >  - If no data is temporarily available, Flink checks again a moment
> > > later, perhaps waiting on a semaphore until a timer wakes it up.
> > > Furthermore, we can offer an optional optimization for sources that have
> > > an external thread. As Guowei mentioned, there can be a listener through
> > > which the reader wakes the framework up as soon as new data arrives. This
> > > can address Piotr's concern about efficiency.
> > >
> > > 4. One more thing. After taking a look at the prototype code, my
> > > first impression is that the implementation fits batch jobs better than
> > > streaming jobs. There are two types of tasks in the prototype. The first
> > > is a source task that discovers the splits. The source passes the splits
> > > to the second task, which processes the splits one by one. The source
> > > then keeps watching to discover more splits.
> > >
> > > However, I think the more common scenario for a streaming job is this:
> > > there is a fixed set of splits, and each subtask takes several of them.
> > > The subtasks just keep processing the fixed splits, and each split
> > > carries continuous data. We don't need a source task to discover more
> > > splits. It cannot be finished in a streaming job, since we don't want
> > > the processing task to finish even when there are no more splits.
> > >
> > > So IMO we should offer another source operator for the new interface. It
> > > would discover all splits when it is opened, pick the splits that belong
> > > to this subtask, and keep processing these splits until all of them are
> > > finished.
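
One possible way for such an operator to pick "the splits that belong to this subtask" is sketched below. The round-robin rule, the class name `SplitAssignment` and the method `splitsForSubtask` are assumptions for illustration; the message above does not say how the splits should be divided.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: every subtask sees all discovered splits and keeps only its share. */
final class SplitAssignment {
    private SplitAssignment() {
    }

    /**
     * Returns the splits for one subtask using a simple round-robin rule:
     * split i goes to subtask (i % parallelism).
     */
    static <S> List<S> splitsForSubtask(List<S> allSplits, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<S> mine = new ArrayList<>();
        for (int i = subtaskIndex; i < allSplits.size(); i += parallelism) {
            mine.add(allSplits.get(i));
        }
        return mine;
    }
}
```

For this to work without coordination, every subtask has to discover the splits in the same order, which is an additional assumption of the sketch.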
> > >
> > >
> > > On Mon, Nov 5, 2018 at 11:00 AM Becket Qin <becket.qin@xxxxxxxxx> wrote:
> > >
> > >> Hi Thomas,
> > >>
> > >> The iterator-like API was also the first thing that came to me. But it
> > >> seems a little confusing that hasNext() does not mean "the stream has
> > >> not ended", but means "the next record is ready", which repurposes the
> > >> well-known meaning of hasNext(). If we follow the hasNext()/next()
> > >> pattern, an additional isNextReady() method to indicate whether the next
> > >> record is ready seems more intuitive to me.
> > >>
> > >> Similarly, in the poll()/take() pattern, another method, isDone(), is
> > >> needed to indicate whether the stream has ended.
> > >>
> > >> Compared with the hasNext()/next()/isNextReady() pattern,
> > >> isDone()/poll()/take() seems more flexible for the reader
> > >> implementation. When I am implementing a reader, I have a couple of
> > >> choices:
> > >>
> > >>   - A thread-less reader that does not have any internal thread.
> > >>      - When poll() is called, the same calling thread will perform a
> > >>      bunch of IO asynchronously.
> > >>      - When take() is called, the same calling thread will perform a
> > >>      bunch of IO and wait until the record is ready.
> > >>   - A reader with internal threads that perform network IO and put
> > >>   records into a buffer.
> > >>      - When poll() is called, the calling thread simply reads from the
> > >>      buffer and returns an empty result immediately if there is no
> > >>      record.
> > >>      - When take() is called, the calling thread reads from the buffer
> > >>      and blocks waiting if the buffer is empty.
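
The second option in the list above (internal thread plus buffer) could look roughly like the following. The class name `BufferedSplitReader` and the `Iterable` standing in for the network are assumptions for illustration, not code from the prototype.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical reader whose internal thread fills a buffer that poll()/take() drain. */
final class BufferedSplitReader {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final Thread fetcher;

    /** The Iterable stands in for the external system the fetcher thread would read from. */
    BufferedSplitReader(Iterable<String> upstream) {
        this.fetcher = new Thread(() -> {
            for (String record : upstream) {
                buffer.add(record);
            }
        });
        this.fetcher.setDaemon(true);
    }

    /** Starts the internal fetcher thread. */
    void start() {
        fetcher.start();
    }

    /** Non-blocking: returns an empty result immediately if the buffer is empty. */
    Optional<String> poll() {
        return Optional.ofNullable(buffer.poll());
    }

    /** Blocking: waits until the fetcher thread has put a record into the buffer. */
    String take() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a record", e);
        }
    }
}
```

The threading stays entirely inside the reader, so the caller only sees the two access styles.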
> > >>
> > >> On the other hand, with the hasNext()/next()/isNextReady() API, it is
> > >> less intuitive for reader developers to write the thread-less pattern.
> > >> Technically speaking, one can still do the asynchronous IO to prepare
> > >> the record in isNextReady(), but that is implicit and seems somewhat
> > >> hacky.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >>
> > >> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <thw@xxxxxxxxxx> wrote:
> > >>
> > >>> Couple more points regarding discovery:
> > >>>
> > >>> The proposal mentions that discovery could be outside the execution
> > >>> graph. Today, discovered partitions/shards are checkpointed. I believe
> > >>> that will also need to be the case in the future, even when discovery
> > >>> and reading are split between different tasks.
> > >>>
> > >>> For cases such as resharding of a Kinesis stream, the relationship
> > >>> between splits needs to be considered. Splits cannot be randomly
> > >>> distributed over readers in certain situations. An example was
> > >>> mentioned here:
> > >>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> > >>>
> > >>> Thomas
> > >>>
> > >>>
> > >>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <thw@xxxxxxxxxx> wrote:
> > >>>
> > >>>> Thanks for getting the ball rolling on this!
> > >>>>
> > >>>> Can the number of splits decrease? Yes, splits can be closed and go
> > >>>> away. An example would be a shard merge in Kinesis (2 existing shards
> > >>>> will be closed and replaced with a new shard).
> > >>>>
> > >>>> Regarding advance/poll/take: IMO the least restrictive approach would
> > >>>> be the thread-less IO model (pull based, non-blocking, caller
> > >>>> retrieves new records when available). The current Kinesis API
> > >>>> requires the use of threads. But that can be internal to the split
> > >>>> reader and does not need to be a source API concern. In fact, that's
> > >>>> what we are working on right now as an improvement to the existing
> > >>>> consumer: each shard consumer thread will push to a queue, and the
> > >>>> consumer main thread will poll the queue(s). It is essentially a
> > >>>> mapping from threaded IO to non-blocking.
> > >>>>
> > >>>> The proposed SplitReader interface would fit the thread-less IO model.
> > >>>> Similar to an iterator, we find out if there is a new element
> > >>>> (hasNext()) and if so, move to it (next()). Separate calls deliver the
> > >>>> meta information (timestamp, watermark). Perhaps the advance call
> > >>>> could offer a timeout option, so that the caller does not end up in a
> > >>>> busy wait. On the other hand, a caller processing multiple splits may
> > >>>> want to cycle through fast, to process elements of other splits as
> > >>>> soon as they become available. The nice thing is that this "split
> > >>>> merge" logic can now live in Flink and be optimized and shared between
> > >>>> different sources.
> > >>>>
> > >>>> Thanks,
> > >>>> Thomas
> > >>>>
> > >>>>
> > >>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei.mgw@xxxxxxxxx> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>> Thanks Aljoscha for this FLIP.
> > >>>>>
> > > >>>>> 1. I agree with Piotr and Becket that a non-blocking source is very
> > > >>>>> important. But in addition to `Future/poll`, there may be another
> > > >>>>> way to achieve this. I think it may not be very memory friendly if
> > > >>>>> every advance call returns a Future.
> > >>>>>
> > > >>>>> public interface Listener {
> > > >>>>>     // Not named notify(): an interface cannot redeclare the
> > > >>>>>     // final method Object.notify().
> > > >>>>>     void notifyDataAvailable();
> > > >>>>> }
> > > >>>>>
> > > >>>>> public interface SplitReader {
> > > >>>>>     /**
> > > >>>>>      * Returns false when no element is temporarily available.
> > > >>>>>      * When elements are available again, the SplitReader can call
> > > >>>>>      * listener.notifyDataAvailable().
> > > >>>>>      * In addition, the framework would check `advance` periodically.
> > > >>>>>      * Of course, advance can always return true and ignore the
> > > >>>>>      * listener argument for simplicity.
> > > >>>>>      */
> > > >>>>>     boolean advance(Listener listener);
> > > >>>>> }
> > >>>>>
> > > >>>>> 2. The FLIP tells us very clearly how to create all Splits and how
> > > >>>>> to create a SplitReader from a Split. But there is no strategy that
> > > >>>>> lets the user choose how to assign the splits to the tasks. I think
> > > >>>>> we could add an enum to let the user choose.
> > > >>>>>
> > > >>>>> public enum SplitsAssignmentPolicy {
> > > >>>>>    Location,
> > > >>>>>    Workload,
> > > >>>>>    Random,
> > > >>>>>    Average
> > > >>>>> }
> > >>>>>
> > > >>>>> 3. If we merge `advance` and `getCurrent` into one method like
> > > >>>>> `getNext`, then `getNext` would need to return an
> > > >>>>> `ElementWithTimestamp`, because some sources want to add a timestamp
> > > >>>>> to every element. IMO, this is not so memory friendly, so I prefer
> > > >>>>> the current design with separate methods.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> Thanks
> > >>>>>
> > > >>>>> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:
> > >>>>>
> > >>>>>> Hi,
> > >>>>>>
> > > >>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> > > >>>>>> other possible improvements. I have one proposal. Instead of
> > > >>>>>> having a method:
> > >>>>>>
> > >>>>>> boolean advance() throws IOException;
> > >>>>>>
> > >>>>>> I would replace it with
> > >>>>>>
> > >>>>>> /*
> > >>>>>> * Return a future, which when completed means that source has more
> > >>> data
> > >>>>>> and getNext() will not block.
> > >>>>>> * If you wish to use benefits of non blocking connectors, please
> > >>>>>> implement this method appropriately.
> > >>>>>> */
> > >>>>>> default CompletableFuture<?> isBlocked() {
> > >>>>>>        return CompletableFuture.completedFuture(null);
> > >>>>>> }
> > >>>>>>
> > >>>>>> And rename `getCurrent()` to `getNext()`.
> > >>>>>>
> > > >>>>>> A couple of arguments:
> > > >>>>>> 1. I don’t understand the division of work between `advance()` and
> > > >>>>>> `getCurrent()`. What should be done in which, especially for
> > > >>>>>> connectors that handle records in batches (like Kafka), and when
> > > >>>>>> should you call `advance()` and when `getCurrent()`?
> > > >>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
> > > >>>>>> the future to have asynchronous/non-blocking connectors and to
> > > >>>>>> handle a large number of blocked threads more efficiently, without
> > > >>>>>> busy waiting. At the same time it doesn’t add much complexity,
> > > >>>>>> since naive connector implementations can always be blocking.
> > > >>>>>> 3. This would also allow us to use a fixed-size thread pool of task
> > > >>>>>> executors, instead of one thread per task.
> > >>>>>>
> > >>>>>> Piotrek
> > >>>>>>
> > > >>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljoscha@xxxxxxxxxx> wrote:
> > >>>>>>>
> > >>>>>>> Hi All,
> > >>>>>>>
> > > >>>>>>> In order to finally get the ball rolling on the new source
> > > >>>>>>> interface that we have discussed for so long I finally created a
> > > >>>>>>> FLIP:
> > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > >>>>>>>
> > > >>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> > > >>>>>>> about adding per-partition watermark support to the Kinesis source
> > > >>>>>>> and because this would enable generic implementation of event-time
> > > >>>>>>> alignment for all sources. Maybe we need another FLIP for the
> > > >>>>>>> event-time alignment part, especially the part about information
> > > >>>>>>> sharing between operations (I'm not calling it state sharing
> > > >>>>>>> because state has a special meaning in Flink).
> > >>>>>>>
> > >>>>>>> Please discuss away!
> > >>>>>>>
> > >>>>>>> Aljoscha
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>
> > >>
> >
> >
>