Hi everyone,

While integrating portable state with the FlinkRunner, I hit a problem and wanted to get your opinion.
Stateful DoFns require their input to be KV records because state is isolated by key. The (non-portable) FlinkRunner uses Flink's `keyBy(key)` construct to partition state by key.
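To make the key isolation concrete, here is a minimal sketch in plain Python. It is only an illustration, not Beam or Flink code: `KeyedCounterState` and `run_keyed` are hypothetical names, and the per-key dictionary stands in for whatever state backend the runner actually uses.

```python
from collections import defaultdict


class KeyedCounterState:
    """Toy stand-in for per-key state (hypothetical, not a Beam or Flink API)."""

    def __init__(self):
        # One independent counter per key: state for "a" is never visible to "b".
        self._counts = defaultdict(int)

    def process(self, element):
        # The element must be a (key, value) pair so the key can be read.
        key, _value = element
        self._counts[key] += 1
        return key, self._counts[key]


def run_keyed(elements):
    """Feed elements through the toy stateful step and collect its outputs."""
    state = KeyedCounterState()
    return [state.process(element) for element in elements]
```

Calling `run_keyed([("a", 1), ("b", 2), ("a", 3)])` counts each key separately, which is only possible because every element exposes its key.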
That works fine for portable Java pipelines, where we enforce the `KV` class for stateful DoFns. After running tests with the Python SDK, I found that tuples such as `(key, value)`, which are used for KV functionality, do not go through the KvCoder but are encoded using a byte array encoder.
How do we infer the key in the Runner from an opaque sequence of bytes? Should we also require the KvCoder for stateful DoFns in the Python SDK?
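A rough sketch of the difference, in plain Python. The length-prefixed layout below is an assumption made up for illustration and is not Beam's actual KvCoder wire format; `encode_opaque`, `encode_kv`, and `extract_key` are hypothetical helpers, and `pickle` merely stands in for an SDK-specific serializer the runner cannot interpret.

```python
import pickle
import struct


def encode_opaque(element):
    # The whole tuple becomes one blob. Without the SDK's own decoder,
    # a runner has no way to tell where the key ends and the value begins.
    return pickle.dumps(element)


def encode_kv(key_bytes, value_bytes):
    # Hypothetical KV layout: 4-byte big-endian key length, key, then value.
    return struct.pack(">I", len(key_bytes)) + key_bytes + value_bytes


def extract_key(encoded_kv):
    # With a known KV structure, the key bytes can be sliced out
    # without decoding (or understanding) the value at all.
    (key_len,) = struct.unpack_from(">I", encoded_kv, 0)
    return encoded_kv[4:4 + key_len]
```

With a structured encoding, `extract_key(encode_kv(b"user-1", b"payload"))` yields the key bytes, which the runner could hand to `keyBy`. With `encode_opaque`, the same information is only recoverable by the SDK that wrote it.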
Thanks,
Max

https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471