[
Date Prev][
Date Next][
Thread Prev][
Thread Next][
Date Index][
Thread Index]
Ask for SQL using kafka in Flink
Hi,
Could anyone help me solve this problem?
/Exception in thread "main" java.lang.Error: Unresolved compilation problem:
The constructor Kafka09JsonTableSource(String, Properties,
TypeInformation<Row>) is undefined
/
*This is the code:*
/**
 * Flink streaming job that reads JSON records from a Kafka topic, runs a SQL
 * projection over them through the Table API, and writes the result back to
 * another Kafka topic as JSON.
 *
 * <p>NOTE(review): the reported "constructor Kafka09JsonTableSource(String,
 * Properties, TypeInformation&lt;Row&gt;) is undefined" error cannot be
 * diagnosed from this class alone because the import section is not shown.
 * Confirm that {@code Row}, {@code Types} and {@code TypeInformation} are
 * imported from the packages the connector version on the classpath expects,
 * and that all Flink artifacts resolve to the same version.
 */
public class FlinkKafkaSQL {

    /**
     * Entry point. Expects the command-line options listed in the usage text;
     * prints the usage and returns when fewer than five are supplied.
     *
     * @param args command-line arguments in {@code --key value} form
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Read parameters from command line
        final ParameterTool params = ParameterTool.fromArgs(args);
        if (params.getNumberOfParameters() < 5) {
            // Every option needs its own "--" prefix, placeholder and trailing
            // space, otherwise adjacent options run together in the output.
            System.out.println("\nUsage: FlinkReadKafka " +
                    "--read-topic <topic> " +
                    "--write-topic <topic> " +
                    "--bootstrap.servers <kafka brokers> " +
                    "--zookeeper.connect <zookeeper quorum> " +
                    "--group.id <groupid>");
            return;
        }

        // setup streaming environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(300000); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);

        StreamTableEnvironment tableEnv =
                TableEnvironment.getTableEnvironment(env);

        // specify JSON field names and types; the two arrays must have the
        // same length (one type per field name).
        // NOTE(review): "sensorID" is assumed to be a JSON string — adjust the
        // second type if the payload carries a number instead.
        TypeInformation<Row> typeInfo2 = Types.ROW(
                new String[] { "iotdevice", "sensorID" },
                new TypeInformation<?>[] { Types.STRING(), Types.STRING() }
        );

        // create a new tablesource of JSON from kafka
        KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
                params.getRequired("read-topic"),
                params.getProperties(),
                typeInfo2);

        // project the sensorID column out of the registered Kafka table
        String sql = "SELECT sensorID " +
                "FROM iotdevice ";
        tableEnv.registerTableSource("iotdevice", kafkaTableSource);
        Table result = tableEnv.sql(sql);

        // create a partition for the data going into kafka
        FlinkFixedPartitioner<Row> partition = new FlinkFixedPartitioner<>();

        // create new tablesink of JSON to kafka
        KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
                params.getRequired("write-topic"),
                params.getProperties(),
                partition);
        result.writeToSink(kafkaTableSink);

        env.execute("FlinkReadWriteKafkaJSON");
    }
}
*These are the dependencies in pom.xml:*
<!-- Flink 1.3.0 dependencies for the Kafka 0.9 / Table API job. -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<!-- Scala-dependent Flink artifacts carry a Scala-version suffix; use the
     same _2.11 suffix as the other Flink modules in this list. -->
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
Regards.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/