
Re: [Google dataflow] Apache beam 2.4.0 causes exceptions with CombiningState


There's actually been a rename since that notion of CombiningState. It used to be that BagState (just blind writes) and CombiningValueState (uses a CombineFn) were both instances of CombiningState (anything automatically mergeable).

Now the names are BagState (blind writes) and CombiningState (uses a CombineFn), which are both instances of GroupingState (automatically mergeable; you might wonder why we didn't call it MergeableState...)

Kenn
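A stdlib-only toy model of the naming Kenn describes may help: one shared supertype that only promises add and read, a bag implementation where add is a blind write, and a combining implementation that folds each input through CombineFn-style functions. The ToyGroupingState, ToyBagState and ToyCombiningState names are hypothetical simplifications written for illustration, not Beam's interfaces; Beam's real GroupingState, BagState and CombiningState carry more methods (for example readLater and isEmpty), and the runner-side merging is not modelled at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy stand-in for GroupingState: anything you can blindly add to and later read.
// Hypothetical and simplified; not Beam's interface.
interface ToyGroupingState<InputT, OutputT> {
    void add(InputT value);

    OutputT read();
}

// Toy stand-in for BagState: add() is a blind write that just appends.
class ToyBagState<T> implements ToyGroupingState<T, Iterable<T>> {
    private final List<T> values = new ArrayList<>();

    @Override
    public void add(T value) {
        values.add(value);
    }

    @Override
    public Iterable<T> read() {
        return new ArrayList<>(values); // copy of everything written so far
    }
}

// Toy stand-in for CombiningState: add() folds the input into an accumulator
// using CombineFn-style createAccumulator / addInput / extractOutput functions.
class ToyCombiningState<InputT, AccumT, OutputT> implements ToyGroupingState<InputT, OutputT> {
    private final BiFunction<AccumT, InputT, AccumT> addInput;
    private final Function<AccumT, OutputT> extractOutput;
    private AccumT accum;

    ToyCombiningState(Supplier<AccumT> createAccumulator,
                      BiFunction<AccumT, InputT, AccumT> addInput,
                      Function<AccumT, OutputT> extractOutput) {
        this.accum = createAccumulator.get();
        this.addInput = addInput;
        this.extractOutput = extractOutput;
    }

    @Override
    public void add(InputT value) {
        accum = addInput.apply(accum, value);
    }

    @Override
    public OutputT read() {
        return extractOutput.apply(accum);
    }
}
```

Both toy classes can sit behind the same ToyGroupingState reference, which is the property the shared supertype is meant to capture: callers only add and read, and how values are grouped is the implementation's business.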

On Mon, Apr 16, 2018 at 12:14 PM Reuven Lax <relax@xxxxxxxxxx> wrote:
Out of curiosity, what are you using CombiningState for? I believe it is intended for use in merging windows (such as sessions); however, those windows are not yet supported with state.

Reuven

On Fri, Apr 13, 2018 at 2:42 AM Ankur Chauhan <achauhan@xxxxxxxxxxxxxx> wrote:

Hi all,

I recently updated my Dataflow pipeline to the 2.4.0 SDK and found that my stateful DoFn with the following StateSpec throws java.lang.UnsupportedOperationException.

For reference, the job information is:

  • job-id: 2018-04-11_12_11_36-1181436984489583563

The same code seems to work correctly (i.e. without problems) in 2.3.0.

// this is the state spec needed by beam to figure out the state spec / type requirements at runtime
@StateId("indexKeys")
private final StateSpec<CombiningState<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>>> INDEX_KEYS_SPEC =
        StateSpecs.combining(new IndexStateCombineFn());

The exception is:

java.lang.UnsupportedOperationException
        java.util.AbstractMap.put(AbstractMap.java:209)
        com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
        com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
        com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
        com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)

The combine fn is:


import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;

import java.util.Map;

// this combiner ensures that we keep track of the most recent value of each key in the map (the entry with the largest Long)
public class IndexStateCombineFn extends Combine.CombineFn<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>> {
    @Override
    public Map<String, KV<Long, ByteString>> createAccumulator() {
        return Maps.newHashMap();
    }

    @Override
    public Map<String, KV<Long, ByteString>> addInput(Map<String, KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>> input) {
        String id = input.getKey();
        KV<Long, ByteString> indexKey = input.getValue();
        if (!accumulator.containsKey(id)) {
            accumulator.put(id, indexKey);
        } else {
            KV<Long, ByteString> prevVal = accumulator.get(id);
            if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
                // input is newer than what we have in the map, store it
                accumulator.put(id, indexKey);
            }
        }
        return accumulator;
    }

    @Override
    public Map<String, KV<Long, ByteString>> mergeAccumulators(Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
        Map<String, KV<Long, ByteString>> merged = null;
        for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
            if (merged == null) {
                merged = accumulator;
            } else {
                for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
                    String indexId = entry.getKey();
                    KV<Long, ByteString> v = entry.getValue();
                    if (!merged.containsKey(indexId)) {
                        merged.put(indexId, v);
                    } else {
                        KV<Long, ByteString> old = merged.get(indexId);
                        if (old.getKey() < v.getKey()) {
                            merged.put(indexId, v);
                        }
                    }
                }
            }
        }
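        // an empty input must still produce a valid (empty) accumulator, never null
        return merged == null ? createAccumulator() : merged;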
    }

    @Override
    public Map<String, ByteString> extractOutput(Map<String, KV<Long, ByteString>> accumulator) {
        Map<String, ByteString> output = Maps.newHashMapWithExpectedSize(accumulator.size());
        for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
            output.put(entry.getKey(), entry.getValue().getValue());
        }
        return output;
    }
}
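To exercise the combine rule without a runner, here is a stdlib-only restatement of it. Stamped and LatestPerKey are hypothetical names: String stands in for ByteString and Stamped for KV<Long, ByteString>. Two deliberate differences from the code above: mergeAccumulators starts from a fresh map rather than reusing the first accumulator, so an empty input yields an empty map; and it reuses addInput, so ties on the Long are broken the same way in both paths. In the original, addInput replaces on `<=` while mergeAccumulators replaces only on `<`, so which of two equal-timestamp values survives can depend on whether the runner adds or merges.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KV<Long, ByteString>: a version/timestamp plus a payload.
// String replaces ByteString so the sketch needs no protobuf dependency.
final class Stamped {
    final long ts;
    final String payload;

    Stamped(long ts, String payload) {
        this.ts = ts;
        this.payload = payload;
    }
}

// Beam-free restatement of IndexStateCombineFn's rule: per key, keep the entry with the largest ts.
final class LatestPerKey {
    static Map<String, Stamped> createAccumulator() {
        return new HashMap<>();
    }

    static Map<String, Stamped> addInput(Map<String, Stamped> acc, String key, Stamped value) {
        Stamped prev = acc.get(key);
        // "<=" matches the original addInput: an equal ts replaces the stored entry.
        if (prev == null || prev.ts <= value.ts) {
            acc.put(key, value);
        }
        return acc;
    }

    static Map<String, Stamped> mergeAccumulators(Iterable<Map<String, Stamped>> accs) {
        // Fresh map: empty input yields an empty accumulator, and no input map is mutated.
        // Reusing addInput keeps one tie-breaking rule for adding and merging.
        Map<String, Stamped> merged = createAccumulator();
        for (Map<String, Stamped> acc : accs) {
            for (Map.Entry<String, Stamped> e : acc.entrySet()) {
                addInput(merged, e.getKey(), e.getValue());
            }
        }
        return merged;
    }

    static Map<String, String> extractOutput(Map<String, Stamped> acc) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, Stamped> e : acc.entrySet()) {
            out.put(e.getKey(), e.getValue().payload);
        }
        return out;
    }
}
```

Starting the merge from a fresh map costs one extra allocation per merge, but it means the result never aliases an accumulator the runner may still hold.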

The exception seems to suggest that WindmillStateInternals may be returning an ImmutableMap, but I can’t say for sure. Based on the javadoc for addInput, the accumulator should be mutable.
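One way to narrow this down: the top frame is java.util.AbstractMap.put, not a put method on the map's own class. AbstractMap.put always throws UnsupportedOperationException, and AbstractMap's javadoc describes building an unmodifiable map by extending it and implementing only entrySet(). So the accumulator passed to addInput was most likely such an AbstractMap subclass; a map class that overrides put to throw (as the Collections.unmodifiableMap wrapper does) would show its own frame instead. The trace alone does not say which map WindmillStateInternals supplied. The sketch reproduces the frame with a hypothetical read-only map and adds an equally hypothetical ensureMutable helper, a workaround that copies the accumulator before writing; it treats only HashMap and its subclasses as safe to mutate.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class AccumulatorDiagnosis {
    // Read-only map built the way AbstractMap's javadoc suggests: implement only entrySet().
    // put() is inherited from AbstractMap, which always throws UnsupportedOperationException.
    static Map<String, Long> readOnlyEmptyMap() {
        return new AbstractMap<String, Long>() {
            @Override
            public Set<Map.Entry<String, Long>> entrySet() {
                return Collections.emptySet();
            }
        };
    }

    // Attempts a put and reports "class.method" of the frame that threw, or null if it succeeded.
    static String throwingFrame(Map<String, Long> map) {
        try {
            map.put("k", 1L);
            return null;
        } catch (UnsupportedOperationException e) {
            StackTraceElement top = e.getStackTrace()[0];
            return top.getClassName() + "." + top.getMethodName();
        }
    }

    // Hypothetical workaround: only write into a map known to be mutable (HashMap or a subclass);
    // otherwise copy it first. addInput is allowed to return a different map than it received.
    static Map<String, Long> ensureMutable(Map<String, Long> acc) {
        return (acc instanceof HashMap) ? acc : new HashMap<>(acc);
    }
}
```

Applied to IndexStateCombineFn, the idea would be to reassign `accumulator` through such a helper at the top of addInput and return the resulting map. This hides the symptom; it does not explain why 2.4.0 behaves differently from 2.3.0.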

Has anyone else seen this issue?

— Ankur Chauhan

--
You received this message because you are subscribed to the Google Groups "dataflow-feedback" group.
To view this discussion on the web visit https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com.
