[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Datastream[Row] covert to table exception

Hi ,

I've tried to to specify such a schema, when I read from kafka, and covert inputstream to table . But I got the exception:

  • Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo cannot be converted to Table. Please specify the type of the input with a RowTypeInfo

And the code here:

private def getSchemaMap(jsonSchema: String) = {
    val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
    val fields = umsSchema.fields_get
    val fieldNameList = ListBuffer.empty[String]
    val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
    fields.foreach {
      field =>
    (fieldNameList.toArray, fieldTypeList.toArray)

  private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
    umsFieldType match {
      case STRING => Types.STRING
      case INT => Types.INT
      case LONG => Types.LONG
      case FLOAT => Types.FLOAT
      case DOUBLE => Types.DOUBLE
      case BOOLEAN => Types.BOOLEAN
      case DATE => Types.SQL_DATE
      case DATETIME => Types.SQL_TIMESTAMP
      case DECIMAL => Types.DECIMAL

 val myConsumer: FlinkKafkaConsumer010[Row] = new FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), properties)
 val inputStream: DataStream[Row] = env.addSource(myConsumer)
 val tableEnv = TableEnvironment.getTableEnvironment(env)<<—————exception here

Thanks !