Re: How to use keyBy on ConnectedStream?


Hi Ishwara,

the `keyBy()` method automatically ensures that records with the same key will be processed by the same instance of a CoFlatMap.
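
To illustrate the idea, here is a simplified sketch in plain Java, with no Flink dependency. The class `KeyRoutingSketch`, the method `instanceFor`, and the sample key are hypothetical names made up for this example, and the modulo scheme is only a stand-in: Flink's real key assignment is more involved. The point it shows is that routing depends only on the key's hash, so equal keys from either input end up at the same parallel instance.

```java
class KeyRoutingSketch {

    // Simplified stand-in for keyed partitioning (NOT Flink's actual algorithm):
    // the target instance is derived only from the key's hashCode().
    static int instanceFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;

        // Imagine one key taken from property1 of the first input and one
        // taken from property2 of the second input, both with the same value.
        String keyFromInput1 = "order-17";
        String keyFromInput2 = new String("order-17"); // different object, equal value

        boolean sameInstance =
                instanceFor(keyFromInput1, parallelism) == instanceFor(keyFromInput2, parallelism);
        System.out.println(sameInstance); // prints true
    }
}
```

This only holds when equal keys return equal hash codes, which is why the key type matters.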

As for the exception, I suppose the types `MessageType1` and `MessageType2` are meant to be POJOs, which must follow some rules [1]. The `GenericType<...>` in the message suggests that Flink did not recognize `MessageType1` as a POJO.
Also, make sure that (1) `property1` and `property2` are not arrays; (2) their types override the `hashCode()` method [2].
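
As a rough illustration of those two points, here is a plain Java sketch with no Flink dependency. All names in it (`KeyTypeSketch`, `DeviceId`, `SampleMessage`, and their fields) are hypothetical and are not your actual classes. It assumes the usual POJO shape (a public class with a public no-argument constructor, and fields that are either public or reachable through getters and setters), and it shows why an array makes a poor key: arrays inherit identity-based `equals()` and `hashCode()` from `Object`.

```java
import java.util.Arrays;
import java.util.Objects;

// In a real project each nested class would normally be a public top-level class.
class KeyTypeSketch {

    // Hypothetical key type: equal values must produce equal hash codes.
    public static class DeviceId {
        public String region;
        public int number;

        public DeviceId() {} // public no-argument constructor

        public DeviceId(String region, int number) {
            this.region = region;
            this.number = number;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof DeviceId)) return false;
            DeviceId other = (DeviceId) o;
            return number == other.number && Objects.equals(region, other.region);
        }

        @Override
        public int hashCode() {
            return Objects.hash(region, number);
        }
    }

    // Hypothetical message class in POJO shape: private field with getter and setter.
    public static class SampleMessage {
        private DeviceId property1;

        public SampleMessage() {}

        public DeviceId getProperty1() { return property1; }
        public void setProperty1(DeviceId property1) { this.property1 = property1; }
    }

    public static void main(String[] args) {
        DeviceId a = new DeviceId("eu", 7);
        DeviceId b = new DeviceId("eu", 7);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints true

        int[] x = {1, 2, 3};
        int[] y = {1, 2, 3};
        System.out.println(x.equals(y));         // prints false (identity comparison)
        System.out.println(Arrays.equals(x, y)); // prints true (content comparison)
    }
}
```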

Hope that helps,
Xingcan


On May 10, 2018, at 10:43 PM, Ishwara Varnasi <ivarnasi@xxxxxxxxx> wrote:

Hello,
I am using ConnectedStream to process two different types of messages with a CoFlatMap. I would like to use keyBy on the ConnectedStream so that messages with the same value of a certain property are always sent to the same CoFlatMap instance. I tried keyBy on the ConnectedStream and was surprised to see that the return type is not grouped.

ConnectedStreams<MessageType1, MessageType2> connect = myDataStream1.connect(myDataStreamOther);
connect = connect.keyBy("property1", "property2");
// property1 is a valid property of MessageType1 and property2 is a valid property of MessageType2
However, I get the following exception:
Caused by: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com....MessageType1>) cannot be used as key.
How can I use keyBy with ConnectedStream and ensure that grouped messages are handled by the same instance of CoFlatMap?

thanks
Ishwara Varnasi