Re: Sharing state between subtasks
Thanks for the feedback and comments so far.
I want to elaborate more on the need for the shared state and awareness of
watermark alignment in the source implementation. Sources like Kafka and
Kinesis pull from the external system and then emit the records. For
Kinesis, we have multiple consumer threads (one per shard) that fetch from
Kinesis and push the records downstream. Each of those threads logically
has a different watermark. (Note that the Kinesis source in Flink
currently does not have any source watermarking support; we have
implemented that in our own extension at Lyft.)
The source needs logic to decide which threads should pause because they
are ahead. That logic needs to take into account other threads and other
subtasks (for which we need the shared state). A watermark-aware
back-pressure mechanism would be great, but we cannot block the entire source.
We need to only stop reading those shards (or Kafka partitions) that have
gotten too far ahead of others.
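To make the per-shard decision concrete, here is a rough sketch of the kind
of check I have in mind. It is not our actual implementation and not a Flink
API: the class ShardAlignment, the method shardsToPause and the parameter
maxLeadMillis are hypothetical names, and I assume the lowest watermark of
the other subtasks is already available from whatever shared state mechanism
we end up with. Watermarks are assumed to be non-negative epoch milliseconds.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for illustration; not part of Flink or the Kinesis connector. */
final class ShardAlignment {
    private ShardAlignment() {}

    /**
     * Returns the shards of this subtask that should pause reading.
     *
     * @param localWatermarks           current watermark per shard consumed by this subtask
     * @param otherSubtasksMinWatermark lowest watermark reported by other subtasks
     *                                  (assumed to come from the shared state)
     * @param maxLeadMillis             how far a shard may run ahead of the global minimum
     */
    static Set<String> shardsToPause(Map<String, Long> localWatermarks,
                                     long otherSubtasksMinWatermark,
                                     long maxLeadMillis) {
        // The global minimum covers both the local shards and the other subtasks.
        long globalMin = otherSubtasksMinWatermark;
        for (long watermark : localWatermarks.values()) {
            globalMin = Math.min(globalMin, watermark);
        }

        // Pause only the shards that are too far ahead; the others keep reading.
        Set<String> toPause = new HashSet<>();
        for (Map.Entry<String, Long> entry : localWatermarks.entrySet()) {
            if (entry.getValue() - globalMin > maxLeadMillis) {
                toPause.add(entry.getKey());
            }
        }
        return toPause;
    }
}
```

Each consumer thread would check whether its shard is in the returned set
before its next fetch, so the slow shards continue and only the fast ones
wait.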
On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhueske@xxxxxxxxx> wrote:
> I think the new source interface would be designed to be able to leverage
> shared state to achieve time alignment.
> I don't think this would be possible without some kind of shared state.
> The problem of tasks that are far ahead in time cannot be solved with
> back-pressure.
> That's because a task cannot choose from which source task it accepts
> events and from which it doesn't.
> If it blocks an input, all downstream tasks that are connected to the
> operator are affected. This can easily lead to deadlocks.
> Therefore, all operators need to be able to handle events when they arrive.
> If they cannot process them yet because they are too far ahead in time,
> they are put in state.
> On Wed, Oct 10, 2018 at 6:15 PM Elias Levy <
> > On Wed, Oct 10, 2018 at 8:15 AM Aljoscha Krettek <aljoscha@xxxxxxxxxx>
> > wrote:
> > > I think the two things (shared state and new source interface) are
> > > somewhat orthogonal. The new source interface itself alone doesn't solve
> > > the problem; we would still need some mechanism for sharing the
> > > event-time information between different subtasks. This could be the
> > > state sharing mechanism. Therefore I would say we should not block one
> > > on the other and therefore should go ahead with state sharing.
> > >
> > Is that really the case? The reason Thomas gave for the request to share
> > state among subtasks was to implement stream alignment. If streams can be
> > aligned, then the given reason for state sharing disappears. Not to say
> > there aren't other situations where state sharing could be useful. It
> > would have been handy in a number of our jobs.
> > Also, it's not clear to me that if sources (and multiple-stream operators)
> > were performing time alignment, you'd need some mechanism for sharing
> > event-time information between subtasks. Each source and multiple input
> > operator can perform its own local alignment and back-pressure can take
> > care of squelching sources that are advancing too fast.