
Flink 1.7 doesn't work with Kafka Table Source Descriptors


Hi All,

I am trying to read data from Kafka in Flink using the Table API. It works fine with Flink 1.4, but after upgrading to 1.7 I get the error below. I have attached the list of libraries added to Flink.

Could you please help me with this?

bin/flink run stream-analytics-0.0.1-SNAPSHOT.jar --read-topic testin --write-topic testout --bootstrap.servers localhost --group.id analytics
Starting execution of program
java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map;
    at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107)
    at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
    at org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:82)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

Cheers,
Dhanuka

--
Nothing Impossible, Creativity is more important than knowledge.
package org.monitoring.stream.analytics;


import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.WindowedTable;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.Tumble;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.Json;

public class FlinkTableSourceLatest {

	public static void main(String[] args) throws Exception {
        // Read parameters from the command line
        final ParameterTool params = ParameterTool.fromArgs(args);

        if (params.getNumberOfParameters() < 4) {
            System.out.println("\nUsage: FlinkReadKafka " +
                               "--read-topic <topic> " +
                               "--write-topic <topic> " +
                               "--bootstrap.servers <kafka brokers> " +
                               "--group.id <groupid>");
            return;
        }

        // setup streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
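        // Restart at most 4 times, waiting 10 seconds (10000 ms) between attempts.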
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(300000); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

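        // Register the input Kafka topic as a streaming table source via the
        // descriptor API. The registerTableSource() call at the end of this
        // chain is FlinkTableSourceLatest.java:82, the frame from which the
        // AbstractMethodError in the stack trace is raised.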
        tableEnv
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic(params.getRequired("read-topic"))
                    .property("zookeeper.connect", "localhost:2181")
                    .property("bootstrap.servers", "localhost:9092")
                    .property("group.id", params.getRequired("group.id"))
                    .startFromLatest())
            .withFormat(
                new Json()
                    /* .jsonSchema("{\n" +
                        "  \"type\": \"object\",\n" +
                        "  \"properties\": {\n" +
                        "    \"food\": {\n" +
                        "      \"type\": \"string\"\n" +
                        "    },\n" +
                        "    \"price\": {\n" +
                        "      \"type\": \"integer\"\n" +
                        "    },\n" +
                        "    \"processingTime\": {\n" +
                        "      \"type\": \"integer\"\n" +
                        "    }\n" +
                        "  }\n" +
                        "}") */
                    .failOnMissingField(false)
                    .deriveSchema())
            .withSchema(
                new Schema()
                    .field("food", "VARCHAR")
                    .field("price", "DECIMAL")
                    .field("processingTime", "TIMESTAMP").proctime())
            .inAppendMode()
            .registerTableSource("foodTable");

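        // Continuous SQL query over the registered source table. Note that the
        // BETWEEN predicate compares processingTime with itself, so as written
        // it is always true.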
        String sql = "SELECT food FROM foodTable WHERE processingTime BETWEEN processingTime - INTERVAL '4' HOUR AND processingTime";

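        // A tumbling window over the processing-time attribute; windowedTable
        // is defined here but not used further below.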
        WindowedTable windowedTable = tableEnv.scan("foodTable")
            .window(Tumble.over("50.minutes").on("processingTime").as("userActionWindow"));
        Table result = tableEnv.sqlQuery(sql);
        System.out.println("======================= " + result.toString());

/*
        // create a partition for the data going into kafka
        FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

        // create new tablesink of JSON to kafka
        KafkaJsonTableSink kafkaTableSink = new Kafka010JsonTableSink(
                params.getRequired("write-topic"),
                params.getProperties(),
                partition);*/
        
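        // Register the output Kafka topic as a table sink, using round-robin
        // partitioning.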
        tableEnv
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic(params.getRequired("write-topic"))
                    .property("zookeeper.connect", "localhost:2181")
                    .property("bootstrap.servers", "localhost:9092")
                    .property("group.id", params.getRequired("group.id"))
                    .sinkPartitionerRoundRobin())
            .withFormat(
                new Json()
                    /* .jsonSchema("{\"type\": \"object\",\"properties\": {\n" +
                        "    \"food\": {\n" +
                        "      \"type\": \"string\"\n" +
                        "    }}") */
                    .failOnMissingField(false)
                    .deriveSchema())
            .withSchema(
                new Schema()
                    .field("food", "VARCHAR"))
            .inAppendMode()
            .registerTableSink("ruleTable");

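        // Write the query result into the registered Kafka sink table.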
        result.insertInto("ruleTable");

        env.execute("FlinkReadWriteKafkaJSON");
    }

}

Attachment: flink-libs.png
Description: PNG image