
How to implement @SplitRestriction for Splittable DoFn


Hi all,

I'm trying to use the ReplicateFn mentioned in this doc in my pipeline to speed up a nested for loop. My use case is exactly the same as the "Counting friends in common (cross join by key)" section. However, I'm having trouble making it work with the Beam 2.4.0 SDK.
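To make the use case concrete, here is a minimal plain-Java sketch (no Beam APIs; the class and method names are hypothetical) of a "friends in common" cross join whose outer loop only visits indices in [from, to). That index range is the kind of thing an OffsetRange restriction can describe, so splitting the range partitions the nested loop into independent pieces.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only: cross join one key's people with each other
// and count common friends, restricted to outer-loop indices in [from, to).
final class CommonFriendsSketch {
  static List<String> countInRange(
      List<String> names, List<Set<String>> friends, long from, long to) {
    List<String> out = new ArrayList<>();
    for (int i = (int) from; i < to; i++) {          // outer loop bounded by the range
      for (int j = i + 1; j < names.size(); j++) {   // inner loop over later people
        Set<String> common = new HashSet<>(friends.get(i));
        common.retainAll(friends.get(j));            // intersection of friend sets
        out.add(names.get(i) + "," + names.get(j) + "=" + common.size());
      }
    }
    return out;
  }
}
```

Running [0, 1) and [1, n) separately and concatenating the results gives the same output as running [0, n) once, which is why splitting the restriction should let the work spread across workers.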

I'm implementing @SplitRestriction as follows:

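@SplitRestriction
public void splitRestriction(A element, OffsetRange range, OutputReceiver<OffsetRange> out) {
  // Split into sub-ranges of about 1000 offsets each (minimum split size 10)
  // and emit each one as a separate restriction.
  for (final OffsetRange p : range.split(1000, 10)) {
    out.output(p);
  }
}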
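For reference, the sketch below illustrates the kind of chunking that `range.split(1000, 10)` is meant to produce. It is a simplified, hypothetical helper written from my understanding of the `(desiredNumOffsetsPerSplit, minNumOffsetsPerSplit)` parameters, not Beam's actual `OffsetRange.split` code; Beam's exact tail-merging rules may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for OffsetRange.split(desired, min).
final class RangeSplitSketch {
  /**
   * Splits [from, to) into consecutive sub-ranges of about `desired` offsets.
   * A trailing remainder smaller than `min` is merged into the last sub-range.
   * Each sub-range is returned as {start, end}, end exclusive.
   */
  static List<long[]> split(long from, long to, long desired, long min) {
    if (desired <= 0) throw new IllegalArgumentException("desired must be positive");
    List<long[]> result = new ArrayList<>();
    long start = from;
    while (start < to) {
      long end = Math.min(start + desired, to);
      if (to - end < min) {
        end = to; // absorb a too-small tail instead of emitting it on its own
      }
      result.add(new long[] {start, end});
      start = end;
    }
    return result;
  }
}
```

With these rules, [0, 2505) becomes [0, 1000), [1000, 2000), [2000, 2505), while [0, 2005) becomes [0, 1000), [1000, 2005) because the 5-offset tail is below the minimum.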

The Dataflow runner throws an exception like this:

java.util.NoSuchElementException
    com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)
    com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
    com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
    com.google.cloud.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:97)
    com.google.cloud.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:71)
    com.google.cloud.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:61)
    com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
    com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
    com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
    com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
    com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
    com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1211)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:959)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
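Reading the trace, the exception is raised inside Guava's `Iterables.getOnlyElement`, called from `DataflowProcessFnRunner.getUnderlyingWindow`. Guava documents that `getOnlyElement` throws `NoSuchElementException` when the iterable is empty (and `IllegalArgumentException` when it has more than one element), so the worker apparently found no window where it expected exactly one. The sketch below only mimics that documented contract in plain Java (the class name is hypothetical); it does not reproduce the runner's code.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical re-implementation of the documented contract of
// Guava's Iterables.getOnlyElement, for illustration only.
final class OnlyElementSketch {
  static <T> T getOnlyElement(Iterable<T> items) {
    Iterator<T> it = items.iterator();
    T first = it.next(); // throws NoSuchElementException if there are no elements
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element but found more");
    }
    return first;
  }
}
```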

I also tried the following signature, as suggested by the javadoc, but it fails during pipeline construction.

@SplitRestriction
public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
  return range.split(1000, 10);
}

Without implementing @SplitRestriction, my pipeline runs without any errors. However, I suspect the SDF's restriction is not actually split by default, which defeats the purpose of using it to improve performance.

Does anyone know whether @SplitRestriction is currently supported by the Dataflow runner? How can I write a working version with the latest SDK?

Thanks,
Jiayuan