Sharing state between subtasks

I'm looking to implement a state sharing mechanism between subtasks (of one
or multiple tasks). Our use case is to align watermarks between subtasks of
one or multiple sources to prevent some data fetchers from racing ahead of
others and causing massive state buffering in Flink.

Each subtask would share a small state (probably just a key and a couple
of longs). The state would be updated periodically (perhaps every 30s). Other
subtasks should see these changes with similar latency. It is essentially a
hash table to which every node contributes a distinct key.
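
To make the shape of that table concrete, here is a minimal in-memory
sketch. It only stands in for whatever distributed mechanism ends up
carrying the data. All names (SharedWatermarkTable, publish, shouldPause)
are hypothetical, and dropping entries that have not been refreshed
recently is an assumption meant to mimic ephemeral nodes, not something
Flink provides.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: one entry per subtask, keyed by a distinct subtask key. */
public class SharedWatermarkTable {

    /** The "couple of longs" per subtask: its watermark and its last update time. */
    public static final class Entry {
        final long watermark;
        final long updatedAtMillis;

        Entry(long watermark, long updatedAtMillis) {
            this.watermark = watermark;
            this.updatedAtMillis = updatedAtMillis;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    /** Called periodically by each subtask to overwrite its own entry. */
    public void publish(String subtaskKey, long watermark, long nowMillis) {
        entries.put(subtaskKey, new Entry(watermark, nowMillis));
    }

    /** Lowest watermark among entries refreshed within maxAgeMillis (assumed staleness rule). */
    public OptionalLong globalMinWatermark(long nowMillis, long maxAgeMillis) {
        return entries.values().stream()
                .filter(e -> nowMillis - e.updatedAtMillis <= maxAgeMillis)
                .mapToLong(e -> e.watermark)
                .min();
    }

    /** True if this subtask is more than maxLeadMillis ahead of the slowest live subtask. */
    public boolean shouldPause(long ownWatermark, long nowMillis,
                               long maxAgeMillis, long maxLeadMillis) {
        OptionalLong min = globalMinWatermark(nowMillis, maxAgeMillis);
        return min.isPresent() && ownWatermark - min.getAsLong() > maxLeadMillis;
    }
}
```

A fetcher would call publish on its update interval and check shouldPause
before emitting more records. How the entries actually travel between
nodes is the open question of this thread.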

An initial idea was to implement this using ZooKeeper ephemeral nodes. But
since there is no way to read all child nodes in one sweep, state access
becomes very chatty. With, let's say, 512 subtasks we would end up with 512
* 512 reads per interval (1 to list children, N-1 to fetch data, per
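
As a quick check of that count: reading the breakdown as one list call
plus N-1 data reads for each of the N subtasks (an assumption about how
the polling would be done), the total grows quadratically. The class and
method names below are illustrative only.

```java
/** Hypothetical helper that reproduces the read count from the breakdown above. */
public class ZkReadCount {

    /** Reads per interval if each of n subtasks lists children once and fetches n-1 peers. */
    public static long readsPerInterval(long n) {
        long perSubtask = 1 + (n - 1); // 1 list call + (n - 1) data reads
        return n * perSubtask;
    }

    public static void main(String[] args) {
        System.out.println(readsPerInterval(512)); // prints 262144
    }
}
```

With the 30s update interval mentioned earlier, that would average out
to roughly 8,700 reads per second against the ZooKeeper ensemble.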

My next stop will be a group communication mechanism like JGroups or Akka
(the following looks like a potentially good fit:
https://doc.akka.io/docs/akka/2.5/distributed-data.html?language=java). But
before that I wanted to check whether others have already had a similar
need and possibly have experience or an implementation to share.

There are probably more use cases related to discovery etc. Perhaps Flink
could provide a state primitive, if there is broader interest in the