[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Efficient Stateful Processing with Time-Series Data and Enrichments


I have a two-part question related to processing and storing large amounts of time-series data. The first part is related to the preferred way to keep state on the time-series data in an efficient way, and the second part is about how to further enrich the processed data and feed it back into the state.

For the sake of discussion, let's say that I am tracking tens to hundreds of millions of IoT devices. This could grow but that's what I'm looking at right now. I will receive an initial event from each device, as well as an unknown number of subsequent events. I will need to aggregate together all the events related to one device for some period of time after the initial event, say 1 hour, at which point I can discard the state. After that, I will never hear from that device again. (I'm not actually working with IoT devices, but that is the gist. At any given point in time, I will have millions of active keys, and as some keys expire new keys are added).

The output of my application should contain the full state for a given device, and a new output should be generated every time a new event comes in. This application must be fault tolerant. I am currently checkpointing my state using the RocksDB state backend.

Part 1 of my question is how best to manage this state. This sounds like an excellent use case for State TTL (https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively). Since this is still a pending feature under active discussion, I did some reading about how others have dealt with similar use-cases. What I gleaned boils down to this: naively storing everything in one large MapState keyed by device ID, and using Triggers to clear the state 1 hour after the initial event will lead to far to many Triggers to be efficient. 

An alternate approach is to bucket my devices into a far smaller amount of keys (not in the millions, maybe thousands), and maintain a MapState for each bucket. I can fire a Trigger every minute for every bucket, and iterate over the MapState to clear any state that has past its TTL.

A similar, alternative approach is to use tumbling Windows to achieve the same effect. Every incoming event has a copy of the timestamp of the initial event for that device (think of it as when the device came online), so I can use that for event time, and let the watermarks lag by 1 hour. The devices are bucketed into some fixed amount of keys like above, so I will have a Window for each bucket, for each time slice. The Window has a Trigger that eagerly fires and purges each element, and a ProcessWindowFunction updates a MapState using per-window state, so that when a Window expires I can clear the state. I am currently using this approach, since it uses Flink's own Windowing and per-window state to clear old data, rather than manually doing it with Triggers.

Other than waiting for the State TTL feature, is there a more efficient approach to maintain the aggregate state of all events related to one device, and output this every time a new event arrives?

Part 2 of my question relates to how I can enrich the state I have accumulated before generating outputs. I have some set of enrichments I'd like to do using AsyncFunctions to call out to external services. The issue is some enrichments require data that may never be present on any one event; I need to work with the stream of aggregated data described above to be able to make some of those calls. Furthermore, some enrichments might need the data added by other enrichments. I would like to feed the enriched data back into the state.

This initially sounded like a perfect use case for an IterativeStream, until I tried to use it and realized the only way to enable checkpointing was to force it using a feature that is deprecated in Flink 1.4.2. Is that approach a dead end? If checkpoints will never be supported for IterativeStream, I don't want to explore this route, but it would be nice if checkpointed IterativeStreams are on the roadmap, or at least a possibility.

Now I'm kind of stumped. The only way I can think of aggregating together all the state *before* applying enrichments, and feeding the enriched data back into that state *after* the enrichments is to sink the enriched data to Kafka or something, and then create a source that reads it back and feeds into the operator that keeps the state. That works, but I'd prefer to keep all the data flowing within the Flink application if possible. Are there other approaches to creating feedback loops that play well with fault tolerance and checkpoints?

I appreciate any suggestions related to the two points above.


Mike Urbach

Mike Urbach