Re: Ask for SQL using kafka in Flink


Thanks Rong. I am using Flink 1.3.0. If I move to Flink 1.5, how do I define the jsonSchema? Yes, there were two names, but I now pass only one name and want to define the jsonSchema for it.
Rong Rong wrote
Hi Radhya,

Can you provide which Flink version you are using? Based on the latest Flink 1.5 release, Kafka09JsonTableSource takes:

/**
 * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
 *
 * @param topic       Kafka topic to consume.
 * @param properties  Properties for the Kafka consumer.
 * @param tableSchema The schema of the table.
 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 */

Also, your type definition:

TypeInformation typeInfo2 = Types.ROW(...

arguments seem to have different length for schema names and types.

Thanks,
Rong

On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <[hidden email]> wrote:

> Hi,
>
> Could anyone help me to solve this problem
>
> /Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>     The constructor Kafka09JsonTableSource(String, Properties, TypeInformation) is undefined
> /
>
> *--This is the code *
>
> public class FlinkKafkaSQL {
>     public static void main(String[] args) throws Exception {
>         // Read parameters from command line
>         final ParameterTool params = ParameterTool.fromArgs(args);
>
>         if(params.getNumberOfParameters() < 5) {
>             System.out.println("\nUsage: FlinkReadKafka " +
>                 "--read-topic " +
>                 "--write-topic " +
>                 "--bootstrap.servers " +
>                 "zookeeper.connect" +
>                 "--group.id ");
>             return;
>         }
>
>         // setup streaming environment
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
>         env.enableCheckpointing(300000); // 300 seconds
>         env.getConfig().setGlobalJobParameters(params);
>
>         StreamTableEnvironment tableEnv =
>             TableEnvironment.getTableEnvironment(env);
>
>         // specify JSON field names and types
>         TypeInformation typeInfo2 = Types.ROW(
>             new String[] { "iotdevice", "sensorID" },
>             new TypeInformation[] { Types.STRING()}
>         );
>
>         // create a new tablesource of JSON from kafka
>         KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>             params.getRequired("read-topic"),
>             params.getProperties(),
>             typeInfo2);
>
>         // run some SQL to filter results where a key is not null
>         String sql = "SELECT sensorID " +
>             "FROM iotdevice ";
>         tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>         Table result = tableEnv.sql(sql);
>
>         // create a partition for the data going into kafka
>         FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>
>         // create new tablesink of JSON to kafka
>         KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>             params.getRequired("write-topic"),
>             params.getProperties(),
>             partition);
>
>         result.writeToSink(kafkaTableSink);
>
>         env.execute("FlinkReadWriteKafkaJSON");
>     }
> }
>
> *This is the dependencies in pom.xml*
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>1.3.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_2.11</artifactId>
>         <version>1.3.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_2.11</artifactId>
>         <version>1.3.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-kafka-0.9</artifactId>
>         <version>1.3.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table_2.11</artifactId>
>         <version>1.3.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-core</artifactId>
>         <version>1.3.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-scala_2.11</artifactId>
>         <version>1.3.0</version>
>     </dependency>
> </dependencies>
>
> Regards.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
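Rong's remark about the schema names and types having different lengths can be illustrated without Flink at all. The following is only a minimal sketch in plain Java: `RowSchemaCheck` and its `pair` method are hypothetical names made up for this illustration and are not part of any Flink API, and the type names are plain strings standing in for Flink's type information objects. It pairs each field name with a type and rejects the two-names, one-type combination from the quoted code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of Flink.
public class RowSchemaCheck {

    // Pairs each field name with its type name, preserving field order.
    // Throws IllegalArgumentException if the two arrays differ in length.
    public static Map<String, String> pair(String[] names, String[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException(
                "Got " + names.length + " field names but "
                    + types.length + " field types");
        }
        Map<String, String> schema = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            schema.put(names[i], types[i]);
        }
        return schema;
    }

    public static void main(String[] args) {
        // One name and one type: the arrays line up, so the pairing succeeds.
        System.out.println(pair(new String[] {"sensorID"}, new String[] {"STRING"}));

        // Two names and one type, as in the quoted code: rejected.
        try {
            pair(new String[] {"iotdevice", "sensorID"}, new String[] {"STRING"});
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In the quoted code, "iotdevice" is also the name the table is registered under, so keeping only "sensorID" as a field name, as the follow-up message describes, gives one name per type.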


Sent from the Apache Flink User Mailing List archive at Nabble.com.