I was trying different ways to implement a moving average (count based, not time based).
The blunt instrument approach is to create a custom FlatMapFunction that keeps track of the last N values.
It seemed like using an AggregateFunction would be most consistent with the Flink API, along the lines of...
This works, but the API for the AggregateFunction (MovingAverageAggregator) feels a bit odd.
Specifically, I want to emit a <key, moving average> result from getResult(), but the key isn’t passed to the createAccumulator() method, nor is it passed to the getResult() method. So in the add() method I check if the accumulator I’ve created has a key set, and if not then I extract the key from the record and set it on the accumulator, so I can use it in the getResult() call.
Is this expected, or am I miss-using the functionality?