
[jira] [Created] (FLINK-9460) Redundant output in table & upsert semantics

zhengcanbin created FLINK-9460:

             Summary: Redundant output in table & upsert semantics
                 Key: FLINK-9460
                 URL: https://issues.apache.org/jira/browse/FLINK-9460
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.5.0
            Reporter: zhengcanbin
             Fix For: 1.6.0
         Attachments: image-2018-05-29-11-39-45-698.png, image-2018-05-29-11-51-20-671.png

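The output of my Table API upsert example looks incorrect. Here is the code:
import java.lang

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.{TableSink, UpsertStreamTableSink}
import org.apache.flink.types.Row

object VerifyUpsert {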

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.socketTextStream("localhost", 9099).map { x =>
      val tokens = x.split(",")
      DemoSource(tokens(0), tokens(1), tokens(2))
    }

    tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)

    val fieldNames = Array("record_time", "pv", "uv")
    val fieldTypes = Array(Types.STRING, Types.LONG, Types.LONG).asInstanceOf[Array[TypeInformation[_]]]
    tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))

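    // Per-minute page views (pv) and unique visitors (uv), written to the upsert sink.
    tEnv.sqlUpdate(
      """
        |INSERT INTO demoSink
        |SELECT
        |  SUBSTRING(record_time, 1, 16) as record_time,
        |  count(user_id) as pv,
        |  count(DISTINCT user_id) as uv
        |FROM demoSource
        |GROUP BY SUBSTRING(record_time, 1, 16)
      """.stripMargin)

    env.execute()
  }

  case class DemoSource(record_time: String, user_id: String, page_id: String)
}
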
case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {

  // Key fields are ignored by this print-only sink.
  override def setKeyFields(keys: Array[String]): Unit = {}

  override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = dataStream.addSink(new PrintSinkFunction())

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = {
    val copy = MyPrintSink(fNames, fTypes)
    copy.fNames = fieldNames
    copy.fTypes = fieldTypes
    copy
  }
}

When the application starts, I type one record at a time into the netcat client. The table below shows the output for each input record:

||Input||Output||
|2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
|2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
|2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
|2018-05-24 21:34:12,0,4|{color:#FF0000}(true,2018-05-24 21:34,2,2){color}
(true,2018-05-24 21:34,4,3)|


When the fourth record is consumed, two output records are printed by the sink. The first one (shown in red) is clearly redundant. I traced through the source code and found something wrong with 


I think when (!generateRetraction) && !inputC.change is true, we should not invoke out.collect here.
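
To illustrate the proposal, here is a minimal sketch of the guard. It is a simplified, hypothetical model and not Flink code: `Msg`, `run`, `skipOnRetract` and the message sequence in `msgs` are made up for illustration. It assumes that, for the fourth record, the aggregate first receives a retraction for the old per-user count and then an accumulate message for the new one, which would explain the (2,2) row followed by (4,3).

```scala
// Simplified, hypothetical model of a group aggregate feeding an upsert sink.
// This is NOT Flink's implementation; it only mimics the condition discussed above.
object UpsertGuardSketch {

  // Stand-in for an input message: change = true is an accumulate message,
  // change = false is a retraction. userCount is the per-user record count.
  final case class Msg(change: Boolean, userCount: Long)

  // Folds the messages into (pv, uv) for a single key and returns every emitted pair.
  // Emitting retraction messages (generateRetraction = true) is not modelled here.
  def run(msgs: Seq[Msg],
          generateRetraction: Boolean,
          skipOnRetract: Boolean): Vector[(Long, Long)] = {
    var pv = 0L
    var uv = 0L
    val out = Vector.newBuilder[(Long, Long)]
    for (m <- msgs) {
      if (m.change) { pv += m.userCount; uv += 1 }
      else { pv -= m.userCount; uv -= 1 }
      // Proposed guard: skip the emit when (!generateRetraction) && !change.
      val suppressed = skipOnRetract && !generateRetraction && !m.change
      if (!suppressed) out += ((pv, uv))
    }
    out.result()
  }

  // Assumed messages for the four input records (user ids 8, 6, 0, 0):
  // the second record for user 0 retracts the old count 1 and accumulates the new count 2.
  val msgs: Seq[Msg] = Seq(
    Msg(change = true, userCount = 1),
    Msg(change = true, userCount = 1),
    Msg(change = true, userCount = 1),
    Msg(change = false, userCount = 1),
    Msg(change = true, userCount = 2)
  )

  def main(args: Array[String]): Unit = {
    // Without the guard: (1,1), (2,2), (3,3), (2,2), (4,3)
    println(run(msgs, generateRetraction = false, skipOnRetract = false))
    // With the guard: (1,1), (2,2), (3,3), (4,3)
    println(run(msgs, generateRetraction = false, skipOnRetract = true))
  }
}
```

The sketch does not cover a retraction that is never followed by an accumulate message for the same key; whether skipping the emit is safe in that case would need to be checked against the real operator.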


[~astephan] please look over this
