Re: Ask for SQL using kafka in Flink


Yes, I am also looking for Kafka Avro table examples in Java and on the command line. A Kafka Avro table sink is also still missing. In addition, given a Kafka topic, the API should be able to read the schema directly from a schema file or a schema registry. In my opinion, the current API lacks flexibility.

Sent from my iPhone

> On Jun 4, 2018, at 14:29, Shuyi Chen <suez1224@xxxxxxxxx> wrote:
> 
> Given the popularity of Flink SQL and of Kafka as a streaming source, I think we can add some examples of using Kafka[XXX]TableSource to the flink-examples/flink-examples-table module. What do you guys think?
> 
> Cheers
> Shuyi
> 
>> On Mon, Jun 4, 2018 at 12:57 AM, Timo Walther <twalthr@xxxxxxxxxx> wrote:
>> Hi,
>> 
>> As you can see in the code [1], Kafka09JsonTableSource takes a TableSchema. You can create a table schema from type information; see [2].
>> 
>> Regards,
>> Timo
>> 
>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
>> [2] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
>> 
>>> On 02.06.18 at 18:31, Radhya Sahal wrote:
>>> 
>>> Thanks Rong,
>>> 
>>> I used Flink 1.3.0. If I switch to Flink 1.5, how can I define the jsonSchema?
>>> 
>>> Yes, there were two names, but now I have put only one name, and I want to
>>> define the jsonSchema.
>>> 
>>> 
>>> 
>>> Rong Rong wrote
>>>> Hi Radhya,
>>>> 
>>>> Which Flink version are you using? In the latest Flink 1.5 release,
>>>> Kafka09JsonTableSource takes:
>>>> 
>>>> /**
>>>>   * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
>>>>   *
>>>>   * @param topic       Kafka topic to consume.
>>>>   * @param properties  Properties for the Kafka consumer.
>>>>   * @param tableSchema The schema of the table.
>>>>   * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
>>>>   */
>>>> 
>>>> Also, in your type definition
>>>>   TypeInformation<Row> typeInfo2 = Types.ROW(...
>>>> the arguments seem to have different lengths for the schema names and types.
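
To make the length mismatch concrete, here is a minimal plain-Java sketch (no Flink dependency) that compares the two arrays from the original post. The class RowSchemaCheck and its methods are hypothetical helpers for illustration only, the type names are plain strings standing in for TypeInformation entries, and the second "STRING" for sensorID is an assumption, since the post does not say which type that field has.

```java
public class RowSchemaCheck {

    /** Returns true when every field name has exactly one type entry. */
    public static boolean lengthsMatch(String[] fieldNames, String[] fieldTypes) {
        return fieldNames.length == fieldTypes.length;
    }

    /** Builds a short human-readable report for the two arrays. */
    public static String describe(String[] fieldNames, String[] fieldTypes) {
        if (lengthsMatch(fieldNames, fieldTypes)) {
            return "ok: " + fieldNames.length + " fields";
        }
        return "mismatch: " + fieldNames.length + " names but "
                + fieldTypes.length + " types";
    }

    public static void main(String[] args) {
        // Field names exactly as in the original post.
        String[] names = {"iotdevice", "sensorID"};
        // The post passes a single type for two names.
        String[] typesFromPost = {"STRING"};
        // Assumed fix: one type per name (the real type of sensorID is unknown).
        String[] typesFixed = {"STRING", "STRING"};

        System.out.println(describe(names, typesFromPost));
        System.out.println(describe(names, typesFixed));
    }
}
```

In the real job the same rule applies to the two arrays passed to Types.ROW: either add a second type or drop one of the names.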
>>>> 
>>>> Thanks,
>>>> Rong
>>>> 
>>>> On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@> wrote:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> Could anyone help me to solve this problem?
>>>>> 
>>>>> 
>>>>> Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>>>>>          The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined
>>>>> 
>>>>> This is the code:
>>>>> 
>>>>> public class FlinkKafkaSQL {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         // Read parameters from command line
>>>>>         final ParameterTool params = ParameterTool.fromArgs(args);
>>>>> 
>>>>>         if (params.getNumberOfParameters() < 5) {
>>>>>             System.out.println("\nUsage: FlinkReadKafka " +
>>>>>                                "--read-topic <topic> " +
>>>>>                                "--write-topic <topic> " +
>>>>>                                "--bootstrap.servers <kafka brokers> " +
>>>>>                                "zookeeper.connect" +
>>>>>                                "--group.id <groupid>");
>>>>>             return;
>>>>>         }
>>>>> 
>>>>>         // setup streaming environment
>>>>>         StreamExecutionEnvironment env =
>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> 
>>>>>         env.getConfig().setRestartStrategy(
>>>>>                 RestartStrategies.fixedDelayRestart(4, 10000));
>>>>>         env.enableCheckpointing(300000); // 300 seconds
>>>>>         env.getConfig().setGlobalJobParameters(params);
>>>>> 
>>>>>         StreamTableEnvironment tableEnv =
>>>>>                 TableEnvironment.getTableEnvironment(env);
>>>>> 
>>>>>         // specify JSON field names and types
>>>>>         TypeInformation<Row> typeInfo2 = Types.ROW(
>>>>>                 new String[] { "iotdevice", "sensorID" },
>>>>>                 new TypeInformation<?>[] { Types.STRING()}
>>>>>         );
>>>>> 
>>>>>         // create a new tablesource of JSON from kafka
>>>>>         KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>>>>>                 params.getRequired("read-topic"),
>>>>>                 params.getProperties(),
>>>>>                 typeInfo2);
>>>>> 
>>>>>         // run some SQL to filter results where a key is not null
>>>>>         String sql = "SELECT sensorID " +
>>>>>                      "FROM iotdevice ";
>>>>>         tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>>>>         Table result = tableEnv.sql(sql);
>>>>> 
>>>>>         // create a partition for the data going into kafka
>>>>>         FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>>>>> 
>>>>>         // create new tablesink of JSON to kafka
>>>>>         KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>>>>                 params.getRequired("write-topic"),
>>>>>                 params.getProperties(),
>>>>>                 partition);
>>>>> 
>>>>>         result.writeToSink(kafkaTableSink);
>>>>> 
>>>>>         env.execute("FlinkReadWriteKafkaJSON");
>>>>>     }
>>>>> }
>>>>> 
>>>>> 
>>>>> These are the dependencies in pom.xml:
>>>>> 
>>>>> <dependencies>
>>>>>     <dependency>
>>>>>         <groupId>org.apache.flink</groupId>
>>>>>         <artifactId>flink-java</artifactId>
>>>>>         <version>1.3.0</version>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>org.apache.flink</groupId>
>>>>>         <artifactId>flink-streaming-java_2.11</artifactId>
>>>>>         <version>1.3.0</version>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>org.apache.flink</groupId>
>>>>>         <artifactId>flink-clients_2.11</artifactId>
>>>>>         <version>1.3.0</version>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>org.apache.flink</groupId>
>>>>>         <artifactId>flink-connector-kafka-0.9</artifactId>
>>>>>         <version>1.3.0</version>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>org.apache.flink</groupId>
>>>>>         <artifactId>flink-table_2.11</artifactId>
>>>>>         <version>1.3.0</version>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>org.apache.flink</groupId>
>>>>>         <artifactId>flink-core</artifactId>
>>>>>         <version>1.3.0</version>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>org.apache.flink</groupId>
>>>>>         <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>>         <version>1.3.0</version>
>>>>>     </dependency>
>>>>> </dependencies>
>>>>> 
>>>>> Regards.
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>> 
>>> 
>>> 
>>> 
>>> 
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> 
>> 
> 
> 
> 
> -- 
> "So you have to trust that the dots will somehow connect in your future."