The distinct operator is implemented as a groupBy(distinctKeys) followed by a ReduceFunction that simply returns its first argument.
Hence, which record of a group survives depends on the order in which the ReduceFunction processes the records.
Flink does not guarantee a deterministic processing order because enforcing one is quite expensive in a distributed system.
Several aspects introduce non-deterministic order:
- lazy split assignment
- combiners (which are automatically added for ReduceFunctions)
- network shuffles
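The order sensitivity can be sketched in plain Java, without the Flink dependency (the `Row` type and its fields are made up for illustration): a "return the first argument" reduce applied to the same group in two different arrival orders keeps two different records.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class OrderSensitivity {
    // A record with a distinct key and one extra, non-key field.
    record Row(String key, String payload) {}

    public static void main(String[] args) {
        // Mimics the distinct operator's reduce: keep the first argument.
        BinaryOperator<Row> keepFirst = (a, b) -> a;

        // Same logical group, two different arrival orders.
        List<Row> order1 = List.of(new Row("k", "A"), new Row("k", "B"));
        List<Row> order2 = List.of(new Row("k", "B"), new Row("k", "A"));

        Row r1 = order1.stream().reduce(keepFirst).get();
        Row r2 = order2.stream().reduce(keepFirst).get();

        // Different order -> different surviving record.
        System.out.println(r1.payload()); // A
        System.out.println(r2.payload()); // B
    }
}
```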
There are two ways to address this issue:
1) Fully sort the input of the combiners and reducers on all attributes.
2) Use a custom ReduceFunction that compares both input records on all non-distinct-key fields and deterministically decides which record to return.
I would go for the second approach because it is more efficient: there is no need to fully sort the data before the combiner.
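The second approach can be sketched as follows, again in plain Java rather than against Flink's ReduceFunction interface (types and field names are illustrative). The reduce compares the non-key field and always keeps the smaller record; because the function is commutative and associative, combiners and arbitrary merge orders agree on the result.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class DeterministicDistinct {
    record Row(String key, String payload) {}

    // Deterministic tie-break: compare the non-key field and always
    // return the record with the lexicographically smaller payload.
    static final BinaryOperator<Row> pickSmaller =
        (a, b) -> a.payload().compareTo(b.payload()) <= 0 ? a : b;

    public static void main(String[] args) {
        List<Row> order1 = List.of(new Row("k", "B"), new Row("k", "A"));
        List<Row> order2 = List.of(new Row("k", "A"), new Row("k", "B"));

        // Both arrival orders now yield the same record.
        System.out.println(order1.stream().reduce(pickSmaller).get().payload()); // A
        System.out.println(order2.stream().reduce(pickSmaller).get().payload()); // A
    }
}
```

In an actual Flink job this comparison would go into the reduce() method of a ReduceFunction, comparing all non-distinct-key fields instead of a single one.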