
How to use keyBy on ConnectedStream?

I am using ConnectedStreams to process two different types of messages with a CoFlatMap. I would like to use keyBy on the ConnectedStreams so that messages with the same value of a certain property are always sent to the same CoFlatMap instance. I tried keyBy on the ConnectedStreams and was surprised to see that the return type is not grouped.

ConnectedStreams<MessageType1, MessageType2> connect = myDataStream1.connect(myDataStreamOther);
// property1 is a valid property of MessageType1 and property2 is a valid property of MessageType2
connect = connect.keyBy("property1", "property2");
However, I get the following exception:
Caused by: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com....MessageType1>) cannot be used as key.
How can I use keyBy with ConnectedStreams and ensure that grouped messages are handled by the same CoFlatMap instance?

Ishwara Varnasi