I would recommend against using iterations with windows for that problem at the moment.
Alongside loop scoping and backpressure that will be addressed by FLIP-15  I think you also need the notion of stream supersteps, which is experimental work in progress for now, from my side at least.
Until these features are added on Flink I would recommend trying out gelly-streams , a Flink API for graph stream computation which supports connected components in a single pass.
All you need to do is to convert your stream into edge additions. You can try it and let us know what you think .
thank you for your reply
Actually, I'm doing something very similar to your code. The problem I'm having is that this structure is not generating any loop. For instance, If I print
I only see the initial set of tuples,
the one from updatedLabelsVerticesGroup
(at the end of the first iteration) and nothing more. So, it means that the content of updatedLabelsVerticesGroup is
indeed being correctly assigned to labelsVerticesGroup,
but the loop itself is not happening.
For simplicity sake, here I'm omitting the logic behind the separation of the tuples that need to be fed back to the loop. I do understand that both codes we commented here are expected to loop indefinitely. On the complete version of mine, I
use the JoinFunction, a ProcessAllWindowFunction, a Filter Function and a Map to create a flag that indicates if there was a change on the label of a vertex during the join function, then the ProcessAllWindowFunction to spread this flag to the whole window,
in case any tuple had a change. Finally I filter the tuples by this flag. This whole mechanism is separating the tuples as expected. However, even if I remove this logic from the code, in order to get an infinite loop of the tuples (as we get on the code we've
written in the previous emails), the iteration does not work.
PS. I've been using Flink 1.3.3
Based on the pseudo code. Seems like you are trying to do the loop by yourself and not suing the iterative.map() function.
I think you would need to specify the "map" function in order to use the iterative stream. and there should be a clear definition on
which data is iterative. In this case you have
label & vertices interlacing each other but no specific loop back.
I would suggest something close to the example in , like
labelsVerticesGroup = DataStream<initial_label,
labels = labelsVerticesGroup.map(...)
Is this what you are looking for?