Re: [DISCUSS] FLIP-27: Refactor Source Interface


Hi,

One more thing. I think the Kafka client would be a good example of a connector that could make use of this `isBlocked()`/callbacks single-threaded API from “Pattern 2”.

If we have N threads for N splits, there would be no need for the (N+1)th thread. The hand-over could be implemented as a non-blocking queue that notifies the callback/completes the blocked future whenever the queue becomes non-empty. The same thread that handles checkpoints, network flushes and resource management could handle reading from this queue.
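
To make the idea a bit more concrete, here is a minimal sketch of such a queue. The names `NotifyingQueue`, `add`, `isBlocked` and `poll` are made up for illustration and are not part of the FLIP or of the Kafka client. It also assumes short synchronized sections are acceptable instead of a truly lock-free structure:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical hand-over queue between per-split fetcher threads and the
 * single task thread. All names here are illustrative assumptions.
 */
class NotifyingQueue<T> {
    /** Shared, already-completed future returned while records are available. */
    static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final Queue<T> records = new ArrayDeque<>();
    private CompletableFuture<Void> pending; // non-null only while the task thread waits

    /** Called by a fetcher thread; wakes up the task thread if it was waiting. */
    void add(T record) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            records.add(record);
            toComplete = pending;
            pending = null;
        }
        if (toComplete != null) {
            toComplete.complete(null); // completed outside the lock
        }
    }

    /** Called by the task thread; never waits for data. */
    synchronized CompletableFuture<?> isBlocked() {
        if (!records.isEmpty()) {
            return NOT_BLOCKED;
        }
        if (pending == null) {
            pending = new CompletableFuture<>();
        }
        return pending;
    }

    /** Called by the task thread; returns null if the queue is currently empty. */
    synchronized T poll() {
        return records.poll();
    }
}
```

The task thread would call `isBlocked()` and only call `poll()` once the returned future is done; in the meantime it is free to handle checkpoints and flushes.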

Piotrek

> On 15 Nov 2018, at 17:13, Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:
> 
> Hi
> 
> Re: Becket
> 
>> WRT the confusion between advance() / getCurrent(), do you think it would
>> help if we combine them and have something like:
>> 
>> CompletableFuture<T> getNext();
>> long getWatermark();
>> long getCurrentTimestamp();
> 
> I think that technically this would work the same as the `CompletableFuture<?> isBlocked()`, `CompletableFuture<?> advance()` or callback/`notify()` options. I see two differences:
> 1. In this case, once the connector unblocks itself and completes the future, Flink’s engine would be responsible for holding the record somewhere, because the engine may be busy doing other things at that time. Maybe that’s not a big issue, but it will slightly complicate the execution engine.
> 2. This might cause some performance overhead, since every record will have to go through the future. As I wrote somewhere before, both `advance()` and `isBlocked()` could return a static/const NOT_BLOCKED instance under full throughput, which should/could behave better.
> 
> Nevertheless, maybe the choice between those options is a secondary one and could be made somewhere else/later, or during a comparison of some POCs?
> 
> Re: Aljoscha
> 
>> I think it should be as easy as adding a minimumTimestamp()/maximumTimestamp() method pair to the split interface.
> 
> I think that the `minimumTimestamp()/maximumTimestamp()` extension seems reasonable if we want Flink to be aware of that. Since watermark handling/emitting would be custom logic anyway, maybe the `minimum` and `maximum` timestamps of a split could be handled as private fields of the specific connector implementation? I mean, the current proposal with the `getCurrentTimestamp()` method indicates that this logic will be hidden from Flink’s engine anyway, so there might be no need to expose them via the API?
> 
>> I see there has been some good discussion but I don't know if we have consensus.
> 
> I think we are converging to a point that having some kind of additional notification that the connector is not blocked anymore would be more flexible for us.
> 
> From the perspective of the execution engine, I would be in favour of testing out our ideas and maybe benchmarking them to make sure that we are not omitting something.
> 
> Piotrek
> 
>> On 15 Nov 2018, at 12:43, Aljoscha Krettek <aljoscha@xxxxxxxxxx> wrote:
>> 
>> Hi,
>> 
>> I thought I had sent this mail a while ago but I must have forgotten to send it.
>> 
>> There is another thing we should consider for splits: the range of timestamps that it can contain. For example, the splits of a file source would know, roughly, what the minimum and maximum timestamps in the split are. For infinite splits, such as Kafka partitions, the minimum would be meaningful but the maximum would be +Inf. If the splits expose the interval of time that they contain, the readers, or the component that manages the readers, can make decisions about which splits to forward and read first. And it can also influence the minimum watermark that a reader forwards: it should never emit a watermark if it knows there are splits to read that have a lower minimum timestamp. I think it should be as easy as adding a minimumTimestamp()/maximumTimestamp() method pair to the split interface.
>> 
>> Another thing we need to resolve is the actual reader interface. I see there has been some good discussion but I don't know if we have consensus. We should try and see how specific sources could be implemented with the new interface. For example, for Kafka I think we need to have N+1 threads per task (where N is the number of splits that a task is reading from). One thread is responsible for reading from the splits. And each split has its own (internal) thread for reading from Kafka and putting messages in an internal queue to pull from. This is similar to how the current Kafka source is implemented, which has a separate fetcher thread. The reason for this split is that we always need to try reading from Kafka to keep the throughput up. In the current implementation the internal queue (or handover) limits the read rate of the reader threads.
>> 
>> @Thomas, what do you think this would look like for Kinesis?
>> 
>> Best,
>> Aljoscha
>> 
>>> On 15. Nov 2018, at 03:56, Becket Qin <becket.qin@xxxxxxxxx> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> Thanks a lot for the detailed reply. All makes sense to me.
>>> 
>>> WRT the confusion between advance() / getCurrent(), do you think it would
>>> help if we combine them and have something like:
>>> 
>>> CompletableFuture<T> getNext();
>>> long getWatermark();
>>> long getCurrentTimestamp();
>>> 
>>> Cheers,
>>> 
>>> Jiangjie (Becket) Qin
>>> 
>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Thanks again for the detailed answer :) Sorry for responding with a delay.
>>>> 
>>>>> Completely agree that in pattern 2, having a callback is necessary for
>>>>> that single thread outside of the connectors. And the connectors MUST
>>>>> have internal threads.
>>>> 
>>>> Yes, this thread will have to exist somewhere. In pattern 2 it exists in
>>>> the connector (at least from the perspective of the Flink execution
>>>> engine). In pattern 1 it exists inside the Flink execution engine. With
>>>> completely blocking connectors, like simple reading from files, both of
>>>> those approaches are basically the same. The difference arises when the
>>>> user implementing a Flink source is already working with non-blocking
>>>> code with some internal threads. In this case, pattern 1 would result in
>>>> “double thread wrapping”, while pattern 2 would allow skipping one layer
>>>> of indirection.
>>>> 
>>>>> If we go that way, we should have something like
>>>>> "void poll(Callback) / void advance(callback)". I am curious how
>>>>> CompletableFuture would work here, though. If 10 readers return 10
>>>>> completable futures, will there be 10 additional threads (so 20 threads
>>>>> in total) blocking waiting on them? Or will there be a single thread
>>>>> busy-looping to check them?
>>>> 
>>>> To be honest, I haven’t thought this completely through and I haven’t
>>>> tested/POC’ed it. Having said that, I can think of at least a couple of
>>>> solutions. The first is something like this:
>>>> 
>>>> 
>>>> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>>>> 
>>>> Line:
>>>> 
>>>>                              `blocked = split.process();`
>>>> 
>>>> is where the execution goes into the task/sources. This is where the
>>>> returned future is handled:
>>>> 
>>>>                              blocked.addListener(() -> {
>>>>                                  blockedSplits.remove(split);
>>>>                                  // reset the level priority to prevent previously-blocked
>>>>                                  // splits from starving existing splits
>>>>                                  split.resetLevelPriority();
>>>>                                  waitingSplits.offer(split);
>>>>                              }, executor);
>>>> 
>>>> Fundamentally, callbacks and futures are more or less interchangeable. You
>>>> can always wrap one into the other (creating a callback that completes a
>>>> future, or attaching a callback that fires once the future completes). In
>>>> this case the difference for me is mostly:
>>>> - an API that passes a callback allows the callback to be fired multiple
>>>> times, and to be fired even if the connector is not blocked. This is what
>>>> I meant by saying that the `CompletableFuture<?> isBlocked()` API is a bit
>>>> simpler. The connector can only return either “I’m not blocked” or “I’m
>>>> blocked and I will tell you only once when I’m not blocked anymore”.
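
A rough sketch of the two wrappings mentioned above, assuming two hypothetical interfaces `CallbackReader` and `FutureReader` (neither is part of the FLIP; they only stand in for the two API styles being compared):

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical callback-style reader, for illustration only. */
interface CallbackReader {
    /** Registers a callback that the reader invokes when data becomes available. */
    void onAvailable(Runnable callback);
}

/** Hypothetical future-style reader, for illustration only. */
interface FutureReader {
    CompletableFuture<?> isBlocked();
}

final class Adapters {
    private Adapters() {}

    /** Callback -> future: the registered callback completes a fresh future. */
    static FutureReader asFutureReader(CallbackReader reader) {
        return () -> {
            CompletableFuture<Void> future = new CompletableFuture<>();
            // If the callback fires more than once, later calls are no-ops.
            reader.onAvailable(() -> future.complete(null));
            return future;
        };
    }

    /** Future -> callback: the callback is attached to the future's completion. */
    static CallbackReader asCallbackReader(FutureReader reader) {
        return callback -> reader.isBlocked().thenRun(callback);
    }
}
```

Note how the future side naturally gives the "tell me only once" behaviour, since completing an already completed future has no effect.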
>>>> 
>>>> But this is not the most important thing for me here. For me the important
>>>> thing is to try our best to make the Flink task’s control and execution
>>>> single-threaded. For that, both callback and future APIs should work the
>>>> same.
>>>> 
>>>>> WRT pattern 1, a single blocking take() API should just work. The good
>>>>> thing is that a blocking read API is usually simpler to implement.
>>>> 
>>>> Yes, they are easier to implement (especially if you are not the one who
>>>> has to deal with the additional threading required around them ;) ). But
>>>> to address this issue: if we choose pattern 2, we can always provide a
>>>> proxy/wrapper that would, using an internal thread, implement the
>>>> non-blocking API while exposing a blocking API to the user. It would
>>>> implement pattern 2 on the user's behalf while exposing pattern 1 to them.
>>>> In other words, we would be implementing pattern 1 within the pattern 2
>>>> paradigm, while still making it possible to implement pure pattern 2
>>>> connectors.
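
Such a proxy could look roughly like the sketch below. `BlockingReaderAdapter` and its methods are hypothetical names, the user's blocking read is modelled as a plain `Callable`, and the internal buffer is left unbounded for brevity (a real wrapper would need to bound it and to propagate errors):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical adapter (not part of the FLIP): runs a blocking read in an
 * internal thread and exposes the non-blocking isBlocked()/poll() pair.
 */
final class BlockingReaderAdapter<T> implements AutoCloseable {
    static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final Queue<T> buffer = new ArrayDeque<>(); // unbounded in this sketch
    private final Thread ioThread;
    private CompletableFuture<Void> pending; // set while the caller waits for data

    BlockingReaderAdapter(Callable<T> blockingTake) {
        ioThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    publish(blockingTake.call()); // may block for a long time
                }
            } catch (Exception e) {
                // Interrupted or failed: this sketch simply stops reading.
            }
        }, "blocking-reader-adapter");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    private void publish(T record) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            buffer.add(record);
            toComplete = pending;
            pending = null;
        }
        if (toComplete != null) {
            toComplete.complete(null); // notify the single task thread
        }
    }

    /** Never blocks: returns NOT_BLOCKED or a future completed on the next record. */
    synchronized CompletableFuture<?> isBlocked() {
        if (!buffer.isEmpty()) {
            return NOT_BLOCKED;
        }
        if (pending == null) {
            pending = new CompletableFuture<>();
        }
        return pending;
    }

    /** Never blocks: returns null if nothing is buffered. */
    synchronized T poll() {
        return buffer.poll();
    }

    @Override
    public void close() {
        ioThread.interrupt();
    }
}
```

The user only has to supply the blocking call, for example `new BlockingReaderAdapter<>(queue::take)` for some `BlockingQueue`, and the engine only ever sees the non-blocking side.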
>>>> 
>>>>> BTW, one thing I am also trying to avoid is pushing users to perform IO
>>>>> in a method like "isBlocked()". If the method is expected to fetch
>>>>> records (even if not returning them), naming it something more explicit
>>>>> would help avoid confusion.
>>>> 
>>>> If we choose so, we could rework it into something like:
>>>> 
>>>> CompletableFuture<?> advance();
>>>> T getCurrent();
>>>> Watermark getCurrentWatermark();
>>>> 
>>>> But as I wrote before, this is more confusing to me for the exact reasons
>>>> you mentioned :) I would be confused about what should be done in
>>>> `advance()` and what in `getCurrent()`. However, again, this naming issue
>>>> is not that important to me and is probably a matter of taste/personal
>>>> preference.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket.qin@xxxxxxxxx> wrote:
>>>>> 
>>>>> Hi Piotrek,
>>>>> 
>>>>> Thanks for the explanation. We are probably talking about the same thing
>>>>> but in different ways. To clarify a little bit, I think there are two
>>>>> patterns to read from a connector.
>>>>> 
>>>>> Pattern 1: Thread-less connector with a blocking read API. Outside of the
>>>>> connector, there is one IO thread per reader, doing blocking reads. An
>>>>> additional thread will interact with all the IO threads.
>>>>> Pattern 2: Connector with internal thread(s) and a non-blocking API.
>>>>> Outside of the connector, there is one thread for ALL readers, doing IO
>>>>> relying on notification callbacks in the reader.
>>>>> 
>>>>> In both patterns, there must be at least one thread per connector, either
>>>>> inside (created by connector writers) or outside (created by Flink) of
>>>>> the connector. Ideally there are NUM_CONNECTORS + 1 threads in total, to
>>>>> make sure that 1 thread is fully non-blocking.
>>>>> 
>>>>>> Btw, I don’t know if you understand my point. Having only `poll()` and
>>>>>> `take()` is not enough for a single-threaded task. If our source
>>>>>> interface provides neither a `notify()` callback nor
>>>>>> `CompletableFuture<?> isBlocked()`, there is no way to implement a
>>>>>> single-threaded task that both reads the data from the source connector
>>>>>> and can also react to system events. Ok, a non-blocking `poll()` would
>>>>>> allow that, but with busy looping.
>>>>> 
>>>>> Completely agree that in pattern 2, having a callback is necessary for
>>>>> that single thread outside of the connectors. And the connectors MUST
>>>>> have internal threads. If we go that way, we should have something like
>>>>> "void poll(Callback) / void advance(callback)". I am curious how
>>>>> CompletableFuture would work here, though. If 10 readers return 10
>>>>> completable futures, will there be 10 additional threads (so 20 threads
>>>>> in total) blocking waiting on them? Or will there be a single thread
>>>>> busy-looping to check them?
>>>>> 
>>>>> WRT pattern 1, a single blocking take() API should just work. The good
>>>>> thing is that a blocking read API is usually simpler to implement. An
>>>>> additional non-blocking "T poll()" method here is indeed optional and
>>>>> could be used in cases where Flink does not want the thread to block
>>>>> forever. They can also be combined into a "T poll(Timeout)", which is
>>>>> exactly what KafkaConsumer did.
>>>>> 
>>>>> It sounds like you are proposing pattern 2 with something similar to the
>>>>> NIO2 AsynchronousByteChannel[1]. That API would work, except that the
>>>>> signature returning a future seems unnecessary. If that is the case, a
>>>>> minor change to the current FLIP proposal to have
>>>>> "void advance(callback)" should work. And this means the connectors MUST
>>>>> have their internal threads.
>>>>> 
>>>>> BTW, one thing I am also trying to avoid is pushing users to perform IO
>>>>> in a method like "isBlocked()". If the method is expected to fetch
>>>>> records (even if not returning them), naming it something more explicit
>>>>> would help avoid confusion.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jiangjie (Becket) Qin
>>>>> 
>>>>> [1] https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
>>>>> 
>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
>>>>> wrote:
>>>>> 
>>>>>> Hi
>>>>>> 
>>>>>> Good point with select/epoll, however I do not see how they could be
>>>>>> used with Flink if we would like a single task in Flink to be
>>>>>> single-threaded (and I believe we should pursue this goal). If your
>>>>>> connector blocks on `select`, then it cannot process/handle control
>>>>>> messages from Flink, like checkpoints, releasing resources and
>>>>>> potentially output flushes. This would require tight integration between
>>>>>> the connector and Flink’s main event loop/selects/etc.
>>>>>> 
>>>>>> Looking at it from another perspective: let’s assume that we have a
>>>>>> connector implemented on top of `select`/`epoll`. In order to integrate
>>>>>> it with Flink’s checkpointing/flushes/resource releasing, it will have to
>>>>>> be executed in a separate thread one way or another. At least if our API
>>>>>> enforces/encourages non-blocking implementations with some kind of
>>>>>> notifications (`isBlocked()` or `notify()` callback), some connectors
>>>>>> might skip one layer of wrapping threads.
>>>>>> 
>>>>>> Btw, I don’t know if you understand my point. Having only `poll()` and
>>>>>> `take()` is not enough for a single-threaded task. If our source
>>>>>> interface provides neither a `notify()` callback nor
>>>>>> `CompletableFuture<?> isBlocked()`, there is no way to implement a
>>>>>> single-threaded task that both reads the data from the source connector
>>>>>> and can also react to system events. Ok, a non-blocking `poll()` would
>>>>>> allow that, but with busy looping.
>>>>>> 
>>>>>> Piotrek
>>>>>> 
>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket.qin@xxxxxxxxx> wrote:
>>>>>>> 
>>>>>>> Hi Piotrek,
>>>>>>> 
>>>>>>>> But I don’t see a reason why we should expose both blocking `take()`
>>>>>>>> and non-blocking `poll()` methods to the Flink engine. Someone (Flink
>>>>>>>> engine or connector) would have to do the same busy looping anyway and
>>>>>>>> I think it would be better to have a simpler connector API (that would
>>>>>>>> solve our problems) and force connectors to comply one way or another.
>>>>>>> 
>>>>>>> If we let the block happen inside the connector, the blocking does not
>>>>>>> have to be a busy loop. For example, to do the block waiting
>>>>>>> efficiently, the connector can use java NIO selector().select which
>>>>>>> relies on an OS syscall like epoll[1] instead of busy looping. But if
>>>>>>> the Flink engine blocks outside the connector, it pretty much has to do
>>>>>>> the busy loop. So if there is only one API to get the element, a
>>>>>>> blocking getNextElement() makes more sense. In any case, we should avoid
>>>>>>> ambiguity. It has to be crystal clear whether a method is expected to
>>>>>>> be blocking or non-blocking. Otherwise it would be very difficult for
>>>>>>> the Flink engine to do the right thing with the connectors. At first
>>>>>>> glance at getCurrent(), the expected behavior is not quite clear.
>>>>>>> 
>>>>>>> That said, I do agree that functionality wise, poll() and take() kind
>>>>>>> of overlap. But they are actually not that different from
>>>>>>> isBlocked()/getNextElement(). Compared with isBlocked(), the only
>>>>>>> difference is that poll() also returns the next record if it is
>>>>>>> available. But I agree that isBlocked() + getNextElement() is more
>>>>>>> flexible, as users can just check the record availability without
>>>>>>> fetching the next element.
>>>>>>> 
>>>>>>>> In case of thread-less readers with only non-blocking `queue.poll()`
>>>>>>>> (is that really a thing? I can not think about a real implementation
>>>>>>>> that enforces such constraints)
>>>>>>> 
>>>>>>> Right, it is pretty much syntactic sugar to allow the user to combine
>>>>>>> the check-and-take into one method. It could be achieved with
>>>>>>> isBlocked() + getNextElement().
>>>>>>> 
>>>>>>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Becket,
>>>>>>>> 
>>>>>>>> With my proposal, both of your examples would have to be solved by the
>>>>>>>> connector and solution to both problems would be the same:
>>>>>>>> 
>>>>>>>> Pretend that the connector is never blocked (`isBlocked() { return
>>>>>>>> NOT_BLOCKED; }`) and implement `getNextElement()` in a blocking fashion
>>>>>>>> (or semi-blocking, returning control from time to time to allow
>>>>>>>> checkpointing, network flushing and other resource management things
>>>>>>>> to happen in the same main thread). In other words, exactly how you
>>>>>>>> would implement the `take()` method, or how the same source connector
>>>>>>>> would be implemented NOW with the current source interface. The only
>>>>>>>> difference from the current interface would be that the main loop would
>>>>>>>> be outside of the connector, and instead of periodically releasing the
>>>>>>>> checkpointing lock, the connector would periodically `return null;` or
>>>>>>>> `return Optional.empty();` from `getNextElement()`.
>>>>>>>> 
>>>>>>>> In case of thread-less readers with only non-blocking `queue.poll()`
>>>>>>>> (is that really a thing? I can not think about a real implementation
>>>>>>>> that enforces such constraints), we could provide a wrapper that hides
>>>>>>>> the busy looping. The same applies to forever-blocking readers: we
>>>>>>>> could provide another wrapper running the connector in a separate
>>>>>>>> thread.
>>>>>>>> 
>>>>>>>> But I don’t see a reason why we should expose both blocking `take()`
>>>>>>>> and non-blocking `poll()` methods to the Flink engine. Someone (Flink
>>>>>>>> engine or connector) would have to do the same busy looping anyway and
>>>>>>>> I think it would be better to have a simpler connector API (that would
>>>>>>>> solve our problems) and force connectors to comply one way or another.
>>>>>>>> 
>>>>>>>> Piotrek
>>>>>>>> 
>>>>>>>>> On 7 Nov 2018, at 10:55, Becket Qin <becket.qin@xxxxxxxxx> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Piotr,
>>>>>>>>> 
>>>>>>>>> I might have misunderstood your proposal. But let me try to explain my
>>>>>>>>> concern. I am thinking about the following case:
>>>>>>>>> 1. a reader has the following two interfaces,
>>>>>>>>> boolean isBlocked()
>>>>>>>>> T getNextElement()
>>>>>>>>> 2. the implementation of getNextElement() is non-blocking.
>>>>>>>>> 3. The reader is thread-less, i.e. it does not have any internal
>>>>>>>>> thread. For example, it might just delegate getNextElement() to
>>>>>>>>> queue.poll(), and isBlocked() is just queue.isEmpty().
>>>>>>>>> 
>>>>>>>>> How can Flink efficiently implement a blocking reading behavior with
>>>>>>>>> this reader? Either a tight loop or a backoff interval is needed.
>>>>>>>>> Neither of them is ideal.
>>>>>>>>> 
>>>>>>>>> Now let's say the reader mentioned above implements a blocking
>>>>>>>>> getNextElement() method. Because there is no internal thread in the
>>>>>>>>> reader, after isBlocked() returns false, Flink will still have to loop
>>>>>>>>> on isBlocked() to check whether the next record is available. If the
>>>>>>>>> next record arrives after 10 min, it is a tight loop for 10 min. You
>>>>>>>>> have probably noticed that in this case, even if isBlocked() returns a
>>>>>>>>> future, that future will not be completed if Flink does not call some
>>>>>>>>> method from the reader, because the reader has no internal thread to
>>>>>>>>> complete that future by itself.
>>>>>>>>> 
>>>>>>>>> Due to the above reasons, a blocking take() API would allow Flink to
>>>>>>>>> have an efficient way to read from a reader. There are many ways to
>>>>>>>>> wake up the blocking thread when checkpointing is needed, depending on
>>>>>>>>> the implementation. But I think the poll()/take() API would also work
>>>>>>>>> in that case.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>> 
>>>>>>>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> a)
>>>>>>>>>> 
>>>>>>>>>>> BTW, regarding the isBlock() method, I have a few more questions.
>>>>>>>>>>> 21. Is a method isReady() with boolean as a return value
>>>>>>>>>>> equivalent? Personally I found it is a little bit confusing in what
>>>>>>>>>>> is supposed to be returned when the future is completed. 22. If
>>>>>>>>>>> the implementation of isBlocked() is optional, how do the callers
>>>>>>>>>>> know whether the method is properly implemented or not?
>>>>>>>>>>> Does not implemented mean it always returns a completed future?
>>>>>>>>>> 
>>>>>>>>>> `CompletableFuture<?> isBlocked()` is more or less equivalent to
>>>>>>>>>> `boolean hasNext()` which, in the case of “false”, provides some kind of
>>>>>>>>>> listener/callback that notifies about the presence of the next element.
>>>>>>>>>> There are some minor details, like `CompletableFuture<?>` having a
>>>>>>>>>> minimal two-state logic:
>>>>>>>>>> 
>>>>>>>>>> 1. Future is completed - we have more data
>>>>>>>>>> 2. Future not yet completed - we don’t have data now, but we might/we
>>>>>>>>>> will have in the future
>>>>>>>>>> 
>>>>>>>>>> While `boolean hasNext()` and `notify()` callback are a bit more
>>>>>>>>>> complicated/dispersed and can lead to/encourage `notify()` spam.
>>>>>>>>>> 
>>>>>>>>>> b)
>>>>>>>>>> 
>>>>>>>>>>> 3. If we merge `advance` and `getCurrent` into one method like
>>>>>>>>>>> `getNext`, then `getNext` would need to return an
>>>>>>>>>>> `ElementWithTimestamp`, because some sources want to add a timestamp
>>>>>>>>>>> to every element. IMO, this is not so memory friendly,
>>>>>>>>>>> so I prefer this design.
>>>>>>>>>> 
>>>>>>>>>> Guowei, I don’t quite understand this. Could you elaborate on why
>>>>>>>>>> having a separate `advance()` helps?
>>>>>>>>>> 
>>>>>>>>>> c)
>>>>>>>>>> 
>>>>>>>>>> Regarding advance/poll/take. What’s the value of having two separate
>>>>>>>>>> methods: poll and take? Which one of them should be called and which
>>>>>>>>>> implemented? What’s the benefit of having those methods compared to
>>>>>>>>>> having one single method `getNextElement()` (or `pollElement()` or
>>>>>>>>>> whatever we name it) with the following contract:
>>>>>>>>>> 
>>>>>>>>>> CompletableFuture<?> isBlocked();
>>>>>>>>>> 
>>>>>>>>>> /**
>>>>>>>>>>  * Return next element - will be called only if `isBlocked()` is
>>>>>>>>>>  * completed. Try to implement it in non-blocking fashion, but if
>>>>>>>>>>  * that’s impossible or you just don’t need the effort, you can block
>>>>>>>>>>  * in this method.
>>>>>>>>>>  */
>>>>>>>>>> T getNextElement();
>>>>>>>>>> 
>>>>>>>>>> I mean, if the connector is implemented non-blockingly, Flink should
>>>>>>>>>> use it that way. If it’s not, then `poll()` will `throw new
>>>>>>>>>> NotImplementedException()`. Implementing both of them and providing
>>>>>>>>>> both of them to Flink wouldn’t make sense, so why not merge them into
>>>>>>>>>> a single method call that should preferably (but not necessarily) be
>>>>>>>>>> non-blocking? It’s not like we are implementing a general purpose
>>>>>>>>>> `Queue`, on which users might want to call either `poll` or `take`.
>>>>>>>>>> We would always prefer to call `poll`, but if it’s blocking, then we
>>>>>>>>>> still have no choice but to call it and block on it.
>>>>>>>>>> 
>>>>>>>>>> d)
>>>>>>>>>> 
>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
>>>>>>>>>>> very important. But in addition to `Future/poll`, there may be
>>>>>>>>>>> another way to achieve this. I think it may be not very memory
>>>>>>>>>>> friendly if every advance call returns a Future.
>>>>>>>>>> 
>>>>>>>>>> I didn’t want to mention this, to not clog my initial proposal, but
>>>>>>>>>> there is a simple solution for the problem:
>>>>>>>>>> 
>>>>>>>>>> public interface SplitReader {
>>>>>>>>>> 
>>>>>>>>>> (…)
>>>>>>>>>> 
>>>>>>>>>> CompletableFuture<?> NOT_BLOCKED =
>>>>>>>>>> CompletableFuture.completedFuture(null);
>>>>>>>>>> 
>>>>>>>>>> /**
>>>>>>>>>>  * Returns a future that will be completed when the page source
>>>>>>>>>>  * becomes unblocked.  If the page source is not blocked, this method
>>>>>>>>>>  * should return {@code NOT_BLOCKED}.
>>>>>>>>>>  */
>>>>>>>>>> default CompletableFuture<?> isBlocked()
>>>>>>>>>> {
>>>>>>>>>>   return NOT_BLOCKED;
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>> 
>>>>>>>>>> If we are blocked and we are waiting for the IO, then creating a new
>>>>>>>>>> Future is a non-issue. Under full throttle/throughput, with sources
>>>>>>>>>> that are not blocked, returning a static `NOT_BLOCKED` constant should
>>>>>>>>>> also solve the problem.
>>>>>>>>>> 
>>>>>>>>>> One more remark: non-blocking sources might be a necessity in a
>>>>>>>>>> single-threaded model without a checkpointing lock. (Currently when
>>>>>>>>>> sources are blocked, they can release the checkpointing lock and
>>>>>>>>>> re-acquire it again later.) Non-blocking `poll`/`getNext()` would
>>>>>>>>>> allow checkpoints to happen when the source is idling. In that case
>>>>>>>>>> either `notify()` or my proposed `isBlocked()` would allow us to avoid
>>>>>>>>>> busy-looping.
>>>>>>>>>> 
>>>>>>>>>> Piotrek
>>>>>>>>>> 
>>>>>>>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket.qin@xxxxxxxxx> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>> 
>>>>>>>>>>> The iterator-like API was also the first thing that came to me. But
>>>>>>>>>>> it seems a little confusing that hasNext() does not mean "the stream
>>>>>>>>>>> has not ended", but means "the next record is ready", which is
>>>>>>>>>>> repurposing the well known meaning of hasNext(). If we follow the
>>>>>>>>>>> hasNext()/next() pattern, an additional isNextReady() method to
>>>>>>>>>>> indicate whether the next record is ready seems more intuitive to me.
>>>>>>>>>>> 
>>>>>>>>>>> Similarly, in the poll()/take() pattern, another method, isDone(),
>>>>>>>>>>> is needed to indicate whether the stream has ended or not.
>>>>>>>>>>> 
>>>>>>>>>>> Compared with the hasNext()/next()/isNextReady() pattern,
>>>>>>>>>>> isDone()/poll()/take() seems more flexible for the reader
>>>>>>>>>>> implementation. When I am implementing a reader, I could have a
>>>>>>>>>>> couple of choices:
>>>>>>>>>>> 
>>>>>>>>>>> - A thread-less reader that does not have any internal thread.
>>>>>>>>>>>   - When poll() is called, the same calling thread will perform a
>>>>>>>>>>>     bunch of IO asynchronously.
>>>>>>>>>>>   - When take() is called, the same calling thread will perform a
>>>>>>>>>>>     bunch of IO and wait until the record is ready.
>>>>>>>>>>> - A reader with internal threads performing network IO and putting
>>>>>>>>>>>   records into a buffer.
>>>>>>>>>>>   - When poll() is called, the calling thread simply reads from the
>>>>>>>>>>>     buffer and returns an empty result immediately if there is no
>>>>>>>>>>>     record.
>>>>>>>>>>>   - When take() is called, the calling thread reads from the buffer
>>>>>>>>>>>     and blocks waiting if the buffer is empty.
>>>>>>>>>>> 
>>>>>>>>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it
>>>>>>>>>>> is less intuitive for the reader developers to write the thread-less
>>>>>>>>>>> pattern, although technically speaking one can still do the
>>>>>>>>>>> asynchronous IO to prepare the record in isNextReady(). But it is
>>>>>>>>>>> not explicit and seems somewhat hacky.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <thw@xxxxxxxxxx>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Couple more points regarding discovery:
>>>>>>>>>>>> 
>>>>>>>>>>>> The proposal mentions that discovery could be outside the
>>>>>>>>>>>> execution graph. Today, discovered partitions/shards are
>>>>>>>>>>>> checkpointed. I believe that will also need to be the case in the
>>>>>>>>>>>> future, even when discovery and reading are split between different
>>>>>>>>>>>> tasks.
>>>>>>>>>>>> 
>>>>>>>>>>>> For cases such as resharding of a Kinesis stream, the relationship
>>>>>>>>>>>> between splits needs to be considered. Splits cannot be randomly
>>>>>>>>>>>> distributed over readers in certain situations. An example was
>>>>>>>>>>>> mentioned here:
>>>>>>>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>>>>>>>>>> 
>>>>>>>>>>>> Thomas
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <thw@xxxxxxxxxx>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for getting the ball rolling on this!
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Can the number of splits decrease? Yes, splits can be closed and
>>>> go
>>>>>>>>>> away.
>>>>>>>>>>>>> An example would be a shard merge in Kinesis (2 existing shards
>>>>>> will
>>>>>>>> be
>>>>>>>>>>>>> closed and replaced with a new shard).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regarding advance/poll/take: IMO the least restrictive approach
>>>>>> would
>>>>>>>>>> be
>>>>>>>>>>>>> the thread-less IO model (pull based, non-blocking, caller
>>>>>> retrieves
>>>>>>>>>> new
>>>>>>>>>>>>> records when available). The current Kinesis API requires the use
>>>>>> of
>>>>>>>>>>>>> threads. But that can be internal to the split reader and does
>>>> not
>>>>>>>> need
>>>>>>>>>>>> to
>>>>>>>>>>>>> be a source API concern. In fact, that's what we are working on
>>>>>> right
>>>>>>>>>> now
>>>>>>>>>>>>> as improvement to the existing consumer: Each shard consumer
>>>> thread
>>>>>>>>>> will
>>>>>>>>>>>>> push to a queue, the consumer main thread will poll the queue(s).
>>>>>> It
>>>>>>>> is
>>>>>>>>>>>>> essentially a mapping from threaded IO to non-blocking.
>>>>>>>>>>>>> 
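The threaded-to-non-blocking mapping described above could look roughly like the sketch below. It is not the Kinesis consumer code; `QueueBridge` and `QueueBridgeDemo` are hypothetical names, and plain Java threads stand in for the shard consumer threads.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical bridge: producer threads push, one caller thread polls without blocking. */
final class QueueBridge<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();

    /** Called from a per-shard consumer thread (the threaded IO side). */
    void push(T record) {
        queue.add(record);
    }

    /** Called from the main thread; returns null immediately when nothing is buffered. */
    T poll() {
        return queue.poll();
    }
}

final class QueueBridgeDemo {
    /** Starts one producer thread per shard, waits for them, then drains the queue. */
    static int run(int shards, int perShard) {
        QueueBridge<String> bridge = new QueueBridge<>();
        Thread[] producers = new Thread[shards];
        for (int s = 0; s < shards; s++) {
            final int shard = s;
            producers[s] = new Thread(() -> {
                for (int i = 0; i < perShard; i++) {
                    bridge.push("shard-" + shard + "/record-" + i);
                }
            });
            producers[s].start();
        }
        try {
            for (Thread t : producers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        // Non-blocking drain: poll() never waits, it just reports "nothing there" as null.
        int received = 0;
        while (bridge.poll() != null) {
            received++;
        }
        return received;
    }
}
```

The queue here is unbounded to keep the sketch short; a real connector would presumably want a bounded queue so that slow consumption pushes back on the shard threads.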
>>>>>>>>>>>>> The proposed SplitReader interface would fit the thread-less IO
>>>>>>>>>>>>> model. Similar to an iterator, we find out if there is a new element
>>>>>>>>>>>>> (hasNext) and if so, move to it (next()). Separate calls deliver the
>>>>>>>>>>>>> meta information (timestamp, watermark). Perhaps the advance call
>>>>>>>>>>>>> could offer a timeout option, so that the caller does not end up in
>>>>>>>>>>>>> a busy wait. On the other hand, a caller processing multiple splits
>>>>>>>>>>>>> may want to cycle through fast, to process elements of other splits
>>>>>>>>>>>>> as soon as they become available. The nice thing is that this "split
>>>>>>>>>>>>> merge" logic can now live in Flink and be optimized and shared
>>>>>>>>>>>>> between different sources.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei.mgw@xxxxxxxxx> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> Thanks Aljoscha for this FLIP.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may be another way
>>>>>>>>>>>>>> to achieve this. I think it may not be very memory friendly if every
>>>>>>>>>>>>>> advance call returns a Future.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public interface Listener {
>>>>>>>>>>>>>>     public void notify();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public interface SplitReader {
>>>>>>>>>>>>>>     /**
>>>>>>>>>>>>>>      * When there is temporarily no element, this will return false.
>>>>>>>>>>>>>>      * When elements are available again, the SplitReader can call
>>>>>>>>>>>>>>      * listener.notify(). In addition, the framework would check
>>>>>>>>>>>>>>      * `advance` periodically. Of course, advance can always return
>>>>>>>>>>>>>>      * true and ignore the listener argument for simplicity.
>>>>>>>>>>>>>>      */
>>>>>>>>>>>>>>     public boolean advance(Listener listener);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
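A minimal sketch of how the listener variant might behave, using an in-memory buffer in place of a real split. All names are hypothetical. One deviation from the snippet above: the callback is called `onAvailable()` here, because a Java interface cannot declare its own `notify()` (it clashes with the final `Object.notify()`).

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Callback invoked when a reader that reported "no data" has data again. */
interface AvailabilityListener {
    void onAvailable(); // hypothetical name, see note above about Object.notify()
}

/** Hypothetical in-memory reader following the advance(listener) contract. */
final class ListenerSplitReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private AvailabilityListener waiting; // set only after advance() returned false
    private T current;

    /** Moves to the next buffered element; if none, remembers the listener and returns false. */
    synchronized boolean advance(AvailabilityListener listener) {
        current = buffer.poll();
        if (current == null) {
            waiting = listener;
            return false;
        }
        return true;
    }

    /** Element selected by the last successful advance(). */
    synchronized T getCurrent() {
        return current;
    }

    /** Called by whatever fetches data (an IO thread in a real connector). */
    void offer(T element) {
        AvailabilityListener toWake;
        synchronized (this) {
            buffer.add(element);
            toWake = waiting;
            waiting = null; // each registered listener is woken at most once
        }
        if (toWake != null) {
            toWake.onAvailable(); // invoke outside the lock
        }
    }
}
```

No object is allocated per `advance` call in this variant, which is the memory argument made above; the cost is that the reader has to keep track of the registered listener itself.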
>>>>>>>>>>>>>> 2. The FLIP tells us very clearly how to create all Splits and how to
>>>>>>>>>>>>>> create a SplitReader from a Split. But there is no strategy for the
>>>>>>>>>>>>>> user to choose how to assign the splits to the tasks. I think we
>>>>>>>>>>>>>> could add an enum to let the user choose.
>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>> public enum SplitsAssignmentPolicy {
>>>>>>>>>>>>>> Location,
>>>>>>>>>>>>>> Workload,
>>>>>>>>>>>>>> Random,
>>>>>>>>>>>>>> Average
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> */
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 3. If we merge `advance` and `getCurrent` into one method like
>>>>>>>>>>>>>> `getNext`, then `getNext` would need to return an
>>>>>>>>>>>>>> `ElementWithTimestamp`, because some sources want to add a timestamp
>>>>>>>>>>>>>> to every element. IMO, this is not so memory friendly, so I prefer
>>>>>>>>>>>>>> the current design with separate methods.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>>>>>>>>>>>> possible improvements. I have one proposal. Instead of having a
>>>>>>>>>>>>>>> method:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> boolean advance() throws IOException;
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I would replace it with
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> /*
>>>>>>>>>>>>>>> * Return a future, which when completed means that source has more
>>>>>>>>>>>>>>> * data and getNext() will not block.
>>>>>>>>>>>>>>> * If you wish to use benefits of non blocking connectors, please
>>>>>>>>>>>>>>> * implement this method appropriately.
>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>>>>>>>>>  return CompletableFuture.completedFuture(null);
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> A couple of arguments:
>>>>>>>>>>>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>>>>>>>>>>>> `getCurrent()`. What should be done in which, especially for
>>>>>>>>>>>>>>> connectors that handle records in batches (like Kafka), and when
>>>>>>>>>>>>>>> should you call `advance()` and when `getCurrent()`?
>>>>>>>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
>>>>>>>>>>>>>>> the future to have asynchronous/non blocking connectors and to more
>>>>>>>>>>>>>>> efficiently handle a large number of blocked threads, without busy
>>>>>>>>>>>>>>> waiting. At the same time it doesn’t add much complexity, since
>>>>>>>>>>>>>>> naive connector implementations can be always blocking.
>>>>>>>>>>>>>>> 3. This also would allow us to use a fixed size thread pool of task
>>>>>>>>>>>>>>> executors, instead of one thread per task.
>>>>>>>>>>>>>>> 
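To illustrate the `isBlocked()` proposal, here is a rough sketch of a reader backed by an in-memory buffer. The class and the `offer` method are hypothetical and only stand in for a real connector's fetch path; the shared completed future corresponds to the idea of returning a constant "not blocked" instance while data is flowing.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical reader exposing availability as a future instead of a boolean. */
final class FutureSplitReader<T> {
    /** Shared, already-completed future so the full-throughput path allocates nothing. */
    private static final CompletableFuture<Void> NOT_BLOCKED =
            CompletableFuture.completedFuture(null);

    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> blocked; // non-null only while a caller waits on an empty buffer

    /** Completed future if getNext() has data; otherwise a future completed by the next offer(). */
    synchronized CompletableFuture<?> isBlocked() {
        if (!buffer.isEmpty()) {
            return NOT_BLOCKED;
        }
        if (blocked == null) {
            blocked = new CompletableFuture<>();
        }
        return blocked;
    }

    /** Returns the next element, or null if called while the reader is blocked. */
    synchronized T getNext() {
        return buffer.poll();
    }

    /** Called by the data-fetching side; wakes up anyone waiting on isBlocked(). */
    void offer(T element) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            buffer.add(element);
            toComplete = blocked;
            blocked = null;
        }
        if (toComplete != null) {
            toComplete.complete(null); // complete outside the lock; callbacks run here
        }
    }
}
```

A caller could chain `reader.isBlocked().thenRun(...)` to reschedule work on a shared executor, which is one way the fixed-size thread pool from point 3 might be driven.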
>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljoscha@xxxxxxxxxx> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> In order to finally get the ball rolling on the new source interface
>>>>>>>>>>>>>>>> that we have discussed for so long I finally created a FLIP:
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>>>>>>>>>>>>>>> adding per-partition watermark support to the Kinesis source and
>>>>>>>>>>>>>>>> because this would enable generic implementation of event-time
>>>>>>>>>>>>>>>> alignment for all sources. Maybe we need another FLIP for the
>>>>>>>>>>>>>>>> event-time alignment part, especially the part about information
>>>>>>>>>>>>>>>> sharing between operations (I'm not calling it state sharing because
>>>>>>>>>>>>>>>> state has a special meaning in Flink).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Please discuss away!
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Aljoscha