Re: Sharing state between subtasks


I'll add to what Thomas already said.  The larger issue driving this is
that when reading from a source with many parallel partitions, especially
when reading lots of historical data (or recovering from downtime with a
backlog to read), it's quite common for event-time skew to develop across
those partitions.

When doing event-time windowing -- or in fact any event-time driven
processing -- the event-time skew across partitions results directly in
increased buffering in Flink and, of course, the corresponding growth in
state/checkpoint size.

As the event-time skew and state size grow larger, this can have a major
effect on application performance and in some cases result in a "death
spiral" where the application performance gets worse and worse as the
state size grows and grows.

So, one solution to this problem, outside of core changes in Flink itself,
seems to be to try to coordinate sources across partitions so that they
make progress through event time at roughly the same rate.  In fact, if
there is large skew, the idea would be to slow or even stop reading from
some partitions with newer data while first reading the partitions with
older data.  Anyway, to do this we need to share state somehow amongst
subtasks.
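
To make the coordination idea concrete, here is a minimal sketch of the
pause decision, assuming each subtask can see every other subtask's latest
watermark.  The names WatermarkBoard, should_pause and max_skew_ms are
made up for illustration and are not Flink APIs; how the table is actually
shared between subtasks is exactly the open question in this thread.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class WatermarkBoard:
    """Hypothetical shared table: one watermark entry per subtask id."""
    watermarks: Dict[int, int] = field(default_factory=dict)

    def publish(self, subtask_id: int, watermark_ms: int) -> None:
        # Each subtask only ever writes its own key.
        self.watermarks[subtask_id] = watermark_ms

    def global_min(self) -> int:
        # The slowest partition determines event-time progress overall.
        return min(self.watermarks.values())


def should_pause(board: WatermarkBoard, subtask_id: int, max_skew_ms: int) -> bool:
    """Return True if this subtask is too far ahead of the slowest one."""
    own = board.watermarks[subtask_id]
    return own - board.global_min() > max_skew_ms


board = WatermarkBoard()
board.publish(0, 1_000)    # partition with the oldest data
board.publish(1, 5_000)
board.publish(2, 90_000)   # partition racing ahead

# Assumed tolerance of 60 seconds of event-time skew.
paused = [sid for sid in sorted(board.watermarks) if should_pause(board, sid, 60_000)]
```

With these made-up numbers only subtask 2 would stop reading, leaving its
newer data buffered upstream until the others catch up.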

The common-sense view of this is the following:  Why would we want to pull
data from a perfectly good buffer (like a filesystem, Kinesis, or Kafka)
into Flink state just to manage and checkpoint it while waiting to be able
to complete event-time computations?  The completion of computations is
held up by the partitions with the oldest data, so it's of no value to read
the newer data until you've read the old.  It seems much better to leave
the newer data buffered upstream.

I'd be very curious to hear others' thoughts on this.  I would expect many
people to have run into similar issues.  I also wonder if anybody has
already been working on similar issues.  It seems there is room for some
core Flink changes to address this as well and I'm guessing people have
already thought about it.

-Jamie



On Sun, Oct 7, 2018 at 12:58 PM Thomas Weise <thw@xxxxxxxxxx> wrote:

> I'm looking to implement a state sharing mechanism between subtasks (of one
> or multiple tasks). Our use case is to align watermarks between subtasks of
> one or multiple sources to prevent some data fetchers from racing ahead of
> others and causing massive state buffering in Flink.
>
> Each subtask would share a small state (probably just a key and a couple
> of longs). The state would be updated periodically (perhaps every 30s).
> Other subtasks should see these changes with similar latency. It is
> essentially a hash table to which every node contributes a distinct key.
>
> An initial idea was to implement this using ZooKeeper ephemeral nodes. But
> since there is no way to read all child nodes in one sweep, state access
> becomes very chatty. With let's say 512 subtasks we would end up with 512
> * 512 reads per interval (1 to list children, N-1 to fetch data, per
> subtask).
>
> My next stop will be a group communication mechanism like JGroups or Akka
> (the following looks like a potentially good fit:
> https://doc.akka.io/docs/akka/2.5/distributed-data.html?language=java).
> But before that I wanted to check if others already had a similar need and
> possibly experience/implementation to share?
>
> There are probably more use cases related to discovery etc. Perhaps Flink
> could provide a state primitive, if there is broader interest in the
> community?
>
> Thanks,
> Thomas
>
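
The read count quoted above for the ZooKeeper approach can be checked with
a short calculation.  This is only a sketch of the arithmetic as Thomas
describes it (1 list call plus N-1 data reads per subtask); the function
name is made up, and the 30-second interval is the "perhaps every 30s"
figure from his message.

```python
def zk_reads_per_interval(n_subtasks: int) -> int:
    """Total reads when every subtask lists children, then fetches each peer."""
    per_subtask = 1 + (n_subtasks - 1)   # 1 list call + N-1 data reads
    return n_subtasks * per_subtask      # every subtask repeats this


total = zk_reads_per_interval(512)       # 512 * 512 = 262144
reads_per_second = total / 30            # assuming a 30-second update interval
```

For 512 subtasks that is 262,144 reads per interval, or several thousand
reads per second at a 30-second interval, and the total grows
quadratically with the number of subtasks.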