OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Ask for SQL using kafka in Flink


Thanks Rong, 

I used Flink 1.3.0 in case of using Flink 1.5 how can I define jsonschema?

Yes, there are two names but now I put one name only and I want to define
jsonschema. 



Rong Rong wrote
> Hi Radhya,
> 
> Can you provide which Flink version you are using? Based on the latest
> FLINK 1.5 release, Kafka09JsonTableSource takes:
> 
> /**
>  * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
>  *
>  * @param topic       Kafka topic to consume.
>  * @param properties  Properties for the Kafka consumer.
>  * @param tableSchema The schema of the table.
>  * @param jsonSchema  The schema of the JSON messages to decode from
> Kafka.
>  */
> 
> Also, your type definition: TypeInformation
> <Row>
>  typeInfo2 = Types.ROW(...
> arguments seem to have different length for schema names and types.
> 
> Thanks,
> Rong
> 
> On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal &lt;

> radhya.sahal@

> &gt; wrote:
> 
>> Hi,
>>
>> Could anyone help me to solve this problem
>>
>>
>> /Exception in thread "main" java.lang.Error: Unresolved compilation
>> problem:
>>         The constructor Kafka09JsonTableSource(String, Properties,
>> TypeInformation
> <Row>
> ) is undefined
>> /
>> *--This is the code *
>> public class FlinkKafkaSQL {
>>         public static void main(String[] args) throws Exception {
>>             // Read parameters from command line
>>             final ParameterTool params = ParameterTool.fromArgs(args);
>>
>>             if(params.getNumberOfParameters() < 5) {
>>                 System.out.println("\nUsage: FlinkReadKafka " +
>>                                    "--read-topic 
> <topic>
>  " +
>>                                    "--write-topic 
> <topic>
>  " +
>>                                    "--bootstrap.servers 
> <kafka brokers>
>  " +
>>                                    "zookeeper.connect" +
>>                                    "--group.id 
> <groupid>
> ");
>>                 return;
>>             }
>>
>>             // setup streaming environment
>>             StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
>> 10000));
>>             env.enableCheckpointing(300000); // 300 seconds
>>             env.getConfig().setGlobalJobParameters(params);
>>
>>             StreamTableEnvironment tableEnv =
>> TableEnvironment.getTableEnvironment(env);
>>
>>             // specify JSON field names and types
>>
>>
>>             TypeInformation
> <Row>
>  typeInfo2 = Types.ROW(
>>                     new String[] { "iotdevice", "sensorID" },
>>                     new TypeInformation<?>[] { Types.STRING()}
>>             );
>>
>>             // create a new tablesource of JSON from kafka
>>             KafkaJsonTableSource kafkaTableSource = new
>> Kafka09JsonTableSource(
>>                     params.getRequired("read-topic"),
>>                     params.getProperties(),
>>                     typeInfo2);
>>
>>             // run some SQL to filter results where a key is not null
>>             String sql = "SELECT sensorID " +
>>                          "FROM iotdevice ";
>>             tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>             Table result = tableEnv.sql(sql);
>>
>>             // create a partition for the data going into kafka
>>             FlinkFixedPartitioner partition =  new
>> FlinkFixedPartitioner();
>>
>>             // create new tablesink of JSON to kafka
>>             KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>                     params.getRequired("write-topic"),
>>                     params.getProperties(),
>>                     partition);
>>
>>             result.writeToSink(kafkaTableSink);
>>
>>             env.execute("FlinkReadWriteKafkaJSON");
>>         }
>> }
>>
>>
>> *This is the dependencies  in pom.xml*
>>
>>         
> <dependencies>
>>             
> <dependency>
>>                 
> <groupId>
> org.apache.flink
> </groupId>
>>                 
> <artifactId>
> flink-java
> </artifactId>
>>                 
> <version>
> 1.3.0
> </version>
>>             
> </dependency>
>>             
> <dependency>
>>                         
> <groupId>
> org.apache.flink
> </groupId>
>>                         
> <artifactId>
> flink-streaming-java_2.11
> </artifactId>
>>                         
> <version>
> 1.3.0
> </version>
>>                 
> </dependency>
>>                 
> <dependency>
>>                         
> <groupId>
> org.apache.flink
> </groupId>
>>                         
> <artifactId>
> flink-clients_2.11
> </artifactId>
>>                         
> <version>
> 1.3.0
> </version>
>>                 
> </dependency>
>>                 
> <dependency>
>>                         
> <groupId>
> org.apache.flink
> </groupId>
>>                         
> <artifactId>
> flink-connector-kafka-0.9
> </artifactId>
>>
>> 
> <version>
> 1.3.0
> </version>
>>                 
> </dependency>
>>                 
> <dependency>
>>                         
> <groupId>
> org.apache.flink
> </groupId>
>>                         
> <artifactId>
> flink-table_2.11
> </artifactId>
>>                         
> <version>
> 1.3.0
> </version>
>>                 
> </dependency>
>>                 
> <dependency>
>>                         
> <groupId>
> org.apache.flink
> </groupId>
>>                         
> <artifactId>
> flink-core
> </artifactId>
>>                         
> <version>
> 1.3.0
> </version>
>>                 
> </dependency>
>>                 
> <dependency>
>>                         
> <groupId>
> org.apache.flink
> </groupId>
>>                         
> <artifactId>
> flink-streaming-
>> scala_2.11
> </artifactId>
>>                         
> <version>
> 1.3.0
> </version>
>>                 
> </dependency>
>>         
> </dependencies>
>>
>>
>> Regards.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/
>>





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/