osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: CoFlatMapFunction with more than two input streams


Thank you Xingcan.

Regarding that Either, I still see the need to do TypeCasting/CaseClass matching. Could you please help give a look?

val transformed = dog
            .union(cat)
            .connect(transformer)
            .keyBy(r => r.name, r2 => r2.name)
            .process(new TransformerCoProcessFunction)
            .split(_ match {
               case Right(d) => List("dog")
               case Left(c) => List("cat")
               case _ => List("")
            })

val transformed_dog = transformed.select("dog").map(_ match {
               case Right(d) => d
               case _ => NON_EXIST_DOG
            })
val transformed_cat = transformed.select("cat").map(_ match {
               case Left(c) => c
               case _ => NON_EXIST_CAT
            })

Thanks!

Averell



Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.