[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Using a custom DeserializationSchema with Kafka and Python


I'm trying to write a pipeline using the new Python streaming API, which reads from Kafka using FlinkKafkaConsumer010.

This works fine when using an existing deserializer like the SimpleStringSchema but I need to define my own deserializer to process a custom format. I've written a class which extends SimpleStringSchema, but I get an ImportError when trying to use it.

The class is as follows:
from org.apache.flink.api.common.serialization import SimpleStringSchema

class MyCustomKafkaDeserializer(SimpleStringSchema):

def __init__(self):
print "created MyKafkaDeserializer"

def deserialize(self, *args):
# snip
I instantiate the Kafka consumer like this:

consumer = FlinkKafkaConsumer010([configs['kafkaTopic']], MyCustomKafkaDeserializer(), props)

When I start the pipeline I see the message printed in the constructor (so the deserializer is being created) but once env.execute() is called I get this error:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: ImportError: No module named MyCustomKafkaDeserializer

	at org.python.core.Py.ImportError(Py.java:328)

The issue is the same whether MyCustomKafkaDeserializer is defined in the same file as the pipeline, or in another file and imported. It seems that the internals of Flink can't find the class for some reason.

The command I'm using to run the pipeline: 
./pyflink-stream.sh /Users/jmalt/flink-python/KafkaRead.py /Users/jmalt/flink-python/MyCustomKafkaDeserializer.py - --local

How can I make Flink see the custom deserializer?


Joe Malt

Software Engineering Intern, Stream Processing