KafkaProducer with generic (Avro) serialization schema


Dear reader,

I'm currently writing a KafkaProducer sink that can serialize a generic type using avro4s.
However, the serialization schema is not itself serializable. Here is my code:

The serialization schema:
class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends SerializationSchema[IN] {

  override def serialize(element: IN): Array[Byte] = {
    val byteArray = new ByteArrayOutputStream()
    val avroSer = AvroOutputStream.binary[IN](byteArray)
    avroSer.write(element)
    avroSer.flush()
    avroSer.close()

    byteArray.toByteArray
  }
}

The job code:
case class Person(name : String, age : Int, address : Address)
case class Address(city : String, street : String)

class SimpleJob {

  @transient
  private lazy val serSchema : AvroSerializationSchema[Person] = new AvroSerializationSchema[Person]()

  def start() = {
    val testPerson = Person("Test", 100, Address("Test", "Test"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.
      fromCollection(Seq(testPerson)).
      addSink(createKafkaSink())

    env.execute("Flink sample job")
  }


  def createKafkaSink() : RichSinkFunction[Person] = {
    //set some properties
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("zookeeper.connect", "127.0.0.1:2181")

    new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }

}

The code compiles, but at runtime it fails with the following error: InvalidProgramException: Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d is not serializable.

I assume this means that my custom SerializationSchema is not serializable because of its use of SchemaFor, FromRecord and ToRecord.
Does anyone know a solution or workaround?
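
To illustrate the assumption above, here is a minimal sketch that reproduces the same kind of failure without Flink or avro4s. Evidence is a hypothetical stand-in for a type class such as SchemaFor, and EagerSchema, StringSchema and SerializationCheck are made-up names for this example only. It assumes that the real type class instances are not java.io.Serializable, which I have not verified. A context bound becomes an implicit constructor parameter, and since it is used inside a method, it is kept as a field and takes part in Java serialization.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a type class such as SchemaFor.
// Deliberately NOT Serializable (an assumption about the real type classes).
trait Evidence[T] { def describe: String }

object Evidence {
  implicit val forString: Evidence[String] = new Evidence[String] {
    def describe: String = "string"
  }
}

// The context bound desugars to an implicit constructor parameter.
// Because it is used inside a method, the compiler stores it as a field,
// so serializing an instance also tries to serialize the evidence.
class EagerSchema[T: Evidence] extends Serializable {
  def describe(): String = implicitly[Evidence[T]].describe
}

// Contrast: a non-generic class that looks the evidence up at a concrete
// type inside the method. The instance itself holds no evidence field.
class StringSchema extends Serializable {
  def describe(): String = implicitly[Evidence[String]].describe
}

object SerializationCheck {
  // Returns true if plain Java serialization of `obj` succeeds.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

In this sketch, SerializationCheck.isJavaSerializable(new EagerSchema[String]) is false, while the same check on new StringSchema is true. If the assumption holds for avro4s, one possible direction would be to avoid storing the implicit instances in the schema object, for example by resolving them at a concrete type, although that gives up the generic class.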

Thanks in advance!
Wouter