
Re: ConnectedIterativeStreams and processing state 1.4.2


Because the data I want to cache comes from a downstream operator, and as far as I know an iteration is the only way to loop data back to a previous operator.

Best regards
Lasse Nedergaard

On 2 May 2018 at 15:35, Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:


Why can't you use a simple CoProcessFunction and handle the cache updates within its processElement1 or processElement2 method?
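
To make the suggestion concrete, here is a minimal sketch of the two-input cache logic. It is plain Java, not Flink code: a HashMap stands in for keyed MapState, and the class EnrichCacheSketch and its methods processInput and processEnrichment are hypothetical names that only mirror the roles of processElement1 and processElement2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the two-input cache logic; no Flink classes are used.
// All names here are hypothetical and chosen for illustration only.
class EnrichCacheSketch {
    // Stand-in for keyed MapState: key -> cached enrichment value.
    private final Map<String, String> cache = new HashMap<>();
    // Records that could be enriched directly from the cache.
    final List<String> enriched = new ArrayList<>();
    // Keys that missed the cache and would be sent to the external lookup.
    final List<String> misses = new ArrayList<>();

    // Role of processElement1: an input record arrives.
    void processInput(String key) {
        String value = cache.get(key);
        if (value != null) {
            enriched.add(key + "=" + value);
        } else {
            misses.add(key);
        }
    }

    // Role of processElement2: an enrichment result arrives and updates the cache.
    void processEnrichment(String key, String value) {
        cache.put(key, value);
    }

    public static void main(String[] args) {
        EnrichCacheSketch sketch = new EnrichCacheSketch();
        sketch.processInput("device-1");                 // cache miss
        sketch.processEnrichment("device-1", "Denmark"); // cache update
        sketch.processInput("device-1");                 // cache hit
        System.out.println("enriched: " + sketch.enriched);
        System.out.println("misses: " + sketch.misses);
    }
}
```

In a real job the map would be keyed state, so both inputs would have to be keyed by the same key before state is available to the operator.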


On 1 May 2018, at 10:20, Lasse Nedergaard <lassenedergaard@xxxxxxxxx> wrote:


I have a case where I have an input stream that I want to enrich with external data. I want to cache some of the external lookup data to improve overall performance.
To update my cache (a CoProcessFunction) I would use an iteration to send the externally enriched information back to the cache and update a MapState. I use a CoProcessFunction because the input stream and the enrichment stream contain two different object types and I don't want to mix them.
Because I use ConnectedIterativeStreams, I can't use state in my CoProcessFunction: ConnectedIterativeStreams creates a DataStream based on the feedback type rather than on the stream I close the iteration with, and it is not possible to provide a KeySelector in withFeedbackType.

From the Flink source:
public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
super(input.getExecutionEnvironment(), input, new DataStream(input.getExecutionEnvironment(), new CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
Both streams need to be keyed before state can be assigned to the operator.
Any ideas on how to work around this problem?

My pseudocode is below.

IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
.keyBy(obj -> obj.getKey())
.iterate(maxWaitForIterations).withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));

DataStream<ReportMessageBase> enrichedStream = iteration
.process(new EnrichFromState());

DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
// keep only the records that could not be enriched from state
.filter(obj -> !obj.enriched);

EnrichService enrichService = new EnrichService();
DataStream<InputObject> enrichedFromApi = enrichService.parse(notEnrichedOutput);

DataStream<EnrichData> newEnrich = enrichedFromApi
.map(obj -> {

EnrichData newData = new EnrichData();
newData.xx = obj.xx();

return newData;
})
.keyBy(obj -> obj.