osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] FLIP-27: Refactor Source Interface


Hi Becket,

I think the problem is not with the split re-assignment, but with dynamic split discovery. We do not always know before the hand the number of splits (for example Kafka partition/topic discovery, but this can also happen in batch), while the source parallelism is fixed/known before hand.

> 1. What if the SplitReader implementation cannot easily add a split to read on the fly?

Always initiating one consumer per split will not be efficient in many cases. While if the connector needs to instantiate a new reader per each split, connector can handle this internally (addSplit() would close previous reader and create new one).

> 2. Does Flink have to be involved in splits assignment?

I think that this might be a good shared logic between different connectors.

> @Biao,
> If I understand correctly, the concern you raised was that a Source may
> return a lot of splits and thus Flink may have to create a lot of fetcher
> threads. This is a valid concern, but I cannot think of a solution to that.
> After all, the SplitReaders may be written by third parties. Poor
> implementations seem difficult to prevent.

I think we can solve this and this is not as uncommon as you might think. In batch word, usually/often you have one split per HDFS chunk, each chunk being 64-256MB. With peta byte tables you end up with range from millions to billions of splits. This becomes a bottleneck if splits can be efficiently filtered out/eliminated based on some header (ORC header for example). In other words, if you have huge number of splits that are very cheap/quick to process.

Piotrek

> On 22 Nov 2018, at 04:54, Becket Qin <becket.qin@xxxxxxxxx> wrote:
> 
> Thanks Piotrek,
> 
>> void SplitReader#addSplit(Split)
>> boolean SplitReader#doesWantMoreSplits()
> 
> I have two questions about this API.
> 1. What if the SplitReader implementation cannot easily add a split to read
> on the fly?
> 2. Does Flink have to be involved in splits assignment?
> 
> I am wondering if it would be simpler to let the enumerator indicate
> whether a split reassignment is needed. If the answer is yes, Flink can
> just start from the beginning to get all the splits and create one reader
> per split. This might be a little more expensive than dynamically adding a
> split to a reader, but given that the splits change should be rare, it is
> probably acceptable.
> 
> In the Kafka case, the SplitT may just be a consumer. The enumerator will
> simply check if the topic has new partitions to be assigned to this reader.
> 
> @Biao,
> If I understand correctly, the concern you raised was that a Source may
> return a lot of splits and thus Flink may have to create a lot of fetcher
> threads. This is a valid concern, but I cannot think of a solution to that.
> After all, the SplitReaders may be written by third parties. Poor
> implementations seem difficult to prevent.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Wed, Nov 21, 2018 at 10:13 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>
> wrote:
> 
>> Hi again,
>> 
>>> However I don't like the thread mode which starts a thread for each
>> split.
>>> Starting extra thread in operator is not an ideal way IMO. Especially
>>> thread count is decided by split count. So I was wondering if there is a
>>> more elegant way. Do we really want these threads in Flink core?
>> 
>> Biao you have raised an important issue. Indeed it seems like the current
>> proposal is missing something. I would guess that we need a mechanism for
>> adding new splits to an already existing SplitReader and some logic to
>> determine whether current instance can accept more splits or not. For
>> example
>> 
>> void SplitReader#addSplit(Split)
>> boolean SplitReader#doesWantMoreSplits()
>> 
>> Flink could randomly/round robin assign new splits to the SplitReaders
>> that `doWantMoreSplits()`. Batch file readers might implement some custom
>> logic in `doesWantMoreSplits()`, like one SplitReader can have at most N
>> enqueued splits?
>> 
>> Also what about Kafka. Isn’t it the case that one KafkaConsumer can read
>> from multiple splits? So Kafka’s SplitReader should always return true from
>> `doesWantMoreSplits()`?
>> 
>> What do you think?
>> 
>> Re: Becket
>> 
>> I’m +1 for Sync and AsyncSplitReader.
>> 
>> Piotrek
>> 
>>> On 21 Nov 2018, at 14:49, Becket Qin <becket.qin@xxxxxxxxx> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> Good point on the potential optimization in the source. One thing to
>>> clarify, by "adding a minimumTimestamp()/maximumTimestamp() method pair
>> to
>>> the split interface", did you mean "split reader interface"? If so, what
>>> should the readers do if they do not have such additional information? I
>> am
>>> wondering if it is possible to leave such optimization to the source
>>> internal implementation.
>>> 
>>> @all
>>> After reading all the feedback, Biao and I talked a little bit offline.
>> We
>>> would like to share some new thoughts with you and see what do you think.
>>> 
>>> When looking at the Source API, we were trying to answer two questions.
>>> First of all, how would Flink use this API if someone else implemented
>> it.
>>> Secondly, how would the connector contributors implement the interface?
>> How
>>> difficult is the implementation.
>>> 
>>> KafkaConsumer is a typical example of a thread-less reader. The idea was
>> to
>>> allow different threading model on top of it. It could be a global single
>>> thread handles record fetching and processing in an event loop pattern;
>> it
>>> could also be one dedicated fetcher thread for each consumer and a
>> separate
>>> thread pool for record processing. The API gives the freedom of picking
>> up
>>> threading model to the users. To answer the first question, I would love
>> to
>>> have such a source reader API so Flink can choose whatever threading
>> model
>>> it wants. However, implementing such an interface could be pretty
>>> challenging and error prone.
>>> 
>>> On the other hand, having a source reader with a naive blocking socket is
>>> probably simple enough in most cases (actually sometimes this might even
>> be
>>> the most efficient way). But it does not leave much option to Flink other
>>> than creating one thread per reader.
>>> 
>>> Given the above thoughts, it might be reasonable to separate the
>>> SplitReader API into two: SyncReader and AsyncReader. The sync reader
>> just
>>> has a simple blocking takeNext() API. And the AsyncReader just has a
>>> pollNext(Callback) or Future<?> pollNext(). All the other methods are
>>> shared by both readers and could be put into a package private parent
>>> interface like BaseSplitReader.
>>> 
>>> Having these two readers allows both complicated and simple
>> implementation,
>>> depending on the SplitReader writers. From Flink's perspective, it will
>>> choose a more efficient threading model if the SplitReader is an
>>> AsyncReader. Otherwise, it may have to use the one thread per reader
>> model
>>> if the reader is a SyncReader. Users can also choose to implement both
>>> interface, in that case, it is up to Flink to choose which interface to
>> use.
>>> 
>>> Admittedly, this solution does have one more interface, but still seems
>>> rewarding. Any thoughts?
>>> 
>>> Thanks,
>>> 
>>> Jiangjie (Becket) Qin
>>> 
>>> 
>>> On Sun, Nov 18, 2018 at 11:33 PM Biao Liu <mmyy1110@xxxxxxxxx> wrote:
>>> 
>>>> Hi community,
>>>> 
>>>> Thank you guys for sharing ideas.
>>>> 
>>>> The thing I really concern is about the thread mode.
>>>> Actually in Alibaba, we have implemented our "split reader" based source
>>>> two years ago. That's based on "SourceFunction", it's just an extension
>> not
>>>> a refactoring. It's almost same with the version Thomas and Jamie
>> described
>>>> in Google Doc. It really helps in many scenarios.
>>>> 
>>>> However I don't like the thread mode which starts a thread for each
>> split.
>>>> Starting extra thread in operator is not an ideal way IMO. Especially
>>>> thread count is decided by split count. So I was wondering if there is a
>>>> more elegant way. Do we really want these threads in Flink core?
>>>> 
>>>> I agree that blocking interface is more easy to implement. Could we at
>>>> least separate the split reader with source function into different
>>>> interfaces? Not all sources would like to read all splits concurrently.
>> In
>>>> batch scenario, reading splits one by one is more general. And also not
>> all
>>>> sources are partitioned, right?
>>>> I prefer there is a new source interface with "pull mode" only, no
>> split.
>>>> There is a splittable source extended it. And there is one
>> implementation
>>>> that starting threads for each split, reading all splits concurrently.
>>>> 
>>>> 
>>>> Thomas Weise <thw@xxxxxxxxxx> 于2018年11月18日周日 上午3:18写道:
>>>> 
>>>>> @Aljoscha to address your question first: In the case of the Kinesis
>>>>> consumer (with current Kinesis consumer API), there would also be N+1
>>>>> threads. I have implemented a prototype similar to what is shown in
>>>> Jamie's
>>>>> document, where the thread ownership is similar to what you have done
>> for
>>>>> Kafka.
>>>>> 
>>>>> The equivalent of split reader manages its own thread and the "source
>>>> main
>>>>> thread" is responsible for emitting the data. The interface between
>> the N
>>>>> reader threads and the 1 emitter is a blocking queue per consumer
>> thread.
>>>>> The emitter can now control which queue to consume from based on the
>>>> event
>>>>> time progress.
>>>>> 
>>>>> This is akin to a "non-blocking" interface *between emitter and split
>>>>> reader*. Emitter uses poll to retrieve records from the N queues (which
>>>>> requires non-blocking interaction). The emitter is independent of the
>>>> split
>>>>> reader implementation, that part could live in Flink.
>>>>> 
>>>>> Regarding whether or not to assume that split readers always need a
>>>> thread
>>>>> and in addition that these reader threads should be managed by Flink:
>> It
>>>>> depends on the API of respective external systems and I would not bake
>>>> that
>>>>> assumption into Flink. Some client libraries manage their own threads
>>>> (see
>>>>> push based API like JMS and as I understand it may also apply to the
>> new
>>>>> fan-out Kinesis API:
>>>>> 
>>>>> 
>>>> 
>> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html
>>>>> ).
>>>>> In such cases it would not make sense to layer another reader thread on
>>>>> top. It may instead be better if Flink provides to the split reader the
>>>>> queue/buffer to push records to.
>>>>> 
>>>>> The discussion so far has largely ignored the discovery aspect. There
>> are
>>>>> some important considerations such as ordering dependency of splits and
>>>>> work rebalancing that may affect the split reader interface. Should we
>>>> fork
>>>>> this into a separate thread?
>>>>> 
>>>>> Thanks,
>>>>> Thomas
>>>>> 
>>>>> 
>>>>> On Fri, Nov 16, 2018 at 8:09 AM Piotr Nowojski <
>> piotr@xxxxxxxxxxxxxxxxx>
>>>>> wrote:
>>>>> 
>>>>>> Hi Jamie,
>>>>>> 
>>>>>> As it was already covered with my discussion with Becket, there is an
>>>>> easy
>>>>>> way to provide blocking API on top of non-blocking API. And yes we
>> both
>>>>>> agreed that blocking API is easier to implement by users.
>>>>>> 
>>>>>> I also do not agree with respect to usefulness of non blocking API.
>>>>>> Actually Kafka connector is the one that could be more efficient
>> thanks
>>>>> to
>>>>>> the removal of the one layer of threading.
>>>>>> 
>>>>>> Piotrek
>>>>>> 
>>>>>>> On 16 Nov 2018, at 02:21, Jamie Grier <jgrier@xxxxxxxx.INVALID>
>>>> wrote:
>>>>>>> 
>>>>>>> Thanks Aljoscha for getting this effort going!
>>>>>>> 
>>>>>>> There's been plenty of discussion here already and I'll add my big +1
>>>>> to
>>>>>>> making this interface very simple to implement for a new
>>>>>>> Source/SplitReader.  Writing a new production quality connector for
>>>>> Flink
>>>>>>> is very difficult today and requires a lot of detailed knowledge
>>>> about
>>>>>>> Flink, event time progress, watermarking, idle shard detection, etc
>>>> and
>>>>>> it
>>>>>>> would be good to move almost all of this type of code into Flink
>>>> itself
>>>>>> and
>>>>>>> out of source implementations.  I also think this is totally doable
>>>> and
>>>>>> I'm
>>>>>>> really excited to see this happening.
>>>>>>> 
>>>>>>> I do have a couple of thoughts about the API and the implementation..
>>>>>>> 
>>>>>>> In a perfect world there would be a single thread per Flink source
>>>>>> sub-task
>>>>>>> and no additional threads for SplitReaders -- but this assumes a
>>>> world
>>>>>>> where you have true async IO APIs for the upstream systems (like
>>>> Kafka
>>>>>> and
>>>>>>> Kinesis, S3, HDFS, etc).  If that world did exist the single thread
>>>>> could
>>>>>>> just sit in an efficient select() call waiting for new data to arrive
>>>>> on
>>>>>>> any Split.  That'd be awesome..
>>>>>>> 
>>>>>>> But, that world doesn't exist and given that practical consideration
>>>> I
>>>>>>> would think the next best implementation is going to be, in practice,
>>>>>>> probably a thread per SplitReader that does nothing but call the
>>>> source
>>>>>> API
>>>>>>> and drop whatever it reads into a (blocking) queue -- as Aljoscha
>>>>>> mentioned
>>>>>>> (calling it N+1) and as we started to describe here:
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit#heading=h.lfi2u4fv9cxa
>>>>>>> 
>>>>>>> I guess my point is that I think we should strive to move as much of
>>>>>>> something like the diagram referenced in the above doc into Flink
>>>>> itself
>>>>>>> and out of sources and simplify the SplitReader API as much as
>>>> possible
>>>>>> as
>>>>>>> well.
>>>>>>> 
>>>>>>> With the above in mind and with regard to the discussion about
>>>>> blocking,
>>>>>>> etc..  I'm not sure I agree with some of the discussion so far with
>>>>>> regard
>>>>>>> to this API design.  The calls to the upstream systems
>>>> (kafka/kinesis)
>>>>>> are
>>>>>>> in fact going to be blocking calls.  So a simple API without the
>>>>>> constraint
>>>>>>> that the methods must be implemented in a non-blocking way seems
>>>> better
>>>>>> to
>>>>>>> me from the point of view of somebody writing a new source
>>>>>> implementation.
>>>>>>> My concern is that if you force the implementer of the SplitReader
>>>>>>> interface to do so in a non-blocking way you're just going to make it
>>>>>>> harder to write those implementations.  Those calls to read the next
>>>>> bit
>>>>>> of
>>>>>>> data are going to be blocking calls with most known important sources
>>>>> --
>>>>>> at
>>>>>>> least Kafka/Kinesis/HDFS -- so I think maybe we should just deal with
>>>>>> that
>>>>>>> head on and work around it a higher level so the SplitReader
>>>> interface
>>>>>>> stays super simple to implement.  This means we manage all the
>>>>> threading
>>>>>> in
>>>>>>> Flink core, the API stays pull-based, and the implementer is allowed
>>>> to
>>>>>>> simply block until they have data to return.
>>>>>>> 
>>>>>>> I maybe would change my mind about this if truly asynchronous APIs to
>>>>> the
>>>>>>> upstream source systems were likely to be available in the near
>>>> future
>>>>> or
>>>>>>> are now and I'm just ignorant of it.  But even then the supporting
>>>> code
>>>>>> in
>>>>>>> Flink to drive async and sync sources would be different and in fact
>>>>> they
>>>>>>> might just have different APIs altogether -- SplitReader vs
>>>>>>> AsyncSplitReader maybe.
>>>>>>> 
>>>>>>> In the end I think playing with the implementation, across more than
>>>>> one
>>>>>>> source, and moving as much common code into Flink itself will reveal
>>>>> the
>>>>>>> best API of course.
>>>>>>> 
>>>>>>> One other interesting note is that you need to preserve per-partition
>>>>>>> ordering so you have to take care with the implementation if it were
>>>> to
>>>>>> be
>>>>>>> based on a thread pool and futures so as not to reorder the reads.
>>>>>>> 
>>>>>>> Anyway, I'm thrilled to see this starting to move forward and I'd
>>>> very
>>>>>> much
>>>>>>> like to help with the implementation wherever I can.  We're doing a
>>>>>>> simplified internal version of some of this at Lyft for just Kinesis
>>>>>>> because we need a solution for event time alignment in the very short
>>>>>> term
>>>>>>> but we'd like to immediately start helping to do this properly in
>>>> Flink
>>>>>>> after that.  One of the end goals for us is event time alignment
>>>> across
>>>>>>> heterogeneous sources.  Another is making it possible for non-expert
>>>>>> users
>>>>>>> to have a high probability of being able to write their own, correct,
>>>>>>> connectors.
>>>>>>> 
>>>>>>> -Jamie
>>>>>>> 
>>>>>>> On Thu, Nov 15, 2018 at 3:43 AM Aljoscha Krettek <
>>>> aljoscha@xxxxxxxxxx>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> I thought I had sent this mail a while ago but I must have forgotten
>>>>> to
>>>>>>>> send it.
>>>>>>>> 
>>>>>>>> There is another thing we should consider for splits: the range of
>>>>>>>> timestamps that it can contain. For example, the splits of a file
>>>>> source
>>>>>>>> would know what the minimum and maximum timestamp in the splits is,
>>>>>>>> roughly. For infinite splits, such as Kafka partitions, the minimum
>>>>>> would
>>>>>>>> be meaningful but the maximum would be +Inf. If the splits expose
>>>> the
>>>>>>>> interval of time that they contain the readers, or the component
>>>> that
>>>>>>>> manages the readers can make decisions about which splits to forward
>>>>> and
>>>>>>>> read first. And it can also influence the minimum watermark that a
>>>>>> reader
>>>>>>>> forwards: it should never emit a watermark if it knows there are
>>>>> splits
>>>>>> to
>>>>>>>> read that have a lower minimum timestamp. I think it should be as
>>>> easy
>>>>>> as
>>>>>>>> adding a minimumTimestamp()/maximumTimestamp() method pair to the
>>>>> split
>>>>>>>> interface.
>>>>>>>> 
>>>>>>>> Another thing we need to resolve is the actual reader interface. I
>>>> see
>>>>>>>> there has been some good discussion but I don't know if we have
>>>>>> consensus.
>>>>>>>> We should try and see how specific sources could be implemented with
>>>>> the
>>>>>>>> new interface. For example, for Kafka I think we need to have N+1
>>>>>> threads
>>>>>>>> per task (where N is the number of splits that a task is reading
>>>>> from).
>>>>>> On
>>>>>>>> thread is responsible for reading from the splits. And each split
>>>> has
>>>>>> its
>>>>>>>> own (internal) thread for reading from Kafka and putting messages in
>>>>> an
>>>>>>>> internal queue to pull from. This is similar to how the current
>>>> Kafka
>>>>>>>> source is implemented, which has a separate fetcher thread. The
>>>> reason
>>>>>> for
>>>>>>>> this split is that we always need to try reading from Kafka to keep
>>>>> the
>>>>>>>> throughput up. In the current implementation the internal queue (or
>>>>>>>> handover) limits the read rate of the reader threads.
>>>>>>>> 
>>>>>>>> @Thomas, what do you think this would look like for Kinesis?
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>> 
>>>>>>>>> On 15. Nov 2018, at 03:56, Becket Qin <becket.qin@xxxxxxxxx>
>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Piotrek,
>>>>>>>>> 
>>>>>>>>> Thanks a lot for the detailed reply. All makes sense to me.
>>>>>>>>> 
>>>>>>>>> WRT the confusion between advance() / getCurrent(), do you think it
>>>>>> would
>>>>>>>>> help if we combine them and have something like:
>>>>>>>>> 
>>>>>>>>> CompletableFuture<T> getNext();
>>>>>>>>> long getWatermark();
>>>>>>>>> long getCurrentTimestamp();
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> 
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>> 
>>>>>>>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <
>>>>>> piotr@xxxxxxxxxxxxxxxxx>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> Thanks again for the detailed answer :) Sorry for responding with
>>>> a
>>>>>>>> delay.
>>>>>>>>>> 
>>>>>>>>>>> Completely agree that in pattern 2, having a callback is
>>>> necessary
>>>>>> for
>>>>>>>>>> that
>>>>>>>>>>> single thread outside of the connectors. And the connectors MUST
>>>>> have
>>>>>>>>>>> internal threads.
>>>>>>>>>> 
>>>>>>>>>> Yes, this thread will have to exists somewhere. In pattern 2 it
>>>>> exists
>>>>>>>> in
>>>>>>>>>> the connector (at least from the perspective of the Flink
>>>> execution
>>>>>>>>>> engine). In pattern 1 it exists inside the Flink execution engine.
>>>>>> With
>>>>>>>>>> completely blocking connectors, like simple reading from files,
>>>> both
>>>>>> of
>>>>>>>>>> those approaches are basically the same. The difference is when
>>>> user
>>>>>>>>>> implementing Flink source is already working with a non blocking
>>>>> code
>>>>>>>> with
>>>>>>>>>> some internal threads. In this case, pattern 1 would result in
>>>>> "double
>>>>>>>>>> thread wrapping”, while pattern 2 would allow to skip one layer of
>>>>>>>>>> indirection.
>>>>>>>>>> 
>>>>>>>>>>> If we go that way, we should have something like "void
>>>>>>>>>>> poll(Callback) / void advance(callback)". I am curious how would
>>>>>>>>>>> CompletableFuture work here, though. If 10 readers returns 10
>>>>>>>> completable
>>>>>>>>>>> futures, will there be 10 additional threads (so 20 threads in
>>>>> total)
>>>>>>>>>>> blocking waiting on them? Or will there be a single thread busy
>>>>> loop
>>>>>>>>>>> checking around?
>>>>>>>>>> 
>>>>>>>>>> To be honest, I haven’t thought this completely through and I
>>>>> haven’t
>>>>>>>>>> tested/POC’ed it. Having said that, I can think of at least couple
>>>>> of
>>>>>>>>>> solutions. First is something like this:
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>>>>>>>>>> <
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Line:
>>>>>>>>>> 
>>>>>>>>>>                             `blocked = split.process();`
>>>>>>>>>> 
>>>>>>>>>> Is where the execution goes into to the task/sources. This is
>>>> where
>>>>>> the
>>>>>>>>>> returned future is handled:
>>>>>>>>>> 
>>>>>>>>>>                             blocked.addListener(() -> {
>>>>>>>>>>                                 blockedSplits.remove(split);
>>>>>>>>>>                                 // reset the level priority to
>>>>>>>> prevent
>>>>>>>>>> previously-blocked splits from starving existing splits
>>>>>>>>>>                                 split.resetLevelPriority();
>>>>>>>>>>                                 waitingSplits.offer(split);
>>>>>>>>>>                             }, executor);
>>>>>>>>>> 
>>>>>>>>>> Fundamentally callbacks and Futures are more or less
>>>> interchangeable
>>>>>> You
>>>>>>>>>> can always wrap one into another (creating a callback that
>>>>> completes a
>>>>>>>>>> future and attach a callback once future completes). In this case
>>>>> the
>>>>>>>>>> difference for me is mostly:
>>>>>>>>>> - api with passing callback allows the callback to be fired
>>>> multiple
>>>>>>>> times
>>>>>>>>>> and to fire it even if the connector is not blocked. This is what
>>>> I
>>>>>>>> meant
>>>>>>>>>> by saying that api `CompletableFuture<?> isBlocked()` is a bit
>>>>>> simpler.
>>>>>>>>>> Connector can only return either “I’m not blocked” or “I’m blocked
>>>>>> and I
>>>>>>>>>> will tell you only once when I’m not blocked anymore”.
>>>>>>>>>> 
>>>>>>>>>> But this is not the most important thing for me here. For me
>>>>> important
>>>>>>>>>> thing is to try our best to make Flink task’s control and
>>>> execution
>>>>>>>> single
>>>>>>>>>> threaded. For that both callback and future APIs should work the
>>>>> same.
>>>>>>>>>> 
>>>>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The
>>>>>> good
>>>>>>>>>>> thing is that a blocking read API is usually simpler to
>>>> implement.
>>>>>>>>>> 
>>>>>>>>>> Yes, they are easier to implement (especially if you are not the
>>>> one
>>>>>>>> that
>>>>>>>>>> have to deal with the additional threading required around them ;)
>>>>> ).
>>>>>>>> But
>>>>>>>>>> to answer this issue, if we choose pattern 2, we can always
>>>> provide
>>>>> a
>>>>>>>>>> proxy/wrapper that would using the internal thread implement the
>>>>>>>>>> non-blocking API while exposing blocking API to the user. It would
>>>>>>>>>> implement pattern 2 for the user exposing to him pattern 1. In
>>>> other
>>>>>>>> words
>>>>>>>>>> implementing pattern 1 in pattern 2 paradigm, while making it
>>>>> possible
>>>>>>>> to
>>>>>>>>>> implement pure pattern 2 connectors.
>>>>>>>>>> 
>>>>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to
>>>>> perform
>>>>>> IO
>>>>>>>>>> in
>>>>>>>>>>> a method like "isBlocked()". If the method is expected to fetch
>>>>>> records
>>>>>>>>>>> (even if not returning them), naming it something more explicit
>>>>> would
>>>>>>>>>> help
>>>>>>>>>>> avoid confusion.
>>>>>>>>>> 
>>>>>>>>>> If we choose so, we could rework it into something like:
>>>>>>>>>> 
>>>>>>>>>> CompletableFuture<?> advance()
>>>>>>>>>> T getCurrent();
>>>>>>>>>> Watermark getCurrentWatermark()
>>>>>>>>>> 
>>>>>>>>>> But as I wrote before, this is more confusing to me for the exact
>>>>>>>> reasons
>>>>>>>>>> you mentioned :) I would be confused what should be done in
>>>>>> `adanvce()`
>>>>>>>> and
>>>>>>>>>> what in `getCurrent()`. However, again this naming issue is not
>>>> that
>>>>>>>>>> important to me and probably is matter of taste/personal
>>>>> preferences.
>>>>>>>>>> 
>>>>>>>>>> Piotrek
>>>>>>>>>> 
>>>>>>>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket.qin@xxxxxxxxx>
>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the explanation. We are probably talking about the
>>>> same
>>>>>>>> thing
>>>>>>>>>>> but in different ways. To clarify a little bit, I think there are
>>>>> two
>>>>>>>>>>> patterns to read from a connector.
>>>>>>>>>>> 
>>>>>>>>>>> Pattern 1: Thread-less connector with a blocking read API.
>>>> Outside
>>>>> of
>>>>>>>> the
>>>>>>>>>>> connector, there is one IO thread per reader, doing blocking
>>>> read.
>>>>> An
>>>>>>>>>>> additional thread will interact with all the IO threads.
>>>>>>>>>>> Pattern 2: Connector with internal thread(s) and non-blocking
>>>> API.
>>>>>>>>>> Outside
>>>>>>>>>>> of the connector, there is one thread for ALL readers, doing IO
>>>>>> relying
>>>>>>>>>> on
>>>>>>>>>>> notification callbacks in the reader.
>>>>>>>>>>> 
>>>>>>>>>>> In both patterns, there must be at least one thread per
>>>> connector,
>>>>>>>> either
>>>>>>>>>>> inside (created by connector writers) or outside (created by
>>>> Flink)
>>>>>> of
>>>>>>>>>> the
>>>>>>>>>>> connector. Ideally there are NUM_CONNECTORS + 1 threads in total,
>>>>> to
>>>>>>>> make
>>>>>>>>>>> sure that 1 thread is fully non-blocking.
>>>>>>>>>>> 
>>>>>>>>>>>> Btw, I don’t know if you understand my point. Having only
>>>> `poll()`
>>>>>> and
>>>>>>>>>>> `take()` is not enough for single threaded task. If our source
>>>>>>>> interface
>>>>>>>>>>> doesn’t provide `notify()` callback nor >`CompletableFuture<?>
>>>>>>>>>>> isBlocked(),`, there is no way to implement single threaded task
>>>>> that
>>>>>>>>>> both
>>>>>>>>>>> reads the data from the source connector and can also react to
>>>>> system
>>>>>>>>>>> events. Ok, non >blocking `poll()` would allow that, but with
>>>> busy
>>>>>>>>>> looping.
>>>>>>>>>>> 
>>>>>>>>>>> Completely agree that in pattern 2, having a callback is
>>>> necessary
>>>>>> for
>>>>>>>>>> that
>>>>>>>>>>> single thread outside of the connectors. And the connectors MUST
>>>>> have
>>>>>>>>>>> internal threads. If we go that way, we should have something
>>>> like
>>>>>>>> "void
>>>>>>>>>>> poll(Callback) / void advance(callback)". I am curious how would
>>>>>>>>>>> CompletableFuture work here, though. If 10 readers returns 10
>>>>>>>> completable
>>>>>>>>>>> futures, will there be 10 additional threads (so 20 threads in
>>>>> total)
>>>>>>>>>>> blocking waiting on them? Or will there be a single thread busy
>>>>> loop
>>>>>>>>>>> checking around?
>>>>>>>>>>> 
>>>>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The
>>>>>> good
>>>>>>>>>>> thing is that a blocking read API is usually simpler to
>>>> implement.
>>>>> An
>>>>>>>>>>> additional non-blocking "T poll()" method here is indeed optional
>>>>> and
>>>>>>>>>> could
>>>>>>>>>>> be used in cases like Flink does not want the thread to block
>>>>>> forever.
>>>>>>>>>> They
>>>>>>>>>>> can also be combined to have a "T poll(Timeout)", which is
>>>> exactly
>>>>>> what
>>>>>>>>>>> KafkaConsumer did.
>>>>>>>>>>> 
>>>>>>>>>>> It sounds that you are proposing pattern 2 with something similar
>>>>> to
>>>>>>>> NIO2
>>>>>>>>>>> AsynchronousByteChannel[1]. That API would work, except that the
>>>>>>>>>> signature
>>>>>>>>>>> returning future seems not necessary. If that is the case, a
>>>> minor
>>>>>>>> change
>>>>>>>>>>> on the current FLIP proposal to have "void advance(callback)"
>>>>> should
>>>>>>>>>> work.
>>>>>>>>>>> And this means the connectors MUST have their internal threads.
>>>>>>>>>>> 
>>>>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to
>>>>> perform
>>>>>> IO
>>>>>>>>>> in
>>>>>>>>>>> a method like "isBlocked()". If the method is expected to fetch
>>>>>> records
>>>>>>>>>>> (even if not returning them), naming it something more explicit
>>>>> would
>>>>>>>>>> help
>>>>>>>>>>> avoid confusion.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>> 
>>>>>>>>>>> [1]
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <
>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi
>>>>>>>>>>>> 
>>>>>>>>>>>> Good point with select/epoll, however I do not see how they
>>>>> couldn’t
>>>>>>>> be
>>>>>>>>>>>> with Flink if we would like single task in Flink to be
>>>>>> single-threaded
>>>>>>>>>> (and
>>>>>>>>>>>> I believe we should pursue this goal). If your connector blocks
>>>> on
>>>>>>>>>>>> `select`, then it can not process/handle control messages from
>>>>>> Flink,
>>>>>>>>>> like
>>>>>>>>>>>> checkpoints, releasing resources and potentially output flushes.
>>>>>> This
>>>>>>>>>> would
>>>>>>>>>>>> require tight integration between connector and Flink’s main
>>>> event
>>>>>>>>>>>> loop/selects/etc.
>>>>>>>>>>>> 
>>>>>>>>>>>> Looking at it from other perspective. Let’s assume that we have
>>>> a
>>>>>>>>>>>> connector implemented on top of `select`/`epoll`. In order to
>>>>>>>> integrate
>>>>>>>>>> it
>>>>>>>>>>>> with Flink’s checkpointing/flushes/resource releasing it will
>>>> have
>>>>>> to
>>>>>>>> be
>>>>>>>>>>>> executed in separate thread one way or another. At least if our
>>>>> API
>>>>>>>> will
>>>>>>>>>>>> enforce/encourage non blocking implementations with some kind of
>>>>>>>>>>>> notifications (`isBlocked()` or `notify()` callback), some
>>>>>> connectors
>>>>>>>>>> might
>>>>>>>>>>>> skip one layer of wapping threads.
>>>>>>>>>>>> 
>>>>>>>>>>>> Btw, I don’t know if you understand my point. Having only
>>>> `poll()`
>>>>>> and
>>>>>>>>>>>> `take()` is not enough for single threaded task. If our source
>>>>>>>> interface
>>>>>>>>>>>> doesn’t provide `notify()` callback nor `CompletableFuture<?>
>>>>>>>>>>>> isBlocked(),`, there is no way to implement single threaded task
>>>>>> that
>>>>>>>>>> both
>>>>>>>>>>>> reads the data from the source connector and can also react to
>>>>>> system
>>>>>>>>>>>> events. Ok, non blocking `poll()` would allow that, but with
>>>> busy
>>>>>>>>>> looping.
>>>>>>>>>>>> 
>>>>>>>>>>>> Piotrek
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket.qin@xxxxxxxxx>
>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> But I don’t see a reason why we should expose both blocking
>>>>>> `take()`
>>>>>>>>>> and
>>>>>>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone
>>>> (Flink
>>>>>>>>>> engine
>>>>>>>>>>>> or
>>>>>>>>>>>>> connector) would have to do the same busy
>>>>>>>>>>>>>> looping anyway and I think it would be better to have a
>>>> simpler
>>>>>>>>>>>> connector
>>>>>>>>>>>>> API (that would solve our problems) and force connectors to
>>>>> comply
>>>>>>>> one
>>>>>>>>>>>> way
>>>>>>>>>>>>> or another.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> If we let the block happen inside the connector, the blocking
>>>>> does
>>>>>>>> not
>>>>>>>>>>>> have
>>>>>>>>>>>>> to be a busy loop. For example, to do the block waiting
>>>>>> efficiently,
>>>>>>>>>> the
>>>>>>>>>>>>> connector can use java NIO selector().select which relies on OS
>>>>>>>> syscall
>>>>>>>>>>>>> like epoll[1] instead of busy looping. But if Flink engine
>>>> blocks
>>>>>>>>>> outside
>>>>>>>>>>>>> the connector, it pretty much has to do the busy loop. So if
>>>>> there
>>>>>> is
>>>>>>>>>>>> only
>>>>>>>>>>>>> one API to get the element, a blocking getNextElement() makes
>>>>> more
>>>>>>>>>> sense.
>>>>>>>>>>>>> In any case, we should avoid ambiguity. It has to be crystal
>>>>> clear
>>>>>>>>>> about
>>>>>>>>>>>>> whether a method is expected to be blocking or non-blocking.
>>>>>>>> Otherwise
>>>>>>>>>> it
>>>>>>>>>>>>> would be very difficult for Flink engine to do the right thing
>>>>> with
>>>>>>>> the
>>>>>>>>>>>>> connectors. At the first glance at getCurrent(), the expected
>>>>>>>> behavior
>>>>>>>>>> is
>>>>>>>>>>>>> not quite clear.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> That said, I do agree that functionality wise, poll() and
>>>> take()
>>>>>> kind
>>>>>>>>>> of
>>>>>>>>>>>>> overlap. But they are actually not quite different from
>>>>>>>>>>>>> isBlocked()/getNextElement(). Compared with isBlocked(), the
>>>> only
>>>>>>>>>>>>> difference is that poll() also returns the next record if it is
>>>>>>>>>>>> available.
>>>>>>>>>>>>> But I agree that the isBlocked() + getNextElement() is more
>>>>>> flexible
>>>>>>>> as
>>>>>>>>>>>>> users can just check the record availability, but not fetch the
>>>>>> next
>>>>>>>>>>>>> element.
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> In case of thread-less readers with only non-blocking
>>>>>> `queue.poll()`
>>>>>>>>>> (is
>>>>>>>>>>>>> that really a thing? I can not think about a real
>>>> implementation
>>>>>> that
>>>>>>>>>>>>> enforces such constraints)
>>>>>>>>>>>>> Right, it is pretty much a syntax sugar to allow user combine
>>>> the
>>>>>>>>>>>>> check-and-take into one method. It could be achieved with
>>>>>>>> isBlocked() +
>>>>>>>>>>>>> getNextElement().
>>>>>>>>>>>>> 
>>>>>>>>>>>>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <
>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> With my proposal, both of your examples would have to be
>>>> solved
>>>>> by
>>>>>>>> the
>>>>>>>>>>>>>> connector and solution to both problems would be the same:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Pretend that connector is never blocked (`isBlocked() { return
>>>>>>>>>>>>>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking
>>>>>>>> fashion
>>>>>>>>>>>> (or
>>>>>>>>>>>>>> semi blocking with return of control from time to time to
>>>> allow
>>>>>> for
>>>>>>>>>>>>>> checkpointing, network flushing and other resource management
>>>>>> things
>>>>>>>>>> to
>>>>>>>>>>>>>> happen in the same main thread). In other words, exactly how
>>>> you
>>>>>>>> would
>>>>>>>>>>>>>> implement `take()` method or how the same source connector
>>>> would
>>>>>> be
>>>>>>>>>>>>>> implemented NOW with current source interface. The difference
>>>>> with
>>>>>>>>>>>> current
>>>>>>>>>>>>>> interface would be only that main loop would be outside of the
>>>>>>>>>>>> connector,
>>>>>>>>>>>>>> and instead of periodically releasing checkpointing lock,
>>>>>>>> periodically
>>>>>>>>>>>>>> `return null;` or `return Optional.empty();` from
>>>>>>>> `getNextElement()`.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> In case of thread-less readers with only non-blocking
>>>>>> `queue.poll()`
>>>>>>>>>> (is
>>>>>>>>>>>>>> that really a thing? I can not think about a real
>>>> implementation
>>>>>>>> that
>>>>>>>>>>>>>> enforces such constraints), we could provide a wrapper that
>>>>> hides
>>>>>>>> the
>>>>>>>>>>>> busy
>>>>>>>>>>>>>> looping. The same applies how to solve forever blocking
>>>> readers
>>>>> -
>>>>>> we
>>>>>>>>>>>> could
>>>>>>>>>>>>>> provider another wrapper running the connector in separate
>>>>> thread.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> But I don’t see a reason why we should expose both blocking
>>>>>> `take()`
>>>>>>>>>> and
>>>>>>>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone
>>>>> (Flink
>>>>>>>>>>>> engine or
>>>>>>>>>>>>>> connector) would have to do the same busy looping anyway and I
>>>>>> think
>>>>>>>>>> it
>>>>>>>>>>>>>> would be better to have a simpler connector API (that would
>>>>> solve
>>>>>>>> our
>>>>>>>>>>>>>> problems) and force connectors to comply one way or another.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 7 Nov 2018, at 10:55, Becket Qin <becket.qin@xxxxxxxxx>
>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Piotr,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I might have misunderstood you proposal. But let me try to
>>>>>> explain
>>>>>>>> my
>>>>>>>>>>>>>>> concern. I am thinking about the following case:
>>>>>>>>>>>>>>> 1. a reader has the following two interfaces,
>>>>>>>>>>>>>>> boolean isBlocked()
>>>>>>>>>>>>>>> T getNextElement()
>>>>>>>>>>>>>>> 2. the implementation of getNextElement() is non-blocking.
>>>>>>>>>>>>>>> 3. The reader is thread-less, i.e. it does not have any
>>>>> internal
>>>>>>>>>>>> thread.
>>>>>>>>>>>>>>> For example, it might just delegate the getNextElement() to a
>>>>>>>>>>>>>> queue.poll(),
>>>>>>>>>>>>>>> and isBlocked() is just queue.isEmpty().
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> How can Flink efficiently implement a blocking reading
>>>> behavior
>>>>>>>> with
>>>>>>>>>>>> this
>>>>>>>>>>>>>>> reader? Either a tight loop or a backoff interval is needed.
>>>>>>>> Neither
>>>>>>>>>> of
>>>>>>>>>>>>>>> them is ideal.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Now let's say in the reader mentioned above implements a
>>>>> blocking
>>>>>>>>>>>>>>> getNextElement() method. Because there is no internal thread
>>>> in
>>>>>> the
>>>>>>>>>>>>>> reader,
>>>>>>>>>>>>>>> after isBlocked() returns false. Flink will still have to
>>>> loop
>>>>> on
>>>>>>>>>>>>>>> isBlocked() to check whether the next record is available. If
>>>>> the
>>>>>>>>>> next
>>>>>>>>>>>>>>> record reaches after 10 min, it is a tight loop for 10 min.
>>>> You
>>>>>>>> have
>>>>>>>>>>>>>>> probably noticed that in this case, even isBlocked() returns
>>>> a
>>>>>>>>>> future,
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> future() will not be completed if Flink does not call some
>>>>> method
>>>>>>>>>> from
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> reader, because the reader has no internal thread to complete
>>>>>> that
>>>>>>>>>>>> future
>>>>>>>>>>>>>>> by itself.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Due to the above reasons, a blocking take() API would allow
>>>>> Flink
>>>>>>>> to
>>>>>>>>>>>> have
>>>>>>>>>>>>>>> an efficient way to read from a reader. There are many ways
>>>> to
>>>>>> wake
>>>>>>>>>> up
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> blocking thread when checkpointing is needed depending on the
>>>>>>>>>>>>>>> implementation. But I think the poll()/take() API would also
>>>>> work
>>>>>>>> in
>>>>>>>>>>>> that
>>>>>>>>>>>>>>> case.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <
>>>>>>>>>> piotr@xxxxxxxxxxxxxxxxx
>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> a)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> BTW, regarding the isBlock() method, I have a few more
>>>>>> questions.
>>>>>>>>>> 21,
>>>>>>>>>>>>>> Is
>>>>>>>>>>>>>>>> a method isReady() with boolean as a return value
>>>>>>>>>>>>>>>>> equivalent? Personally I found it is a little bit confusing
>>>>> in
>>>>>>>> what
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> supposed to be returned when the future is completed. 22. if
>>>>>>>>>>>>>>>>> the implementation of isBlocked() is optional, how do the
>>>>>> callers
>>>>>>>>>>>> know
>>>>>>>>>>>>>>>> whether the method is properly implemented or not?
>>>>>>>>>>>>>>>>> Does not implemented mean it always return a completed
>>>>> future?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> `CompletableFuture<?> isBlocked()` is more or less an
>>>>> equivalent
>>>>>>>> to
>>>>>>>>>>>>>>>> `boolean hasNext()` which in case of “false” provides some
>>>>> kind
>>>>>>>> of a
>>>>>>>>>>>>>>>> listener/callback that notifies about presence of next
>>>>> element.
>>>>>>>>>> There
>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>> some minor details, like `CompletableFuture<?>` has a
>>>> minimal
>>>>>> two
>>>>>>>>>>>> state
>>>>>>>>>>>>>>>> logic:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1. Future is completed - we have more data
>>>>>>>>>>>>>>>> 2. Future not yet completed - we don’t have data now, but we
>>>>>>>>>> might/we
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>> have in the future
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> While `boolean hasNext()` and `notify()` callback are a bit
>>>>> more
>>>>>>>>>>>>>>>> complicated/dispersed and can lead/encourage `notify()`
>>>> spam.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> b)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 3. If merge the `advance` and `getCurrent`  to one method
>>>>> like
>>>>>>>>>>>>>> `getNext`
>>>>>>>>>>>>>>>> the `getNext` would need return a
>>>>>>>>>>>>>>>>> `ElementWithTimestamp` because some sources want to add
>>>>>> timestamp
>>>>>>>>>> to
>>>>>>>>>>>>>>>> every element. IMO, this is not so memory friendly
>>>>>>>>>>>>>>>>> so I prefer this design.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Guowei I don’t quite understand this. Could you elaborate
>>>> why
>>>>>>>>>> having a
>>>>>>>>>>>>>>>> separate `advance()` help?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> c)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Regarding advance/poll/take. What’s the value of having two
>>>>>>>> separate
>>>>>>>>>>>>>>>> methods: poll and take? Which one of them should be called
>>>> and
>>>>>>>> which
>>>>>>>>>>>>>>>> implemented? What’s the benefit of having those methods
>>>>> compared
>>>>>>>> to
>>>>>>>>>>>>>> having
>>>>>>>>>>>>>>>> a one single method `getNextElement()` (or `pollElement() or
>>>>>>>>>> whatever
>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> name it) with following contract:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> CompletableFuture<?> isBlocked();
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>> Return next element - will be called only if `isBlocked()`
>>>> is
>>>>>>>>>>>> completed.
>>>>>>>>>>>>>>>> Try to implement it in non blocking fashion, but if that’s
>>>>>>>>>> impossible
>>>>>>>>>>>> or
>>>>>>>>>>>>>>>> you just don’t need the effort, you can block in this
>>>> method.
>>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>>> T getNextElement();
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I mean, if the connector is implemented non-blockingly,
>>>> Flink
>>>>>>>> should
>>>>>>>>>>>> use
>>>>>>>>>>>>>>>> it that way. If it’s not, then `poll()` will `throw new
>>>>>>>>>>>>>>>> NotImplementedException()`. Implementing both of them and
>>>>>>>> providing
>>>>>>>>>>>>>> both of
>>>>>>>>>>>>>>>> them to Flink wouldn’t make a sense, thus why not merge them
>>>>>> into
>>>>>>>> a
>>>>>>>>>>>>>> single
>>>>>>>>>>>>>>>> method call that should preferably (but not necessarily need
>>>>> to)
>>>>>>>> be
>>>>>>>>>>>>>>>> non-blocking? It’s not like we are implementing general
>>>>> purpose
>>>>>>>>>>>> `Queue`,
>>>>>>>>>>>>>>>> which users might want to call either of `poll` or `take`.
>>>> We
>>>>>>>> would
>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>> prefer to call `poll`, but if it’s blocking, then still we
>>>>> have
>>>>>> no
>>>>>>>>>>>>>> choice,
>>>>>>>>>>>>>>>> but to call it and block on it.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> d)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking
>>>> source
>>>>>> is
>>>>>>>>>> very
>>>>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may be
>>>>>> another
>>>>>>>>>> way
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> achieve this. I think it may be not very memory friendly if
>>>>>> every
>>>>>>>>>>>>>> advance
>>>>>>>>>>>>>>>>> call return a Future.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I didn’t want to mention this, to not clog my initial
>>>>> proposal,
>>>>>>>> but
>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>> is a simple solution for the problem:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> public interface SplitReader {
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> (…)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> CompletableFuture<?> NOT_BLOCKED =
>>>>>>>>>>>>>>>> CompletableFuture.completedFuture(null);
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>> * Returns a future that will be completed when the page
>>>> source
>>>>>>>>>>>>>> becomes
>>>>>>>>>>>>>>>> * unblocked.  If the page source is not blocked, this method
>>>>>>>>>> should
>>>>>>>>>>>>>>>> return
>>>>>>>>>>>>>>>> * {@code NOT_BLOCKED}.
>>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>>> default CompletableFuture<?> isBlocked()
>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>  return NOT_BLOCKED;
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> If we are blocked and we are waiting for the IO, then
>>>>> creating a
>>>>>>>> new
>>>>>>>>>>>>>>>> Future is non-issue. Under full throttle/throughput and not
>>>>>>>> blocked
>>>>>>>>>>>>>> sources
>>>>>>>>>>>>>>>> returning a static `NOT_BLOCKED` constant  should also solve
>>>>> the
>>>>>>>>>>>>>> problem.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> One more remark, non-blocking sources might be a necessity
>>>> in
>>>>> a
>>>>>>>>>> single
>>>>>>>>>>>>>>>> threaded model without a checkpointing lock. (Currently when
>>>>>>>> sources
>>>>>>>>>>>> are
>>>>>>>>>>>>>>>> blocked, they can release checkpointing lock and re-acquire
>>>> it
>>>>>>>> again
>>>>>>>>>>>>>>>> later). Non-blocking `poll`/`getNext()` would allow for
>>>>>>>> checkpoints
>>>>>>>>>> to
>>>>>>>>>>>>>>>> happen when source is idling. In that case either `notify()`
>>>>> or
>>>>>> my
>>>>>>>>>>>>>> proposed
>>>>>>>>>>>>>>>> `isBlocked()` would allow to avoid busy-looping.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket.qin@xxxxxxxxx>
>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> The iterator-like API was also the first thing that came to
>>>>> me.
>>>>>>>> But
>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> seems a little confusing that hasNext() does not mean "the
>>>>>> stream
>>>>>>>>>> has
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>> ended", but means "the next record is ready", which is
>>>>>>>> repurposing
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> well
>>>>>>>>>>>>>>>>> known meaning of hasNext(). If we follow the
>>>> hasNext()/next()
>>>>>>>>>>>> pattern,
>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>> additional isNextReady() method to indicate whether the
>>>> next
>>>>>>>> record
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> ready seems more intuitive to me.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Similarly, in poll()/take() pattern, another method of
>>>>> isDone()
>>>>>>>> is
>>>>>>>>>>>>>> needed
>>>>>>>>>>>>>>>>> to indicate whether the stream has ended or not.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Compared with hasNext()/next()/isNextReady() pattern,
>>>>>>>>>>>>>>>>> isDone()/poll()/take() seems more flexible for the reader
>>>>>>>>>>>>>> implementation.
>>>>>>>>>>>>>>>>> When I am implementing a reader, I could have a couple of
>>>>>>>> choices:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> - A thread-less reader that does not have any internal
>>>>> thread.
>>>>>>>>>>>>>>>>> - When poll() is called, the same calling thread will
>>>>> perform a
>>>>>>>>>> bunch
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>> IO asynchronously.
>>>>>>>>>>>>>>>>> - When take() is called, the same calling thread will
>>>>> perform a
>>>>>>>>>>>>>>>> bunch
>>>>>>>>>>>>>>>>> of IO and wait until the record is ready.
>>>>>>>>>>>>>>>>> - A reader with internal threads performing network IO and
>>>>> put
>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>> into a buffer.
>>>>>>>>>>>>>>>>> - When poll() is called, the calling thread simply reads
>>>> from
>>>>>>>> the
>>>>>>>>>>>>>>>>> buffer and return empty result immediately if there is no
>>>>>>>> record.
>>>>>>>>>>>>>>>>> - When take() is called, the calling thread reads from the
>>>>>>>> buffer
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> block waiting if the buffer is empty.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On the other hand, with the hasNext()/next()/isNextReady()
>>>>> API,
>>>>>>>> it
>>>>>>>>>> is
>>>>>>>>>>>>>>>> less
>>>>>>>>>>>>>>>>> intuitive for the reader developers to write the
>>>> thread-less
>>>>>>>>>> pattern.
>>>>>>>>>>>>>>>>> Although technically speaking one can still do the
>>>>> asynchronous
>>>>>>>> IO
>>>>>>>>>> to
>>>>>>>>>>>>>>>>> prepare the record in isNextReady(). But it is inexplicit
>>>> and
>>>>>>>> seems
>>>>>>>>>>>>>>>>> somewhat hacky.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <
>>>> thw@xxxxxxxxxx>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Couple more points regarding discovery:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> The proposal mentions that discovery could be outside the
>>>>>>>>>> execution
>>>>>>>>>>>>>>>> graph.
>>>>>>>>>>>>>>>>>> Today, discovered partitions/shards are checkpointed. I
>>>>>> believe
>>>>>>>>>> that
>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>> also need to be the case in the future, even when
>>>> discovery
>>>>>> and
>>>>>>>>>>>>>> reading
>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>> split between different tasks.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> For cases such as resharding of a Kinesis stream, the
>>>>>>>> relationship
>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>>> splits needs to be considered. Splits cannot be randomly
>>>>>>>>>> distributed
>>>>>>>>>>>>>>>> over
>>>>>>>>>>>>>>>>>> readers in certain situations. An example was mentioned
>>>>> here:
>>>>>>>>>>>>>>>>>> 
>>>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <
>>>> thw@xxxxxxxxxx
>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks for getting the ball rolling on this!
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Can the number of splits decrease? Yes, splits can be
>>>>> closed
>>>>>>>> and
>>>>>>>>>> go
>>>>>>>>>>>>>>>> away.
>>>>>>>>>>>>>>>>>>> An example would be a shard merge in Kinesis (2 existing
>>>>>> shards
>>>>>>>>>>>> will
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> closed and replaced with a new shard).
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Regarding advance/poll/take: IMO the least restrictive
>>>>>> approach
>>>>>>>>>>>> would
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> the thread-less IO model (pull based, non-blocking,
>>>> caller
>>>>>>>>>>>> retrieves
>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>> records when available). The current Kinesis API requires
>>>>> the
>>>>>>>> use
>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>> threads. But that can be internal to the split reader and
>>>>>> does
>>>>>>>>>> not
>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> be a source API concern. In fact, that's what we are
>>>>> working
>>>>>> on
>>>>>>>>>>>> right
>>>>>>>>>>>>>>>> now
>>>>>>>>>>>>>>>>>>> as improvement to the existing consumer: Each shard
>>>>> consumer
>>>>>>>>>> thread
>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>> push to a queue, the consumer main thread will poll the
>>>>>>>> queue(s).
>>>>>>>>>>>> It
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> essentially a mapping from threaded IO to non-blocking.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> The proposed SplitReader interface would fit the
>>>>> thread-less
>>>>>> IO
>>>>>>>>>>>>>> model.
>>>>>>>>>>>>>>>>>>> Similar to an iterator, we find out if there is a new
>>>>> element
>>>>>>>>>>>>>> (hasNext)
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> if so, move to it (next()). Separate calls deliver the
>>>> meta
>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>> (timestamp, watermark). Perhaps advance call could offer
>>>> a
>>>>>>>>>> timeout
>>>>>>>>>>>>>>>>>> option,
>>>>>>>>>>>>>>>>>>> so that the caller does not end up in a busy wait. On the
>>>>>> other
>>>>>>>>>>>>>> hand, a
>>>>>>>>>>>>>>>>>>> caller processing multiple splits may want to cycle
>>>> through
>>>>>>>> fast,
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> process elements of other splits as soon as they become
>>>>>>>>>> available.
>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>> nice
>>>>>>>>>>>>>>>>>>> thing is that this "split merge" logic can now live in
>>>>> Flink
>>>>>>>> and
>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> optimized and shared between different sources.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <
>>>>>> guowei.mgw@xxxxxxxxx
>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>> Thanks Aljoscha for this FLIP.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking
>>>>>> source
>>>>>>>> is
>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may
>>>> be
>>>>>>>>>> another
>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> achieve this. I think it may be not very memory friendly
>>>>> if
>>>>>>>>>> every
>>>>>>>>>>>>>>>>>> advance
>>>>>>>>>>>>>>>>>>>> call return a Future.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> public interface Listener {
>>>>>>>>>>>>>>>>>>>> public void notify();
>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> public interface SplitReader() {
>>>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>>>> * When there is no element temporarily, this will return
>>>>>>>>>> false.
>>>>>>>>>>>>>>>>>>>> * When elements is available again splitReader can call
>>>>>>>>>>>>>>>>>>>> listener.notify()
>>>>>>>>>>>>>>>>>>>> * In addition the frame would check `advance`
>>>>> periodically .
>>>>>>>>>>>>>>>>>>>> * Of course advance can always return true and ignore
>>>> the
>>>>>>>>>>>>>>>> listener
>>>>>>>>>>>>>>>>>>>> argument for simplicity.
>>>>>>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>>>>>>> public boolean advance(Listener listener);
>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 2.  The FLIP tells us very clearly that how to create
>>>> all
>>>>>>>> Splits
>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>>> to create a SplitReader from a Split. But there is no
>>>>>> strategy
>>>>>>>>>> for
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>> to choose how to assign the splits to the tasks. I think
>>>>> we
>>>>>>>>>> could
>>>>>>>>>>>>>> add
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> Enum to let user to choose.
>>>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>>>> public Enum SplitsAssignmentPolicy {
>>>>>>>>>>>>>>>>>>>> Location,
>>>>>>>>>>>>>>>>>>>> Workload,
>>>>>>>>>>>>>>>>>>>> Random,
>>>>>>>>>>>>>>>>>>>> Average
>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 3. If merge the `advance` and `getCurrent`  to one
>>>> method
>>>>>> like
>>>>>>>>>>>>>>>> `getNext`
>>>>>>>>>>>>>>>>>>>> the `getNext` would need return a `ElementWithTimestamp`
>>>>>>>> because
>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>> sources want to add timestamp to every element. IMO,
>>>> this
>>>>> is
>>>>>>>> not
>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>> memory
>>>>>>>>>>>>>>>>>>>> friendly so I prefer this design.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> 于2018年11月1日周四
>>>>>>>>>> 下午6:08写道:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite
>>>> a
>>>>>> lot
>>>>>>>> of
>>>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>>>> possible improvements. I have one proposal. Instead of
>>>>>>>> having a
>>>>>>>>>>>>>>>> method:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> boolean advance() throws IOException;
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I would replace it with
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> /*
>>>>>>>>>>>>>>>>>>>>> * Return a future, which when completed means that
>>>> source
>>>>>> has
>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>>>> and getNext() will not block.
>>>>>>>>>>>>>>>>>>>>> * If you wish to use benefits of non blocking
>>>> connectors,
>>>>>>>>>> please
>>>>>>>>>>>>>>>>>>>>> implement this method appropriately.
>>>>>>>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>>>>>>>>>>>>>>> return CompletableFuture.completedFuture(null);
>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Couple of arguments:
>>>>>>>>>>>>>>>>>>>>> 1. I don’t understand the division of work between
>>>>>>>> `advance()`
>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> `getCurrent()`. What should be done in which,
>>>> especially
>>>>>> for
>>>>>>>>>>>>>>>> connectors
>>>>>>>>>>>>>>>>>>>>> that handle records in batches (like Kafka) and when
>>>>> should
>>>>>>>> you
>>>>>>>>>>>>>> call
>>>>>>>>>>>>>>>>>>>>> `advance` and when `getCurrent()`.
>>>>>>>>>>>>>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will
>>>>>> allow
>>>>>>>>>> us
>>>>>>>>>>>> in
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> future to have asynchronous/non blocking connectors and
>>>>>> more
>>>>>>>>>>>>>>>>>> efficiently
>>>>>>>>>>>>>>>>>>>>> handle large number of blocked threads, without busy
>>>>>> waiting.
>>>>>>>>>>>> While
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> same time it doesn’t add much complexity, since naive
>>>>>>>> connector
>>>>>>>>>>>>>>>>>>>>> implementations can be always blocking.
>>>>>>>>>>>>>>>>>>>>> 3. This also would allow us to use a fixed size thread
>>>>> pool
>>>>>>>> of
>>>>>>>>>>>> task
>>>>>>>>>>>>>>>>>>>>> executors, instead of one thread per task.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <
>>>>>>>>>> aljoscha@xxxxxxxxxx
>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> In order to finally get the ball rolling on the new
>>>>> source
>>>>>>>>>>>>>> interface
>>>>>>>>>>>>>>>>>>>>> that we have discussed for so long I finally created a
>>>>>> FLIP:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing
>>>>>>>>>> work/discussion
>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>> adding per-partition watermark support to the Kinesis
>>>>>> source
>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>>>> this would enable generic implementation of event-time
>>>>>>>>>> alignment
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>>> sources. Maybe we need another FLIP for the event-time
>>>>>>>>>> alignment
>>>>>>>>>>>>>>>> part,
>>>>>>>>>>>>>>>>>>>>> especially the part about information sharing between
>>>>>>>>>> operations
>>>>>>>>>>>>>> (I'm
>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>> calling it state sharing because state has a special
>>>>>> meaning
>>>>>>>> in
>>>>>>>>>>>>>>>> Flink).
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Please discuss away!
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>>