I have an application that uses an Apache Beam pipeline running on the Direct Runner. The pipeline reads from an unbounded source. A frontend manually adds data to the pipeline, always with the same timestamp, so that the elements are processed in the same window.
The pipeline runs fine, but it eventually runs out of heap space. I profiled the application and found a hotspot in outputWatermark > holds > keyedHolds. Over time this structure fills up, mostly with values keyed by the anonymous "empty" StructuralKey classes. It grows with every request, and the entries are never released.
When I changed the empty structural key to a true singleton, that solved part of the issue. However, I noticed a specific test that ensures two empty keys (StructuralKey) are not equal, so my change would not be valid. When are these empty keys used, and when should they be removed in the Direct Runner? Is there a mechanism that prevents the inevitable heap out-of-memory error after a few requests?
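To make the growth pattern concrete, here is a minimal plain-Java model of what I believe is happening. It does not use Beam at all: `EmptyKey`, `keyedHolds`, `handleRequest` and `handleRequestShared` are hypothetical stand-ins, not the Direct Runner's real classes. The model assumes that an empty key uses identity equality (consistent with the test that two empty keys are not equal) and that a hold is recorded per request and never removed.

```java
import java.util.HashMap;
import java.util.Map;

class EmptyKeyHoldsDemo {

    // Hypothetical stand-in for an "empty" structural key. It does not override
    // equals/hashCode, so each instance is equal only to itself (identity equality).
    static final class EmptyKey {}

    // Hypothetical stand-in for a per-key watermark-hold map such as keyedHolds.
    static final Map<Object, Long> keyedHolds = new HashMap<>();

    // One shared instance, analogous to turning the empty key into a singleton.
    static final EmptyKey SHARED = new EmptyKey();

    // Models a request that records a hold under a freshly created empty key
    // and never removes it: each call adds a new, never-colliding map entry.
    static void handleRequest(long timestampMillis) {
        keyedHolds.put(new EmptyKey(), timestampMillis);
    }

    // Models the singleton change: every call overwrites the same entry.
    static void handleRequestShared(long timestampMillis) {
        keyedHolds.put(SHARED, timestampMillis);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            handleRequest(0L);
        }
        System.out.println("fresh keys: " + keyedHolds.size()); // prints "fresh keys: 1000"

        keyedHolds.clear();
        for (int i = 0; i < 1000; i++) {
            handleRequestShared(0L);
        }
        System.out.println("shared key: " + keyedHolds.size()); // prints "shared key: 1"
    }
}
```

In this model, identity-equal keys mean the map can only shrink if something explicitly removes each entry, which is why I am asking when the runner is supposed to release holds for empty keys.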