Re: [DISCUSS] FLIP-27: Refactor Source Interface

Hi Becket,

With my proposal, both of your examples would have to be solved by the connector, and the solution to both problems would be the same:

Pretend that the connector is never blocked (`isBlocked() { return NOT_BLOCKED; }`) and implement `getNextElement()` in a blocking fashion (or a semi-blocking one that returns control from time to time, so that checkpointing, network flushing and other resource management can happen in the same main thread). In other words, implement it exactly as you would implement a `take()` method, or as the same source connector would be implemented now with the current source interface. The only difference from the current interface would be that the main loop lives outside of the connector: instead of periodically releasing the checkpointing lock, the connector would periodically `return null;` or `return Optional.empty();` from `getNextElement()`.
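A minimal sketch of that pattern, assuming a hypothetical `SimpleReader` interface (not an existing Flink API) and a `BlockingQueue` standing in for the external system. `getNextElement()` blocks for at most `maxWaitMillis` and returns `Optional.empty()` on timeout, so the loop outside the connector gets a chance to run housekeeping such as checkpointing:

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical reader shape from this thread (not a released Flink API). */
interface SimpleReader<T> {
    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /** This connector pretends it is never blocked. */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }

    /** Returns the next element, or Optional.empty() to hand control back to the loop. */
    Optional<T> getNextElement() throws InterruptedException;
}

/** Semi-blocking connector: waits for data up to a bounded time, then yields. */
class SemiBlockingReader<T> implements SimpleReader<T> {
    private final BlockingQueue<T> source; // stands in for the external system
    private final long maxWaitMillis;

    SemiBlockingReader(BlockingQueue<T> source, long maxWaitMillis) {
        this.source = source;
        this.maxWaitMillis = maxWaitMillis;
    }

    @Override
    public Optional<T> getNextElement() throws InterruptedException {
        // BlockingQueue.poll(timeout) returns null on timeout, which becomes empty.
        return Optional.ofNullable(source.poll(maxWaitMillis, TimeUnit.MILLISECONDS));
    }
}

/** The main loop lives outside of the connector and owns housekeeping. */
class MainLoop {
    static <T> void run(SimpleReader<T> reader, Consumer<T> emit,
                        Runnable housekeeping, int iterations) throws InterruptedException {
        for (int i = 0; i < iterations; i++) {
            reader.isBlocked().join(); // completes immediately for this reader
            Optional<T> next = reader.getNextElement();
            if (next.isPresent()) {
                emit.accept(next.get());
            } else {
                housekeeping.run(); // e.g. checkpointing or network flushing
            }
        }
    }
}
```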

In the case of thread-less readers with only a non-blocking `queue.poll()` (is that really a thing? I cannot think of a real implementation that enforces such constraints), we could provide a wrapper that hides the busy looping. The same applies to forever-blocking readers: we could provide another wrapper that runs the connector in a separate thread.
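The second wrapper could look roughly like this. It is a sketch that assumes a hypothetical `BlockingFetcher` callback for the forever-blocking connector: a dedicated thread calls it and buffers the results, while the main thread only sees a non-blocking `getNextElement()` plus an `isBlocked()` future that the fetch thread completes. All names are illustrative:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical: a connector call that may block for an unbounded time. */
interface BlockingFetcher<T> {
    T fetch() throws InterruptedException; // must not return null
}

/** Runs a blocking fetcher on its own thread, exposes isBlocked()/getNextElement(). */
class ThreadedReaderWrapper<T> implements AutoCloseable {
    private static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final Object lock = new Object();
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();
    private final Thread fetchThread;

    ThreadedReaderWrapper(BlockingFetcher<T> fetcher) {
        fetchThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    T element = fetcher.fetch(); // may block for a long time
                    synchronized (lock) {
                        buffer.add(element);
                        available.complete(null); // wake up whoever is waiting
                    }
                }
            } catch (InterruptedException e) {
                // close() was called; exit quietly
            }
        }, "fetcher");
        fetchThread.setDaemon(true);
        fetchThread.start();
    }

    CompletableFuture<?> isBlocked() {
        synchronized (lock) {
            if (!buffer.isEmpty()) {
                return NOT_BLOCKED; // no allocation while data is flowing
            }
            if (available.isDone()) {
                available = new CompletableFuture<>(); // re-arm for the next element
            }
            return available;
        }
    }

    /** Non-blocking; returns null if nothing is buffered. */
    T getNextElement() {
        synchronized (lock) {
            return buffer.poll();
        }
    }

    @Override
    public void close() {
        fetchThread.interrupt();
    }
}
```

The future is re-armed lazily in `isBlocked()`, so a new `CompletableFuture` is only allocated when the caller actually has to wait.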

But I don’t see a reason why we should expose both a blocking `take()` and a non-blocking `poll()` method to the Flink engine. Someone (the Flink engine or the connector) would have to do the same busy looping anyway, and I think it would be better to have a simpler connector API (one that solves our problems) and force connectors to comply with it one way or another.


> Hi Piotr,
> I might have misunderstood your proposal, but let me try to explain my
> concern. I am thinking about the following case:
> 1. A reader has the following two methods:
>    boolean isBlocked()
>    T getNextElement()
> 2. The implementation of getNextElement() is non-blocking.
> 3. The reader is thread-less, i.e. it does not have any internal thread.
> For example, it might just delegate the getNextElement() to a queue.poll(),
> and isBlocked() is just queue.isEmpty().
> How can Flink efficiently implement a blocking reading behavior with this
> reader? Either a tight loop or a backoff interval is needed. Neither of
> them is ideal.
> Now let's say the reader mentioned above implements a blocking
> getNextElement() method. Because there is no internal thread in the reader,
> after isBlocked() returns false, Flink will still have to loop on
> isBlocked() to check whether the next record is available. If the next
> record arrives after 10 min, it is a tight loop for 10 min. You have
> probably noticed that in this case, even if isBlocked() returns a future,
> that future will not be completed if Flink does not call some method on
> the reader, because the reader has no internal thread to complete that
> future by itself.
> For the above reasons, a blocking take() API would give Flink an
> efficient way to read from a reader. Depending on the implementation, there
> are many ways to wake up the blocked thread when checkpointing is needed.
> But I think the poll()/take() API would also work in that case.
> Thanks,
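To make the trade-off concrete, here is a sketch of what a caller would have to do with a thread-less reader that only offers a non-blocking `queue.poll()`. It is illustrative only: `BackoffPoller.pollWithBackoff` and its parameters are hypothetical, and the tight-loop alternative is the same code without the sleep:

```java
import java.util.function.Supplier;

class BackoffPoller {
    /**
     * Calls a non-blocking poll until it returns an element or the timeout passes.
     * Sleeps between empty polls, doubling the pause up to maxBackoffMillis:
     * this saves CPU compared to a tight loop, but a record that arrives right
     * after a poll can wait up to maxBackoffMillis before it is picked up.
     */
    static <T> T pollWithBackoff(Supplier<T> poll, long maxBackoffMillis, long timeoutMillis)
            throws InterruptedException {
        long backoff = 1;
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            T element = poll.get(); // e.g. queue.poll(), never blocks
            if (element != null) {
                return element;
            }
            Thread.sleep(backoff);
            backoff = Math.min(backoff * 2, maxBackoffMillis);
        }
        return null; // timed out without data
    }
}
```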
>> Hi,
>> a)
>>> BTW, regarding the isBlocked() method, I have a few more questions. 21. Is
>>> a method isReady() with boolean as a return value equivalent? Personally,
>>> I find it a little bit confusing what is supposed to be returned when the
>>> future is completed. 22. If the implementation of isBlocked() is optional,
>>> how do the callers know whether the method is properly implemented or not?
>>> Does "not implemented" mean it always returns a completed future?
>> `CompletableFuture<?> isBlocked()` is more or less equivalent to
>> `boolean hasNext()`, which in case of “false” also provides a
>> listener/callback that notifies about the presence of the next element.
>> There are some minor details, like `CompletableFuture<?>` having minimal
>> two-state logic:
>> 1. Future is completed - we have more data
>> 2. Future not yet completed - we don’t have data now, but we might/we will
>> have in the future
>> Whereas `boolean hasNext()` plus a `notify()` callback is a bit more
>> complicated/dispersed and can lead to (or encourage) `notify()` spam.
>> b)
>>> 3. If we merge `advance` and `getCurrent` into one method like `getNext`,
>>> the `getNext` would need to return an `ElementWithTimestamp` because some
>>> sources want to add a timestamp to every element. IMO, this is not so
>>> memory friendly, so I prefer this design.
>> Guowei, I don’t quite understand this. Could you elaborate on why having a
>> separate `advance()` helps?
>> c)
>> Regarding advance/poll/take: what’s the value of having two separate
>> methods, poll and take? Which one of them should be called and which
>> implemented? What’s the benefit of having those methods compared to having
>> a single method `getNextElement()` (or `pollElement()` or whatever we
>> name it) with the following contract:
>> CompletableFuture<?> isBlocked();
>> /**
>>  * Return the next element - will be called only if `isBlocked()` is completed.
>>  * Try to implement it in a non-blocking fashion, but if that’s impossible or
>>  * not worth the effort, you can block in this method.
>>  */
>> T getNextElement();
>> I mean, if the connector is implemented non-blockingly, Flink should use
>> it that way. If it’s not, then `poll()` will `throw new
>> NotImplementedException()`. Implementing both of them and providing both of
>> them to Flink wouldn’t make sense, so why not merge them into a single
>> method call that should preferably (but not necessarily) be non-blocking?
>> It’s not like we are implementing a general purpose `Queue`, on which
>> users might want to call either `poll` or `take`. We would always prefer
>> to call `poll`, but if it’s blocking, then we still have no choice but to
>> call it and block on it.
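A sketch of that single-method contract together with a driver that uses it, assuming the names from this thread (`isBlocked()`, `getNextElement()`) on a hypothetical generic `SplitReader<T>`. The driver simply parks on the future; a real engine would more likely register a callback, but either way there is no busy loop:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** The contract sketched above; names follow this thread, not a released API. */
interface SplitReader<T> {
    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /** Completed when getNextElement() has data to return. */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }

    /** Called only after isBlocked() completed; may block if non-blocking is impractical. */
    T getNextElement() throws Exception;
}

class Driver {
    /** Reads n elements, waiting on the future instead of busy-looping. */
    static <T> void readN(SplitReader<T> reader, int n, Consumer<T> emit) throws Exception {
        int emitted = 0;
        while (emitted < n) {
            reader.isBlocked().get(); // parks the thread until data is available
            T element = reader.getNextElement();
            if (element != null) { // null means "nothing right now, ask again"
                emit.accept(element);
                emitted++;
            }
        }
    }
}
```

A naive blocking connector only implements `getNextElement()` and inherits the always-completed `isBlocked()`, which is the "preferably but not necessarily non-blocking" case described above.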
>> d)
>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>> important. But in addition to `Future/poll`, there may be another way to
>>> achieve this. I think it may not be very memory friendly if every advance
>>> call returns a Future.
>> I didn’t want to mention this, so as not to clog my initial proposal, but
>> there is a simple solution to the problem:
>> import java.util.concurrent.CompletableFuture;
>> public interface SplitReader {
>>    // (…)
>>    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);
>>    /**
>>     * Returns a future that will be completed when the split reader becomes
>>     * unblocked. If the split reader is not blocked, this method should
>>     * return {@code NOT_BLOCKED}.
>>     */
>>    default CompletableFuture<?> isBlocked()
>>    {
>>        return NOT_BLOCKED;
>>    }
>> }
>> If we are blocked and waiting for IO, then creating a new Future is a
>> non-issue. Under full throttle/throughput, not-blocked sources returning
>> the static `NOT_BLOCKED` constant should also solve the problem.
>> One more remark: non-blocking sources might be a necessity in a
>> single-threaded model without a checkpointing lock. (Currently, when sources
>> are blocked, they can release the checkpointing lock and re-acquire it
>> later.) A non-blocking `poll`/`getNext()` would allow checkpoints to happen
>> when the source is idling. In that case, either `notify()` or my proposed
>> `isBlocked()` would allow us to avoid busy-looping.
>> Piotrek
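A sketch of such a single-threaded model, under the assumption that the task thread executes everything (reads and checkpoints) from one action queue, similar in spirit to a mailbox. `SingleThreadedTask` and its constructor parameters are hypothetical; the point is that an idle source leaves the thread parked in `actions.take()`, where a checkpoint action can still run without any lock:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical single-threaded source task without a checkpoint lock: reads
 * and checkpoints are both actions executed one at a time by the task thread.
 */
class SingleThreadedTask<T> {
    private final BlockingQueue<Runnable> actions = new LinkedBlockingQueue<>();
    private final Supplier<CompletableFuture<?>> isBlocked;
    private final Supplier<T> getNext; // non-blocking, returns null if no data
    private final Consumer<T> emit;
    private boolean running = true; // only touched by the task thread

    SingleThreadedTask(Supplier<CompletableFuture<?>> isBlocked, Supplier<T> getNext, Consumer<T> emit) {
        this.isBlocked = isBlocked;
        this.getNext = getNext;
        this.emit = emit;
    }

    /** Thread-safe: e.g. a checkpoint trigger arriving from another thread. */
    void submit(Runnable action) {
        actions.add(action);
    }

    void stop() {
        submit(() -> running = false);
    }

    void run() throws InterruptedException {
        scheduleRead();
        while (running) {
            actions.take().run(); // an idle source parks here, no busy loop
        }
    }

    private void scheduleRead() {
        // Once the source is unblocked, enqueue a read (this may fire on an IO thread).
        isBlocked.get().thenRun(() -> submit(this::readOnce));
    }

    private void readOnce() {
        T element = getNext.get();
        if (element != null) {
            emit.accept(element);
        }
        scheduleRead();
    }
}
```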
>>> Hi Thomas,
>>> The iterator-like API was also the first thing that came to my mind. But
>>> it seems a little confusing that hasNext() does not mean "the stream has
>>> not ended", but means "the next record is ready", which repurposes the
>>> well-known meaning of hasNext(). If we follow the hasNext()/next() pattern,
>>> an additional isNextReady() method to indicate whether the next record is
>>> ready seems more intuitive to me.
>>> Similarly, in the poll()/take() pattern, another method, isDone(), is
>>> needed to indicate whether the stream has ended or not.
>>> Compared with hasNext()/next()/isNextReady() pattern,
>>> isDone()/poll()/take() seems more flexible for the reader implementation.
>>> When I am implementing a reader, I could have a couple of choices:
>>>  - A thread-less reader that does not have any internal thread.
>>>     - When poll() is called, the same calling thread will perform a bunch
>>>       of IO asynchronously.
>>>     - When take() is called, the same calling thread will perform a bunch
>>>       of IO and wait until the record is ready.
>>>  - A reader with internal threads performing network IO and putting
>>>    records into a buffer.
>>>     - When poll() is called, the calling thread simply reads from the
>>>       buffer and returns an empty result immediately if there is no record.
>>>     - When take() is called, the calling thread reads from the buffer and
>>>       blocks waiting if the buffer is empty.
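A sketch of the second variant (internal thread plus buffer) behind a hypothetical `PollTakeReader` interface with the three methods named above. An `Iterator` stands in for the network IO, and an end-of-stream marker placed in the buffer is what eventually flips `isDone()`. The thread-less variant would do the IO inside `poll()`/`take()` instead and is not shown:

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical shape of the isDone()/poll()/take() reader discussed above. */
interface PollTakeReader<T> {
    boolean isDone();                                // stream ended and buffer drained
    Optional<T> poll();                              // never blocks
    Optional<T> take() throws InterruptedException;  // blocks until a record or end of stream
}

/** Internal thread does the IO and fills a bounded buffer. */
class BufferedThreadReader<T> implements PollTakeReader<T> {
    private static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(1024);
    private boolean done; // only touched by the consuming thread

    BufferedThreadReader(Iterator<T> io) { // 'io' stands in for network IO; no null elements
        Thread t = new Thread(() -> {
            try {
                while (io.hasNext()) {
                    buffer.put(io.next()); // blocks if the buffer is full
                }
                buffer.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    @Override
    public boolean isDone() {
        return done;
    }

    @Override
    public Optional<T> poll() {
        return unwrap(buffer.poll()); // null (empty buffer) becomes Optional.empty()
    }

    @Override
    public Optional<T> take() throws InterruptedException {
        if (done) {
            return Optional.empty(); // never block after the end of the stream
        }
        return unwrap(buffer.take());
    }

    @SuppressWarnings("unchecked")
    private Optional<T> unwrap(Object o) {
        if (o == END) {
            done = true;
            return Optional.empty();
        }
        return Optional.ofNullable((T) o);
    }
}
```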
>>> On the other hand, with the hasNext()/next()/isNextReady() API, it is less
>>> intuitive for reader developers to write the thread-less pattern.
>>> Technically speaking, one can still do the asynchronous IO to prepare the
>>> record in isNextReady(), but it is implicit and seems somewhat hacky.
>>> Thanks,
>>> Jiangjie (Becket) Qin
>>>> A couple more points regarding discovery:
>>>> The proposal mentions that discovery could be outside the execution graph.
>>>> Today, discovered partitions/shards are checkpointed. I believe that will
>>>> also need to be the case in the future, even when discovery and reading are
>>>> split between different tasks.
>>>> For cases such as resharding of a Kinesis stream, the relationship between
>>>> splits needs to be considered. Splits cannot be randomly distributed over
>>>> readers in certain situations. An example was mentioned here:
>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>> Thomas
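One way to model that constraint, sketched under the assumption that each split carries the IDs of the splits it was derived from (as with the Kinesis shard merge mentioned further down in this thread, where two closed shards are replaced by a new one). `ShardSplit` and `LineageAwareAssigner` are hypothetical names; the rule is simply that a child split is not handed out before all of its parents have been fully read, presumably so that per-key ordering is kept across the reshard:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/** Hypothetical split with lineage, e.g. a shard created by merging two shards. */
class ShardSplit {
    final String id;
    final List<String> parentIds;

    ShardSplit(String id, List<String> parentIds) {
        this.id = id;
        this.parentIds = parentIds;
    }
}

class LineageAwareAssigner {
    /** Returns the pending splits that may be given to readers now: all parents finished. */
    static List<ShardSplit> assignable(Collection<ShardSplit> pending, Set<String> finished) {
        List<ShardSplit> ready = new ArrayList<>();
        for (ShardSplit split : pending) {
            if (finished.containsAll(split.parentIds)) {
                ready.add(split);
            }
        }
        return ready;
    }
}
```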
>>>>> Thanks for getting the ball rolling on this!
>>>>> Can the number of splits decrease? Yes, splits can be closed and go away.
>>>>> An example would be a shard merge in Kinesis (2 existing shards will be
>>>>> closed and replaced with a new shard).
>>>>> Regarding advance/poll/take: IMO the least restrictive approach would be
>>>>> the thread-less IO model (pull based, non-blocking, caller retrieves new
>>>>> records when available). The current Kinesis API requires the use of
>>>>> threads. But that can be internal to the split reader and does not need to
>>>>> be a source API concern. In fact, that's what we are working on right now
>>>>> as an improvement to the existing consumer: each shard consumer thread will
>>>>> push to a queue, and the consumer main thread will poll the queue(s). It is
>>>>> essentially a mapping from threaded IO to non-blocking.
>>>>> The proposed SplitReader interface would fit the thread-less IO model.
>>>>> Similar to an iterator, we find out if there is a new element (hasNext)
>>>>> and, if so, move to it (next()). Separate calls deliver the meta
>>>>> information (timestamp, watermark). Perhaps the advance call could offer a
>>>>> timeout option, so that the caller does not end up in a busy wait. On the
>>>>> other hand, a caller processing multiple splits may want to cycle through
>>>>> fast, to process elements of other splits as soon as they become
>>>>> available. The nice thing is that this "split merge" logic can now live in
>>>>> Flink and be optimized and shared between different sources.
>>>>> Thanks,
>>>>> Thomas
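A sketch of such engine-side "split merge" logic, assuming a hypothetical iterator-like `IteratorSplitReader` whose `hasNext()` means "a record is ready now" and never blocks. The merger emits at most one record per split per pass, so a busy split cannot starve the others; when a whole pass emits nothing, the caller would wait (with a timeout or a future) instead of spinning:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical iterator-like, non-blocking split reader. */
interface IteratorSplitReader<T> {
    boolean hasNext(); // is a record available right now? never blocks
    T next();          // move to and return that record

    /** Adapter for examples: a split backed by an in-memory iterator. */
    static <T> IteratorSplitReader<T> fromIterator(Iterator<T> it) {
        return new IteratorSplitReader<T>() {
            public boolean hasNext() { return it.hasNext(); }
            public T next() { return it.next(); }
        };
    }
}

/** Hypothetical "split merge" logic living in the engine rather than in each source. */
class RoundRobinMerger<T> {
    private final List<IteratorSplitReader<T>> splits;

    RoundRobinMerger(List<IteratorSplitReader<T>> splits) {
        this.splits = new ArrayList<>(splits);
    }

    /** One pass over all splits, at most one record each; returns the number emitted. */
    int pass(Consumer<T> emit) {
        int emitted = 0;
        for (IteratorSplitReader<T> split : splits) {
            if (split.hasNext()) {
                emit.accept(split.next());
                emitted++;
            }
        }
        return emitted;
    }
}
```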
>>>>>> Hi,
>>>>>> Thanks Aljoscha for this FLIP.
>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>>> important. But in addition to `Future/poll`, there may be another way to
>>>>>> achieve this. I think it may not be very memory friendly if every
>>>>>> advance call returns a Future.
>>>>>> public interface Listener {
>>>>>>    public void notify();
>>>>>> }
>>>>>> public interface SplitReader {
>>>>>>    /**
>>>>>>     * When there is temporarily no element, this will return false.
>>>>>>     * When elements are available again, the SplitReader can call
>>>>>>     * listener.notify().
>>>>>>     * In addition, the framework would check `advance` periodically.
>>>>>>     * Of course, advance can always return true and ignore the listener
>>>>>>     * argument for simplicity.
>>>>>>     */
>>>>>>    public boolean advance(Listener listener);
>>>>>> }
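A runnable version of this listener idea needs one change: `Object.notify()` is final in Java, so an interface cannot declare its own `void notify()`, and the `Listener` above would not compile as written. The sketch below uses a hypothetical `onAvailable()` instead, plus a toy reader in which `deliver()` simulates data arriving from IO. All names are illustrative:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Renamed from notify(), which clashes with the final Object.notify(). */
interface AvailabilityListener {
    void onAvailable();
}

interface ListenerSplitReader<T> {
    /** Returns false when no element is available; the listener is called once data arrives. */
    boolean advance(AvailabilityListener listener);

    T getCurrent();
}

/** Minimal in-memory reader; deliver() simulates data arriving from IO. */
class QueueListenerReader<T> implements ListenerSplitReader<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private AvailabilityListener waiting; // guarded by 'this'
    private T current;                    // guarded by 'this'

    void deliver(T element) {
        AvailabilityListener toWake;
        synchronized (this) {
            pending.add(element);
            toWake = waiting;
            waiting = null; // each registration fires at most once
        }
        if (toWake != null) {
            toWake.onAvailable(); // called outside the lock
        }
    }

    @Override
    public synchronized boolean advance(AvailabilityListener listener) {
        current = pending.poll();
        if (current == null) {
            waiting = listener; // remember whom to wake up
            return false;
        }
        return true;
    }

    @Override
    public synchronized T getCurrent() {
        return current;
    }
}
```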
>>>>>> 2. The FLIP tells us very clearly how to create all Splits and how to
>>>>>> create a SplitReader from a Split. But there is no strategy for the user
>>>>>> to choose how to assign the splits to the tasks. I think we could add an
>>>>>> Enum to let the user choose.
>>>>>> public enum SplitsAssignmentPolicy {
>>>>>>   Location,
>>>>>>   Workload,
>>>>>>   Random,
>>>>>>   Average
>>>>>> }
>>>>>> 3. If we merge `advance` and `getCurrent` into one method like
>>>>>> `getNext`, the `getNext` would need to return an `ElementWithTimestamp`
>>>>>> because some sources want to add a timestamp to every element. IMO, this
>>>>>> is not so memory friendly, so I prefer this design.
>>>>>> Thanks
>>>>>>> Hi,
>>>>>>> Thanks Aljoscha for starting this; it’s blocking quite a lot of other
>>>>>>> possible improvements. I have one proposal. Instead of having a method:
>>>>>>> boolean advance() throws IOException;
>>>>>>> I would replace it with
>>>>>>> /**
>>>>>>>  * Return a future, which when completed means that the source has more
>>>>>>>  * data and getNext() will not block.
>>>>>>>  * If you wish to use the benefits of non-blocking connectors, please
>>>>>>>  * implement this method appropriately.
>>>>>>>  */
>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>       return CompletableFuture.completedFuture(null);
>>>>>>> }
>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>> A couple of arguments:
>>>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>>>> `getCurrent()`: what should be done in which, especially for connectors
>>>>>>> that handle records in batches (like Kafka), and when should you call
>>>>>>> `advance` and when `getCurrent()`.
>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>>>>>> future to have asynchronous/non-blocking connectors and to more
>>>>>>> efficiently handle a large number of blocked threads, without busy
>>>>>>> waiting. At the same time, it doesn’t add much complexity, since naive
>>>>>>> connector implementations can always be blocking.
>>>>>>> 3. This would also allow us to use a fixed-size thread pool of task
>>>>>>> executors, instead of one thread per task.
>>>>>>> Piotrek
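A sketch of point 3, assuming a hypothetical `PooledSourceTask` driven by `isBlocked()`-style futures: a blocked task does not park a pool thread; it registers a continuation with `thenRunAsync(..., pool)` and returns, so many tasks can share a small fixed pool. The `batch` parameter is an illustrative fairness knob, not something from the proposal:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical source task that shares a fixed-size pool with other tasks. */
class PooledSourceTask<T> {
    private final Supplier<CompletableFuture<?>> isBlocked;
    private final Supplier<T> getNext; // non-blocking once isBlocked() is done
    private final Consumer<T> emit;
    private final ExecutorService pool;
    private final int batch; // records per step before yielding the thread

    PooledSourceTask(Supplier<CompletableFuture<?>> isBlocked, Supplier<T> getNext,
                     Consumer<T> emit, ExecutorService pool, int batch) {
        this.isBlocked = isBlocked;
        this.getNext = getNext;
        this.emit = emit;
        this.pool = pool;
        this.batch = batch;
    }

    void start() {
        pool.execute(this::step);
    }

    private void step() {
        for (int i = 0; i < batch; i++) {
            CompletableFuture<?> blocked = isBlocked.get();
            if (!blocked.isDone()) {
                // Release the pool thread; resume on the pool once data arrives.
                blocked.thenRunAsync(this::step, pool);
                return;
            }
            T element = getNext.get();
            if (element != null) {
                emit.accept(element);
            }
        }
        pool.execute(this::step); // let other tasks run, then continue
    }
}
```

With a pool of one thread, two such tasks interleave their records instead of one task monopolizing the thread, and neither occupies the thread while its source is blocked.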
>>>>>>> wrote:
>>>>>>>> Hi All,
>>>>>>>> In order to finally get the ball rolling on the new source interface
>>>>>>>> that we have discussed for so long, I finally created a FLIP:
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>>>>>>> adding per-partition watermark support to the Kinesis source and because
>>>>>>>> this would enable a generic implementation of event-time alignment for
>>>>>>>> all sources. Maybe we need another FLIP for the event-time alignment
>>>>>>>> part, especially the part about information sharing between operations
>>>>>>>> (I'm not calling it state sharing because state has a special meaning
>>>>>>>> in Flink).
>>>>>>>> Please discuss away!
>>>>>>>> Aljoscha