[DISCUSS] Change the Keyed partitioning behavior of the Kafka Producer API


A while ago we found that if you construct a Flink Kafka producer, it
always uses the FlinkFixedPartitioner to spread the data across the Kafka
partitions, except when you give it a custom partitioner.

Because we want all our elements to be partitioned by the key of the
records, we created this issue and put up a pull request with a
simple FlinkKeyHashPartitioner.


A comment by one of the reviewers (Tzu-Li Tai) was essentially: "Kafka does
this by default already, why this change?"

So I dug a lot deeper to understand how the partitioning decisions and the
data flow from the Flink API down into the Kafka producer client code.

My conclusions:
1) The Kafka producer code uses the provided partitioner if there is one.
Without a partitioner it uses the hash of the key, and without a key it
does a round robin distribution.
2) The Flink Kafka producer constructors come in variants with and without
a partitioner. Even if you provide a valid key for each record, Flink
still uses the FlinkFixedPartitioner if no explicit partitioner has been
specified.
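
The fallback order in conclusion 1) can be sketched roughly as follows. This
is a simplified, self-contained model and not the Kafka client code: the
class PartitionChooser, the method choose and the use of Arrays.hashCode as
the key hash are assumptions made for illustration only (the real client
uses its own hash function and partitioner interface).

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;

/** Illustrative model of the fallback order; NOT the Kafka client code. */
final class PartitionChooser {
    // Counter used only when a record has neither partitioner nor key.
    private final AtomicInteger roundRobin = new AtomicInteger();

    /**
     * @param explicitPartitioner hypothetical stand-in for a user-supplied
     *                            partitioner, or null if none was given
     * @param key                 serialized record key, or null
     * @param numPartitions       number of partitions of the topic
     */
    int choose(ToIntFunction<byte[]> explicitPartitioner, byte[] key, int numPartitions) {
        if (explicitPartitioner != null) {
            // 1. A provided partitioner always wins.
            return explicitPartitioner.applyAsInt(key);
        }
        if (key != null) {
            // 2. No partitioner: hash the key (Arrays.hashCode is a stand-in hash).
            return Math.floorMod(Arrays.hashCode(key), numPartitions);
        }
        // 3. No partitioner and no key: round robin over the partitions.
        return Math.floorMod(roundRobin.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        PartitionChooser chooser = new PartitionChooser();
        byte[] key = "user-42".getBytes();
        System.out.println("keyed:   " + chooser.choose(null, key, 3));
        System.out.println("unkeyed: " + chooser.choose(null, null, 3));
    }
}
```

In this model, records with the same key always land in the same partition
as long as no partitioner is passed in, which is the behavior we are after.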

Looking at the code (I haven't tried it) you can actually get the desired
behavior without any code changes by using a constructor that takes a
partitioner and passing null for it.

In my opinion, providing a KeyedSerializationSchema is an implicit way of
saying that you want the data to be partitioned by that key.

So to make this a workable situation I see three ways to handle this:
1) We merge something like the partitioner I proposed.
2) We change the constructors that get a KeyedSerializationSchema to use
that key for partitioning.
3) We remove all constructors that have a KeyedSerializationSchema because
the key is never used anyway.

I think '3)' is bad, '1)' is 'Ok' and '2)', although it breaks backward
compatibility, is the best solution.

So to clarify the change I propose here:
We change the behavior of all the Flink producer constructors that have
a KeyedSerializationSchema parameter and NO partitioner.
The proposed change is that because we HAVE a key and we do NOT have a
partitioner, the partitioning is done by the partitioning code that already
exists in the underlying Kafka producer client.

For the rest of the constructors the behavior remains unchanged:
- With a NON-Keyed SerializationSchema
- With a provided partitioner
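
To summarize the current and the proposed behavior side by side, here is a
small decision-table sketch. It is only a model of what is described in this
mail: the names Partitioning and ProposedDefaults are hypothetical and do
not exist in Flink, and the "explicit null partitioner" case mentioned
earlier is not modeled.

```java
/** Which partitioning ends up being applied (hypothetical names). */
enum Partitioning { CUSTOM, FLINK_FIXED, KAFKA_DEFAULT }

/** Decision table for the constructor variants; NOT Flink code. */
final class ProposedDefaults {

    /** Current behavior: without a partitioner it is always FlinkFixedPartitioner. */
    static Partitioning current(boolean keyedSchema, boolean partitionerGiven) {
        return partitionerGiven ? Partitioning.CUSTOM : Partitioning.FLINK_FIXED;
    }

    /** Proposed behavior: a keyed schema without a partitioner defers to Kafka. */
    static Partitioning proposed(boolean keyedSchema, boolean partitionerGiven) {
        if (partitionerGiven) {
            return Partitioning.CUSTOM;        // unchanged
        }
        return keyedSchema
                ? Partitioning.KAFKA_DEFAULT   // the only case that changes
                : Partitioning.FLINK_FIXED;    // unchanged
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean keyed : flags) {
            for (boolean given : flags) {
                System.out.println("keyed=" + keyed + " partitioner=" + given
                        + " current=" + current(keyed, given)
                        + " proposed=" + proposed(keyed, given));
            }
        }
    }
}
```

Only the combination "keyed schema, no partitioner" differs between the two
methods; the other three combinations give the same result.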

What do you guys think?

Best regards / Met vriendelijke groeten,

Niels Basjes