
Re: Sharing state between subtasks

Yes, I think this is the way to go.

This would also go well with a redesign of the source interface that has been floated for a while now. I also created a prototype a while back: https://github.com/aljoscha/flink/tree/refactor-source-interface. Just as a refresher, the redesign aims at several things:

1. Make partitions/splits explicit in the interface. Currently, the fact that there are file splits or Kafka partitions or Kinesis shards is hidden in the source implementation, although it would be beneficial for the system to know about them and to be able to track watermarks for them. The Kafka Consumer currently has a custom implementation of per-partition watermark tracking, which this redesign would make unnecessary.

2. Separate split/partition/shard discovery from the reading part. This would allow rebalancing work and, again, makes the nature of sources more explicit in the interfaces.

3. Move away from the push model to a pull model. The problem with the current source interface is that the source controls the read loop and has to acquire the checkpoint lock for emitting elements and updating state. If we move the loop out of the source, this leaves more room for Flink to be clever about reading from sources.

The prototype posted above defines three new interfaces: Source, SplitEnumerator, and SplitReader, along with a naive example and a working Kafka Consumer (with checkpointing, actually).
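
To make the shape of such a design concrete, here is a minimal sketch of how the three roles could fit together in a pull model. The method names and signatures are my own illustration and are not taken from the prototype branch; ListSource and drain are hypothetical helpers that stand in for a real source and for the runtime's read loop.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative sketch only: names and signatures are assumptions, not the prototype's API. */
final class PullSourceSketch {

    /** Discovers the splits (file splits, Kafka partitions, Kinesis shards, ...). */
    interface SplitEnumerator<SplitT> {
        List<SplitT> discoverSplits();
    }

    /** Reads a single split; the caller decides when to pull the next element. */
    interface SplitReader<T> {
        /** Returns the next element, or null once the split is exhausted. */
        T poll();
    }

    /** Ties discovery and reading together without owning the read loop. */
    interface Source<T, SplitT> {
        SplitEnumerator<SplitT> createEnumerator();
        SplitReader<T> createReader(SplitT split);
    }

    /** Naive example source: every split is an in-memory list of non-null elements. */
    static final class ListSource<T> implements Source<T, List<T>> {
        private final List<List<T>> splits;

        ListSource(List<List<T>> splits) {
            this.splits = splits;
        }

        @Override
        public SplitEnumerator<List<T>> createEnumerator() {
            return () -> splits;
        }

        @Override
        public SplitReader<T> createReader(List<T> split) {
            Iterator<T> it = split.iterator();
            return () -> it.hasNext() ? it.next() : null;
        }
    }

    /** Stand-in for the runtime: it owns the loop and pulls round-robin from all readers. */
    static <T, SplitT> List<T> drain(Source<T, SplitT> source) {
        List<SplitReader<T>> readers = new ArrayList<>();
        for (SplitT split : source.createEnumerator().discoverSplits()) {
            readers.add(source.createReader(split));
        }
        List<T> out = new ArrayList<>();
        while (!readers.isEmpty()) {
            Iterator<SplitReader<T>> it = readers.iterator();
            while (it.hasNext()) {
                T element = it.next().poll();
                if (element == null) {
                    it.remove(); // split exhausted, stop polling it
                } else {
                    out.add(element);
                }
            }
        }
        return out;
    }
}
```

Because the loop lives in drain rather than in the source, the source itself never needs to take the checkpoint lock, and the order in which splits are polled becomes a decision the system can make.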

If we had this source interface, along with a service for propagating watermark information, the code that reads from the splits could de-prioritise certain splits, and we would get the event-time alignment behaviour for all sources that are implemented using the new interface without requiring special code in each source implementation.
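
As a rough illustration of what "de-prioritise certain splits" could mean, the sketch below picks the splits that are allowed to be read next. Everything in it is an assumption for the sake of the example: the class and method names are hypothetical, globalWatermark stands for whatever value the watermark-propagation service would report, and the fixed maxDriftMillis threshold is just one possible policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only: one possible alignment policy, not an existing Flink API. */
final class SplitAlignmentSketch {

    /**
     * Returns the ids of the splits that may be read now: those whose local
     * watermark is at most maxDriftMillis ahead of the global watermark.
     * Splits that are further ahead are skipped until the others catch up.
     */
    static List<String> eligibleSplits(
            Map<String, Long> splitWatermarks, // split id -> current watermark of that split
            long globalWatermark,              // assumed to come from the watermark service
            long maxDriftMillis) {             // hypothetical tuning knob
        List<String> eligible = new ArrayList<>();
        for (Map.Entry<String, Long> entry : splitWatermarks.entrySet()) {
            if (entry.getValue() - globalWatermark <= maxDriftMillis) {
                eligible.add(entry.getKey());
            }
        }
        return eligible;
    }
}
```

A runtime-owned read loop would call something like this before each round of polling and only pull from the returned splits, so every source built on the new interface would get the same behaviour.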

@Elias Do you know whether Kafka Consumers do this alignment across multiple consumers, or only within one Consumer across the partitions that it reads from?

> On 9. Oct 2018, at 00:55, Elias Levy <fearsome.lucidity@xxxxxxxxx> wrote:
> Kafka Streams handles this problem, time alignment, by processing records
> from the partitions with the lowest timestamp in a best effort basis.
> See KIP-353 for the details.  The same could be done within the Kafka
> source and multiple input stream operators.  I opened FLINK-4558
> <https://issues.apache.org/jira/browse/FLINK-4558> a while ago regarding
> this topic.
> On Mon, Oct 8, 2018 at 3:41 PM Jamie Grier <jgrier@xxxxxxxx.invalid> wrote:
>> I'd be very curious to hear others' thoughts on this..  I would expect many
>> people to have run into similar issues.  I also wonder if anybody has
>> already been working on similar issues.  It seems there is room for some
>> core Flink changes to address this as well and I'm guessing people have
>> already thought about it.