
Re: [DISCUSS] FLIP-27: Refactor Source Interface

Hey Becket,

Re 2.


If a source is purely single-threaded and blocking, then it could be implemented in the following way:

/*
 * Return a future, which when completed means that source has more data and getNext() will not block.
 * If you wish to use benefits of non blocking connectors, please implement this method appropriately.
 */
CompletableFuture<?> isBlocked() {
	// this would be the default behaviour, so the user wouldn’t need to override this at all
	return CompletableFuture.completedFuture(null);
}

T getNext() {
	// do some blocking reading operation
	return result;
}

Implementing `isBlocked` doesn’t have to be mandatory. It’s more like an optional optimisation that some connectors might provide.
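
For contrast, here is a rough sketch of what a connector that does opt in might look like. It assumes the connector has its own I/O thread or callback that pushes records in. The names `SourceReader`, `QueueBackedReader` and `offer` are made up for illustration and are not part of the FLIP, and `getNext()` is assumed to be called only after the future from `isBlocked()` has completed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical reader interface with the two methods discussed in this thread.
interface SourceReader<T> {
    // Default: never report "blocked", so naive connectors may simply block in getNext().
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    T getNext();
}

// Hypothetical connector whose own I/O thread (or callback) hands records over via offer().
class QueueBackedReader<T> implements SourceReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    // Called by the connector's own thread when a record arrives.
    synchronized void offer(T record) {
        buffer.add(record);
        available.complete(null); // wake up whoever waits on isBlocked()
    }

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        return available;
    }

    // Never blocks; returns null if called while the buffer is empty.
    @Override
    public synchronized T getNext() {
        T record = buffer.poll();
        if (buffer.isEmpty() && available.isDone()) {
            // Buffer drained: hand out a fresh, incomplete future for the next wait.
            available = new CompletableFuture<>();
        }
        return record;
    }
}
```

A reader without its own thread would just keep the default `isBlocked()` and block inside `getNext()`, as in the snippet above.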

Providing a non-blocking `poll` method doesn’t solve the problem of actually limiting the number of active threads. One of the potential benefits of `CompletableFuture<?> isBlocked()` is that we could have a fixed-size pool of worker threads. A worker thread could pick a non-blocked task that’s waiting to be executed, and for this the `CompletableFuture<?>` would be needed to move tasks between the blocked and active states. Another potential side benefit could be reporting in the UI/metrics which tasks are blocked (kind of like the current back pressure monitoring).
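
To make the fixed-size pool idea a bit more concrete, below is a minimal sketch, not a proposal for the actual runtime code. `BlockableSource` and `SourceTaskRunner` are hypothetical names, and treating a `null` from `getNext()` as end of input is purely an assumption of this sketch. The point is that a task only occupies a worker thread once its `isBlocked()` future has completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical reader interface with the two methods discussed in this thread.
interface BlockableSource<T> {
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    T getNext(); // assumption of this sketch: null signals end of input
}

// Drives one source on a shared executor, e.g. Executors.newFixedThreadPool(n).
class SourceTaskRunner<T> {
    private final BlockableSource<T> source;
    private final Consumer<T> output;
    private final Executor workers;
    private final CompletableFuture<Void> finished = new CompletableFuture<>();

    SourceTaskRunner(BlockableSource<T> source, Consumer<T> output, Executor workers) {
        this.source = source;
        this.output = output;
        this.workers = workers;
    }

    // Starts processing; the returned future completes when the source is exhausted.
    CompletableFuture<Void> start() {
        schedule();
        return finished;
    }

    // While the future is incomplete the task is "blocked" and holds no worker thread.
    private void schedule() {
        source.isBlocked().thenRunAsync(this::runOnce, workers);
    }

    // Runs on a worker thread: read one record, emit it, then re-register.
    private void runOnce() {
        try {
            T record = source.getNext();
            if (record == null) {
                finished.complete(null);
                return;
            }
            output.accept(record);
            schedule();
        } catch (Throwable t) {
            finished.completeExceptionally(t);
        }
    }
}
```

With this shape, many `SourceTaskRunner` instances can share a small pool. A source that keeps the default `isBlocked()` still works, it just ties up a worker for as long as its `getNext()` blocks.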

Maybe such an extension could use a PoC that would show whether or not it brings any benefits.


> On 1 Nov 2018, at 19:29, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> Thanks for the FLIP, Aljoscha.
> The proposal makes sense to me. Separating the split discovery and
> consumption is very useful as it enables Flink to better manage the sources.
> Looking at the interface, I have a few questions:
> 1. *SplitEnumerator*.*discoverNewSplits()* seems to assume that the number
> of splits can only increase. In your example, the source was Kafka, so the
> assumption was true. But I am wondering whether there are cases where the
> number of splits can decrease?
> 2. I agree with Piotr that we need to be careful about potentially blocking
> implementations. However, it is not clear to me how the completable
> future works if the underlying reader does not have its own thread (e.g. a
> Kafka consumer). In that case, the future will never be completed unless
> the caller thread touches the reader again. I am wondering if the following
> interfaces for the reader make sense:
>    boolean isDone(); // Whether the source has more records.
>    T poll(); // non-blocking read. We can add a timeout if needed.
>    T take(); // blocking read;
> This seems more intuitive to people who are familiar with existing
> convention of poll() and take(). And with the non-blocking poll() we could
> have an nio Selector-like API when there are multiple splits.
> BTW, it would be really helpful if there were some Javadoc describing the
> behavior of the interfaces in the FLIP.
> Thanks again for the great proposal.
> Jiangjie (Becket) Qin
> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
> wrote:
>> Hi,
>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>> possible improvements. I have one proposal. Instead of having a method:
>> boolean advance() throws IOException;
>> I would replace it with
>> /*
>> * Return a future, which when completed means that source has more data
>> and getNext() will not block.
>> * If you wish to use benefits of non blocking connectors, please
>> implement this method appropriately.
>> */
>> default CompletableFuture<?> isBlocked() {
>>        return CompletableFuture.completedFuture(null);
>> }
>> And rename `getCurrent()` to `getNext()`.
>> Couple of arguments:
>> 1. I don’t understand the division of work between `advance()` and
>> `getCurrent()`. What should be done in which, especially for connectors
>> that handle records in batches (like Kafka) and when should you call
>> `advance` and when `getCurrent()`.
>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>> future to have asynchronous/non blocking connectors and more efficiently
>> handle large number of blocked threads, without busy waiting. While at the
>> same time it doesn’t add much complexity, since naive connector
>> implementations can be always blocking.
>> 3. This also would allow us to use a fixed size thread pool of task
>> executors, instead of one thread per task.
>> Piotrek
>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljoscha@xxxxxxxxxx> wrote:
>>> Hi All,
>>> In order to finally get the ball rolling on the new source interface
>> that we have discussed for so long I finally created a FLIP:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>> adding per-partition watermark support to the Kinesis source and because
>> this would enable generic implementation of event-time alignment for all
>> sources. Maybe we need another FLIP for the event-time alignment part,
>> especially the part about information sharing between operations (I'm not
>> calling it state sharing because state has a special meaning in Flink).
>>> Please discuss away!
>>> Aljoscha