I am using flink 1.6 to generate some realtime programs. I want to write the output to table sink, the code is as below. At first I use append table sink, which error message tells me that I should use upsert table sink, so I write one. But still another error “Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.” comes out,which blocks me. My questions is how to modify a table keys in this scenario? I also check the exception stack, and found that the system infer the keys field by
val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan), I wonder how to make the function return value ?
Thanks a lot !!!
var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' DAY),article_id" )