[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

FlinkKafkaProducer and Confluent Schema Registry


I would like to use Confluent Schema Registry in my streaming job.
I was able to make it work with the help of generic Kafka producer and FlinkKafkaConsumer which is using ConfluentRegistryAvroDeserializationSchema.  

FlinkKafkaConsumer011<GenericRecord> consumer = new FlinkKafkaConsumer011<>(MY_TOPIC,

ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, SCHEMA_URI), kafkaProperties);

My question: is it possible to implement producer logic in the FlinkKafkaProducer to serialize message and store schema id in the Confluent Schema registry?

I don't think this is going to work with the current interface because creation and caching of the schema id in the Confluent Schema Registry is done with the help of io.confluent.kafka.serializers.KafkaAvroSerializer.class  and all FlinkKafkaProducer constructors have either SerializationSchema or KeyedSerializationSchema (part of Flink's own serialization stack) as one of the parameters.

If my assumption is wrong, could you please provide details of implementation?

Thank you very much,