[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Dataset.distinct - Question on deterministic results

Hi Will,

The distinct operator is implemented as a groupBy(distinctKeys) and a ReduceFunction that returns the first argument.
Hence, it depends on the order in which the records are processed by the ReduceFunction.

Flink does not maintain a deterministic order because it is quite expensive in distributed systems.
There are a few aspects that result in random order:
- lazy split assignment
- combiners (which are automatically added for ReduceFunctions)
- network shuffles

There are two ways to address this issue:
1) Fully sort the input of the combiners and reducers on all attributes.
2) Use a custom ReduceFunction that compares both input records on all (non-distinct-key) fields to determine which record to return.

I would go for the second approach because it is more efficient (no need to fully sort before the combiner).

Best, Fabian

2018-08-09 18:12 GMT+02:00 Will Bastian <will.s.bastian@xxxxxxxxx>:
I'm operating on a data set with some challenges to overcome. They are:
  1. There is possibility for multiple entries for a single key
  2. For a single key, there may be multiple unique value-tuples
For example
key, val1, val2, val3
1,      0,    0,    0
1,      0,    0,    0
1,      1,    0,    0
2,      1,    1,    1
2,      1,    1,    1
2,      1,    1,    0
1,      0,    0,    0

I've found when executing mySet.distinct(_.key) on the above, that my final results suggest distinct isn't always pulling the same record/value-tuple on every run.

Fully understanding that the use of distinct I've outlined above isn't optimal (we don't know, or care which value-tuple we get, we just want it to be consistent on each run), I wanted to validate whether what I believe I'm observing is accurate. Specifically, in this example is Flink reducing by key with no concern for value, and we can expect the possibility that we may pull different instances back on each distinct call?