
Re: [Google dataflow] Apache beam 2.4.0 causes exceptions with CombiningState


If you are mutating accumulators, perhaps you might blind write the inputs and have the system manage the combining. I'd have to see the body of @ProcessElement to say more.
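
For illustration, a rough sketch of that blind-write pattern, reusing the IndexStateCombineFn posted further down in this thread (the DoFn below is hypothetical, not the original GenerateMutationsFn):

import com.google.protobuf.ByteString;
import java.util.Map;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical stateful DoFn: elements are blind-written via add() and the
// runner applies IndexStateCombineFn itself, so user code never mutates the
// accumulator it reads back.
public class BlindWriteIndexFn extends DoFn<KV<String, KV<Long, ByteString>>, KV<String, ByteString>> {

    @StateId("indexKeys")
    private final StateSpec<CombiningState<
            KV<String, KV<Long, ByteString>>,
            Map<String, KV<Long, ByteString>>,
            Map<String, ByteString>>> indexKeysSpec =
            StateSpecs.combining(new IndexStateCombineFn());

    @ProcessElement
    public void processElement(
            ProcessContext c,
            @StateId("indexKeys") CombiningState<
                    KV<String, KV<Long, ByteString>>,
                    Map<String, KV<Long, ByteString>>,
                    Map<String, ByteString>> indexKeys) {
        // Blind write: no read-modify-write of the accumulator in user code.
        indexKeys.add(c.element());

        // Read only the combined output when it is actually needed.
        for (Map.Entry<String, ByteString> e : indexKeys.read().entrySet()) {
            c.output(KV.of(e.getKey(), e.getValue()));
        }
    }
}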

Kenn

On Mon, Apr 16, 2018 at 3:33 PM Kenneth Knowles <klk@xxxxxxxxxx> wrote:
There's actually been a rename around that notion of CombiningState. It used to be that BagState (just blind writes) and CombiningValueState (uses a CombineFn) were both instances of CombiningState (anything automatically mergeable).

Now the names are BagState (blind writes) and CombiningState (uses a CombineFn), which are both instances of GroupingState (automatically mergeable - you might wonder why we didn't call it MergeableState...)
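
In code, the two specs under the current names would look roughly like this (illustrative field declarations inside a stateful DoFn, reusing the IndexStateCombineFn posted further down in this thread):

// BagState: blind writes only; values are appended and read back unchanged.
@StateId("rawInputs")
private final StateSpec<BagState<KV<String, KV<Long, ByteString>>>> rawInputsSpec =
        StateSpecs.bag();

// CombiningState: blind writes that the runner folds through a CombineFn.
@StateId("combinedIndexKeys")
private final StateSpec<CombiningState<
        KV<String, KV<Long, ByteString>>,
        Map<String, KV<Long, ByteString>>,
        Map<String, ByteString>>> combinedIndexKeysSpec =
        StateSpecs.combining(new IndexStateCombineFn());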

Kenn

On Mon, Apr 16, 2018 at 12:14 PM Reuven Lax <relax@xxxxxxxxxx> wrote:
Out of curiosity, what are you using CombiningState for? I believe it is intended for use with merging windows (such as sessions); however, those windows are not yet supported with state.

Reuven

On Fri, Apr 13, 2018 at 2:42 AM Ankur Chauhan <achauhan@xxxxxxxxxxxxxx> wrote:

Hi all,

I recently updated my Dataflow pipeline to the 2.4.0 SDK and found that my stateful DoFn with the following StateSpec is throwing java.lang.UnsupportedOperationException.

For reference, the job information is:

  • job-id: 2018-04-11_12_11_36-1181436984489583563

The same code seems to work correctly, i.e. without problems, with the 2.3.0 SDK.

@StateId("indexKeys")
        // this is the state spec needed by beam to figure out the state spec / type requirements at runtime
        private final StateSpec<CombiningState<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>>> INDEX_KEYS_SPEC = StateSpecs.combining(new IndexStateCombineFn());

The exception is:

java.lang.UnsupportedOperationException
        java.util.AbstractMap.put(AbstractMap.java:209)
        com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
        com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
        com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
        com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)

The combine fn is:


import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;

import java.util.Map;

// this combiner keeps track of the most recent value for each key in the map
public class IndexStateCombineFn extends Combine.CombineFn<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>> {
    @Override
    public Map<String, KV<Long, ByteString>> createAccumulator() {
        return Maps.newHashMap();
    }

    @Override
    public Map<String, KV<Long, ByteString>> addInput(Map<String, KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>> input) {
        String id = input.getKey();
        KV<Long, ByteString> indexKey = input.getValue();
        if (!accumulator.containsKey(id)) {
            accumulator.put(id, indexKey);
        } else {
            KV<Long, ByteString> prevVal = accumulator.get(id);
            if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
                // input is newer than what we have in the map, store it
                accumulator.put(id, indexKey);
            }
        }
        return accumulator;
    }

    @Override
    public Map<String, KV<Long, ByteString>> mergeAccumulators(Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
        Map<String, KV<Long, ByteString>> merged = null;
        for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
            if (merged == null) {
                merged = accumulator;
            } else {
                for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
                    String indexId = entry.getKey();
                    KV<Long, ByteString> v = entry.getValue();
                    if (!merged.containsKey(indexId)) {
                        merged.put(indexId, v);
                    } else {
                        KV<Long, ByteString> old = merged.get(indexId);
                        if (old.getKey() < v.getKey()) {
                            merged.put(indexId, v);
                        }
                    }
                }
            }
        }
        return merged;
    }

    @Override
    public Map<String, ByteString> extractOutput(Map<String, KV<Long, ByteString>> accumulator) {
        Map<String, ByteString> output = Maps.newHashMapWithExpectedSize(accumulator.size());
        for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
            output.put(entry.getKey(), entry.getValue().getValue());
        }
        return output;
    }
}

The exception seems to suggest that WindmillStateInternals may be returning an immutable Map, but I can't say for sure. Based on the Javadoc for addInput, the accumulator should be mutable.
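
If that is what is happening, one possible workaround (just a sketch, not a verified fix) would be to copy the accumulator into a fresh HashMap inside addInput instead of mutating the map that was passed in:

@Override
public Map<String, KV<Long, ByteString>> addInput(
        Map<String, KV<Long, ByteString>> accumulator,
        KV<String, KV<Long, ByteString>> input) {
    // Defensive copy: never mutate the map handed in by the runner, in case
    // it is an unmodifiable view of internal state.
    Map<String, KV<Long, ByteString>> copy = Maps.newHashMap(accumulator);
    KV<Long, ByteString> prev = copy.get(input.getKey());
    if (prev == null || prev.getKey() <= input.getValue().getKey()) {
        // input is at least as new as what we have, keep it
        copy.put(input.getKey(), input.getValue());
    }
    return copy;
}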

Has anyone else seen this issue?

— Ankur Chauhan
