Re: [DISCUSS] FLIP-27: Refactor Source Interface



> BTW, regarding the isBlock() method, I have a few more questions. 21, Is a method isReady() with boolean as a return value
> equivalent? Personally I found it is a little bit confusing in what is supposed to be returned when the future is completed. 22. if
> the implementation of isBlocked() is optional, how do the callers know whether the method is properly implemented or not?
> Does not implemented mean it always return a completed future?

`CompletableFuture<?> isBlocked()` is more or less equivalent to `boolean hasNext()`, which in the case of “false” provides some kind of listener/callback that notifies about the presence of the next element. There are some minor differences; for example, `CompletableFuture<?>` has a minimal two-state logic:

1. Future is completed - we have more data
2. Future not yet completed - we don’t have data now, but we might/we will have in the future

In contrast, `boolean hasNext()` combined with a `notify()` callback is a bit more complicated and dispersed, and can encourage `notify()` spam.
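
To make the two-state logic concrete, here is a rough sketch of how a caller could drive a reader through `isBlocked()` without busy-looping. Only `isBlocked()` and `getNextElement()` come from this thread; `QueueBackedReader`, `CallbackDrivenCaller`, `add()` and `drain()` are hypothetical names made up for illustration, and this is not meant as the actual FLIP interface.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical reader backed by an in-memory buffer (illustration only).
class QueueBackedReader {
    private final Queue<String> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    // Called by the (hypothetical) IO side when a record arrives.
    synchronized void add(String record) {
        buffer.add(record);
        available.complete(null); // completing twice is a harmless no-op
    }

    // Completed future: we have data. Pending future: data may arrive later.
    synchronized CompletableFuture<?> isBlocked() {
        return available;
    }

    synchronized String getNextElement() {
        String next = buffer.poll();
        if (buffer.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>(); // blocked again
        }
        return next;
    }
}

class CallbackDrivenCaller {
    // Reads while data is available, then registers a callback and returns.
    static void drain(QueueBackedReader reader, List<String> out) {
        while (reader.isBlocked().isDone()) {
            out.add(reader.getNextElement());
        }
        reader.isBlocked().thenRun(() -> drain(reader, out));
    }
}
```

The caller never polls in a loop while the reader is blocked; it is woken up once per transition from "blocked" to "has data".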


> 3. If merge the `advance` and `getCurrent`  to one method like `getNext` the `getNext` would need return a
>`ElementWithTimestamp` because some sources want to add timestamp to every element. IMO, this is not so memory friendly
> so I prefer this design.

Guowei, I don’t quite understand this. Could you elaborate on why having a separate `advance()` helps?


Regarding advance/poll/take: what’s the value of having two separate methods, poll and take? Which one of them should be called and which implemented? What’s the benefit of having those methods compared to having one single method `getNextElement()` (or `pollElement()` or whatever we name it) with the following contract:

CompletableFuture<?> isBlocked();

/**
 * Returns the next element. Will be called only if `isBlocked()` is completed. Try to implement it in a non-blocking fashion, but if that’s impossible or you just don’t want to spend the effort, you can block in this method.
 */
T getNextElement();

I mean, if the connector is implemented in a non-blocking way, Flink should use it that way. If it’s not, then `poll()` will `throw new NotImplementedException()`. Implementing both of them and providing both of them to Flink wouldn’t make sense, so why not merge them into a single method call that should preferably (but does not necessarily need to) be non-blocking? It’s not like we are implementing a general-purpose `Queue`, where users might want to call either `poll` or `take`. We would always prefer to call `poll`, but if it’s blocking, we still have no choice but to call it and block on it.
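
As a sketch of what the single-method contract could look like for a naive connector: the reader below does not override `isBlocked()` and simply blocks inside `getNextElement()`, while the caller code stays the same as for a non-blocking connector. `SimpleSplitReader`, `BlockingQueueReader` and `SingleMethodCaller` are hypothetical names, and wrapping the `InterruptedException` is just one possible choice for the example.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical interface following the contract described above.
interface SimpleSplitReader<T> {
    // Default: never report "blocked", so naive connectors need not care.
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    // Preferably non-blocking, but allowed to block.
    T getNextElement();
}

// A naive connector: keeps the default isBlocked() and blocks when empty.
class BlockingQueueReader<T> implements SimpleSplitReader<T> {
    private final BlockingQueue<T> queue;

    BlockingQueueReader(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    @Override
    public T getNextElement() {
        try {
            return queue.take(); // may block; the contract permits this
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
    }
}

class SingleMethodCaller {
    // The caller only ever needs isBlocked() and getNextElement().
    static <T> Optional<T> tryRead(SimpleSplitReader<T> reader) {
        if (!reader.isBlocked().isDone()) {
            return Optional.empty(); // reader says it has nothing yet
        }
        return Optional.of(reader.getNextElement());
    }
}
```

A non-blocking connector would override `isBlocked()` to return a pending future while it has no data; nothing on the caller side would need to change.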


> 1. I agree with Piotr and Becket that the non-blocking source is very
> important. But in addition to `Future/poll`, there may be another way to
> achieve this. I think it may be not very memory friendly if every advance
> call return a Future.

I didn’t want to mention this, so as not to clutter my initial proposal, but there is a simple solution to the problem:

import java.util.concurrent.CompletableFuture;

public interface SplitReader {

    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /**
     * Returns a future that will be completed when the page source becomes
     * unblocked.  If the page source is not blocked, this method should return
     * {@code NOT_BLOCKED}.
     */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }
}

If we are blocked and waiting for IO, then creating a new Future is a non-issue. At full throughput, with sources that are not blocked, returning a static `NOT_BLOCKED` constant should also solve the problem.
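
A minimal sketch of that allocation pattern, assuming a reader with an internal buffer: while data is buffered, `isBlocked()` returns the shared constant, and a new future is created only when the reader actually runs dry. `BufferedReaderSketch` and `offer()` are hypothetical names used only for this example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class BufferedReaderSketch {
    static final CompletableFuture<?> NOT_BLOCKED =
            CompletableFuture.completedFuture(null);

    private final Queue<Integer> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> pending; // allocated only while blocked

    // Called by the (hypothetical) IO side when a record arrives.
    synchronized void offer(int record) {
        buffer.add(record);
        if (pending != null) {
            CompletableFuture<Void> toComplete = pending;
            pending = null;
            toComplete.complete(null); // wake up whoever was waiting
        }
    }

    synchronized CompletableFuture<?> isBlocked() {
        if (!buffer.isEmpty()) {
            return NOT_BLOCKED; // hot path: no allocation per call
        }
        if (pending == null) {
            pending = new CompletableFuture<>(); // slow path: waiting for IO
        }
        return pending;
    }

    // Returns null if the buffer is empty.
    synchronized Integer getNextElement() {
        return buffer.poll();
    }
}
```

Repeated `isBlocked()` calls while the reader is blocked reuse the same pending future, so the allocation cost is paid once per blocked period rather than once per record.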

One more remark: non-blocking sources might be a necessity in a single-threaded model without a checkpointing lock. (Currently, when sources are blocked, they can release the checkpointing lock and re-acquire it later.) Non-blocking `poll`/`getNext()` would allow checkpoints to happen while the source is idling. In that case either `notify()` or my proposed `isBlocked()` would make it possible to avoid busy-looping.
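
To illustrate the single-threaded idea, here is a rough sketch under my own assumptions (it is not how Flink is implemented today): one task thread both processes records and runs queued actions such as checkpoint triggers, and it parks on the action queue while the source is blocked. `IdleAwareReader`, `SingleThreadedTaskLoop`, `actions` and `WAKE_UP` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical reader: pending future while empty, completed once data arrives.
class IdleAwareReader {
    private final Queue<String> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void add(String record) {
        buffer.add(record);
        available.complete(null);
    }

    synchronized CompletableFuture<?> isBlocked() {
        return available;
    }

    synchronized String getNextElement() {
        String next = buffer.poll();
        if (buffer.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>();
        }
        return next;
    }
}

class SingleThreadedTaskLoop {
    private static final Runnable WAKE_UP = () -> { };

    // Actions (e.g. checkpoint triggers) to be run by the task thread itself.
    final BlockingQueue<Runnable> actions = new LinkedBlockingQueue<>();
    final List<String> log = new ArrayList<>();
    private boolean running = true;

    // Meant to be enqueued as an action so it runs on the task thread.
    void stop() {
        running = false;
    }

    void run(IdleAwareReader reader) {
        while (running) {
            CompletableFuture<?> blocked = reader.isBlocked();
            Runnable action;
            if (blocked.isDone()) {
                log.add("record:" + reader.getNextElement());
                action = actions.poll(); // do not wait, more data may be ready
            } else {
                // Source is idle: wake the loop when data arrives and wait
                // for the next action; no lock and no busy loop needed.
                // May register more than one wake-up; extras are no-ops.
                blocked.thenRun(() -> actions.add(WAKE_UP));
                action = takeAction();
            }
            if (action != null) {
                action.run();
            }
        }
    }

    private Runnable takeAction() {
        try {
            return actions.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
            return WAKE_UP;
        }
    }
}
```

Because a checkpoint trigger is just another action on the queue, it can run while the source is idle, and since everything executes on the one task thread there is no checkpointing lock to release and re-acquire.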


> On 5 Nov 2018, at 03:59, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> Hi Thomas,
> The iterator-like API was also the first thing that came to me. But it
> seems a little confusing that hasNext() does not mean "the stream has not
> ended", but means "the next record is ready", which is repurposing the well
> known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
> additional isNextReady() method to indicate whether the next record is
> ready seems more intuitive to me.
> Similarly, in poll()/take() pattern, another method of isDone() is needed
> to indicate whether the stream has ended or not.
> Compared with hasNext()/next()/isNextReady() pattern,
> isDone()/poll()/take() seems more flexible for the reader implementation.
> When I am implementing a reader, I could have a couple of choices:
>   - A thread-less reader that does not have any internal thread.
>   - When poll() is called, the same calling thread will perform a bunch of
>      IO asynchronously.
>      - When take() is called, the same calling thread will perform a bunch
>      of IO and wait until the record is ready.
>   - A reader with internal threads performing network IO and put records
>   into a buffer.
>      - When poll() is called, the calling thread simply reads from the
>      buffer and return empty result immediately if there is no record.
>      - When take() is called, the calling thread reads from the buffer and
>      block waiting if the buffer is empty.
> On the other hand, with the hasNext()/next()/isNextReady() API, it is less
> intuitive for the reader developers to write the thread-less pattern.
> Although technically speaking one can still do the asynchronous IO to
> prepare the record in isNextReady(). But it is inexplicit and seems
> somewhat hacky.
> Thanks,
> Jiangjie (Becket) Qin
> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <thw@xxxxxxxxxx> wrote:
>> Couple more points regarding discovery:
>> The proposal mentions that discovery could be outside the execution graph.
>> Today, discovered partitions/shards are checkpointed. I believe that will
>> also need to be the case in the future, even when discovery and reading are
>> split between different tasks.
>> For cases such as resharding of a Kinesis stream, the relationship between
>> splits needs to be considered. Splits cannot be randomly distributed over
>> readers in certain situations. An example was mentioned here:
>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>> Thomas
>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <thw@xxxxxxxxxx> wrote:
>>> Thanks for getting the ball rolling on this!
>>> Can the number of splits decrease? Yes, splits can be closed and go away.
>>> An example would be a shard merge in Kinesis (2 existing shards will be
>>> closed and replaced with a new shard).
>>> Regarding advance/poll/take: IMO the least restrictive approach would be
>>> the thread-less IO model (pull based, non-blocking, caller retrieves new
>>> records when available). The current Kinesis API requires the use of
>>> threads. But that can be internal to the split reader and does not need to
>>> be a source API concern. In fact, that's what we are working on right now
>>> as improvement to the existing consumer: Each shard consumer thread will
>>> push to a queue, the consumer main thread will poll the queue(s). It is
>>> essentially a mapping from threaded IO to non-blocking.
>>> The proposed SplitReader interface would fit the thread-less IO model.
>>> Similar to an iterator, we find out if there is a new element (hasNext) and
>>> if so, move to it (next()). Separate calls deliver the meta information
>>> (timestamp, watermark). Perhaps advance call could offer a timeout option,
>>> so that the caller does not end up in a busy wait. On the other hand, a
>>> caller processing multiple splits may want to cycle through fast, to
>>> process elements of other splits as soon as they become available. The nice
>>> thing is that this "split merge" logic can now live in Flink and be
>>> optimized and shared between different sources.
>>> Thanks,
>>> Thomas
>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei.mgw@xxxxxxxxx> wrote:
>>>> Hi,
>>>> Thanks Aljoscha for this FLIP.
>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>> important. But in addition to `Future/poll`, there may be another way to
>>>> achieve this. I think it may be not very memory friendly if every advance
>>>> call return a Future.
>>>> public interface Listener {
>>>>     public void notify();
>>>> }
>>>> public interface SplitReader {
>>>>     /**
>>>>      * When there is no element temporarily, this will return false.
>>>>      * When elements is available again splitReader can call
>>>>      * listener.notify()
>>>>      * In addition the frame would check `advance` periodically .
>>>>      * Of course advance can always return true and ignore the listener
>>>>      * argument for simplicity.
>>>>      */
>>>>     public boolean advance(Listener listener);
>>>> }
>>>> 2.  The FLIP tells us very clearly that how to create all Splits and how
>>>> to create a SplitReader from a Split. But there is no strategy for the user
>>>> to choose how to assign the splits to the tasks. I think we could add a
>>>> Enum to let user to choose.
>>>> /**
>>>>  public Enum SplitsAssignmentPolicy {
>>>>    Location,
>>>>    Workload,
>>>>    Random,
>>>>    Average
>>>>  }
>>>> */
>>>> 3. If merge the `advance` and `getCurrent`  to one method like `getNext`
>>>> the `getNext` would need return a `ElementWithTimestamp` because some
>>>> sources want to add timestamp to every element. IMO, this is not so memory
>>>> friendly so I prefer this design.
>>>> Thanks
>>>> Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote on Thu, Nov 1, 2018 at 6:08 PM:
>>>>> Hi,
>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>> possible improvements. I have one proposal. Instead of having a method:
>>>>> boolean advance() throws IOException;
>>>>> I would replace it with
>>>>> /*
>>>>> * Return a future, which when completed means that source has more data
>>>>> and getNext() will not block.
>>>>> * If you wish to use benefits of non blocking connectors, please
>>>>> implement this method appropriately.
>>>>> */
>>>>> default CompletableFuture<?> isBlocked() {
>>>>>        return CompletableFuture.completedFuture(null);
>>>>> }
>>>>> And rename `getCurrent()` to `getNext()`.
>>>>> Couple of arguments:
>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>> `getCurrent()`. What should be done in which, especially for connectors
>>>>> that handle records in batches (like Kafka) and when should you call
>>>>> `advance` and when `getCurrent()`.
>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>>>> future to have asynchronous/non blocking connectors and more efficiently
>>>>> handle large number of blocked threads, without busy waiting. While at the
>>>>> same time it doesn’t add much complexity, since naive connector
>>>>> implementations can be always blocking.
>>>>> 3. This also would allow us to use a fixed size thread pool of task
>>>>> executors, instead of one thread per task.
>>>>> Piotrek
>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljoscha@xxxxxxxxxx>
>>>>> wrote:
>>>>>> Hi All,
>>>>>> In order to finally get the ball rolling on the new source interface
>>>>> that we have discussed for so long I finally created a FLIP:
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>>>> adding per-partition watermark support to the Kinesis source and because
>>>>> this would enable generic implementation of event-time alignment for all
>>>>> sources. Maybe we need another FLIP for the event-time alignment part,
>>>>> especially the part about information sharing between operations (I'm not
>>>>> calling it state sharing because state has a special meaning in Flink).
>>>>>> Please discuss away!
>>>>>> Aljoscha