# Re: Iterative Stream won't loop

Based on the pseudo code, it seems like you are trying to do the loop by yourself rather than using the iterative.map() function [1].

I think you would need to specify the "map" function in order to use the iterative stream, and there should be a clear definition of which data is iterative. In this case you have labels and vertices interlacing each other, but no specific loop back.

I would suggest something close to the example in [1], like:

labelsVerticesGroup = DataStream<initial_label, vertices>

labels = labelsVerticesGroup.map(...)
.keyBy(VertexID)
.window(...)
.min(label);

vertices = labelsVerticesGroup.map(...)

updatedLabelsVerticesGroup = vertices.join(labels).where(VertexId).equalTo(VertexId)
.windowAll(...)
.agg(...)

labelsVerticesGroup.closeWith(updatedLabelsVerticesGroup)
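The key point here is that `closeWith(...)` has to receive a stream that is derived from the iteration head, so that records actually travel back into the loop. The plain-Java sketch below is only a conceptual model of such a feedback edge, not Flink code and not how Flink implements iterations: `FeedbackLoopSketch`, `iterate`, `LoopResult` and the decrement-until-zero body are hypothetical names and choices made for illustration. Each record at the head passes through the body; results that satisfy the feedback condition are queued back to the head (the role `closeWith` plays), and everything else leaves the loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class FeedbackLoopSketch {

    /** Records that left the loop, plus how often the loop body ran. */
    public static final class LoopResult<T> {
        public final List<T> output = new ArrayList<>();
        public int bodyInvocations = 0;
    }

    /**
     * Conceptual model of an iteration with a feedback edge (NOT Flink's implementation):
     * every record at the head goes through the body; results matching feedbackCondition
     * are sent back to the head (the role of closeWith), all others exit the loop.
     */
    public static <T> LoopResult<T> iterate(List<T> input,
                                            Function<T, T> body,
                                            Predicate<T> feedbackCondition) {
        Deque<T> head = new ArrayDeque<>(input);   // initial input plus fed-back records
        LoopResult<T> result = new LoopResult<>();
        while (!head.isEmpty()) {
            T next = body.apply(head.poll());
            result.bodyInvocations++;
            if (feedbackCondition.test(next)) {
                head.add(next);                    // loop back: processed by the body again
            } else {
                result.output.add(next);           // leaves the iteration
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical body: decrement until the value reaches zero.
        LoopResult<Long> r = iterate(List.of(3L, 1L), x -> x - 1, x -> x > 0);
        System.out.println(r.output + " after " + r.bodyInvocations + " body invocations");
    }
}
```

Running `main` prints `[0, 0] after 4 body invocations`: the record `3` goes through the body three times before it stops being fed back. In this model, if nothing is routed back to the head, every record passes through the body exactly once, which is consistent with only ever seeing the first iteration.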

Is this what you are looking for?

--

Rong

On Thu, May 10, 2018 at 9:50 AM, Henrique Colao Zanuz <henrique.colao@xxxxxxxxx> wrote:

Hi,

I am trying to implement a connected components algorithm using DataStream. For this algorithm, I'm separating the data by tumbling windows, so that each window is computed independently. The algorithm is iterative because the labels (colors) of the vertices need to be propagated. Basically, I need to iterate over the following steps:

Input:
vertices = DataStream of <VertexId, [list of neighbor vertices], label>

Loop:
labels = vertices.flatMap(emit a tuple <VertexID, label> for vertices.f0 and for every element of vertices.f1)
.keyBy(VertexID)
.window(...)
.min(label);
updatedVertices = vertices.join(labels).where(VertexId).equalTo(VertexId)
.windowAll(...)
.apply(re-emit the original vertices stream tuples, but keeping the new labels)
End loop

I am trying to use IterativeStreams to do so. However, despite successfully separating the tuples that need to be fed back to the loop (by using filters and closeWith), the subsequent iterations are not happening, so I only get the first iteration.

I suppose this might come from the fact that I'm creating a new stream (labels) based on the original IterativeStream, joining it with the original one (vertices), and only then closing the loop with it.

Do you know whether Flink has some limitation in this respect? If so, would you have a hint about a different approach I could take for this algorithm to avoid it?

Thank you in advance,
Henrique Colao
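To make the expected result of each loop pass concrete, the per-window computation described above (propagate the minimum label to every neighbor, join it back onto the vertices, repeat) can be sketched in plain Java for a single window's worth of data, without Flink. This is a minimal sketch under stated assumptions: vertex ids and labels are `Long`s, each vertex starts with its own id as label, adjacency lists are symmetric, and the loop stops once no label changes. `LabelPropagationSketch` and `connectedComponents` are hypothetical names; the code shows what the streaming loop should converge to, not how Flink schedules it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LabelPropagationSketch {

    /**
     * Connected-component labels for the vertices of ONE window (plain Java, no Flink).
     * Assumes symmetric adjacency lists and that each vertex starts with its own id as label.
     *
     * @param neighbors vertex id -> ids of its neighbors
     * @return vertex id -> smallest vertex id reachable from it inside the window
     */
    public static Map<Long, Long> connectedComponents(Map<Long, List<Long>> neighbors) {
        Map<Long, Long> labels = new TreeMap<>();
        for (Long v : neighbors.keySet()) {
            labels.put(v, v);                          // initial label = own id
        }
        boolean changed = true;
        while (changed) {                              // one loop pass = one iteration
            // Like flatMap + keyBy(VertexID) + min(label): each vertex proposes its label
            // to itself and to every neighbor; keep the smallest proposal per vertex id.
            Map<Long, Long> minProposed = new HashMap<>();
            for (Map.Entry<Long, List<Long>> e : neighbors.entrySet()) {
                Long label = labels.get(e.getKey());
                minProposed.merge(e.getKey(), label, Long::min);
                for (Long n : e.getValue()) {
                    minProposed.merge(n, label, Long::min);
                }
            }
            // Like join(labels).where(VertexId).equalTo(VertexId): only vertices of this
            // window are updated; proposals for ids outside the window are ignored.
            changed = false;
            for (Map.Entry<Long, Long> e : labels.entrySet()) {
                Long proposed = minProposed.get(e.getKey());
                if (proposed < e.getValue()) {
                    e.setValue(proposed);
                    changed = true;
                }
            }
        }
        return labels;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> window = new HashMap<>();
        window.put(1L, List.of(2L));
        window.put(2L, List.of(1L, 3L));
        window.put(3L, List.of(2L));
        window.put(7L, List.of(8L));
        window.put(8L, List.of(7L));
        System.out.println(connectedComponents(window));
    }
}
```

For the example window in `main`, this prints `{1=1, 2=1, 3=1, 7=7, 8=7}`. Vertex 3 only receives label 1 in the second pass (it gets label 2 in the first), so stopping after a single iteration leaves the components incomplete.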
