Re: How to implement @SplitRestriction for Splittable DoFn


Hm, which runner is this? Does this reproduce with the direct runner? The NullPointerException is particularly worrying; I'd like to investigate it.
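For reference, a minimal harness for trying it on the direct runner would look something like this (a sketch, assuming beam-runners-direct-java is on the classpath; the pipeline body is elided):

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.create();
  options.setRunner(DirectRunner.class);  // run in-process instead of on Dataflow
  Pipeline p = Pipeline.create(options);
  // ... apply the same GroupByKey + splittable ParDo as in the real pipeline ...
  p.run().waitUntilFinish();
}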

On Thu, Apr 12, 2018 at 6:49 PM Jiayuan Ma <jiayuanmark@xxxxxxxxx> wrote:
Hi Eugene,

Thanks for your reply. I'm no longer seeing the previous error; I think it was because I didn't do a clean build after upgrading the SDK from 2.3.0 to 2.4.0.

However, I'm now hitting other exceptions with my SDF.

java.lang.OutOfMemoryError: unable to create new native thread
    java.lang.Thread.start0(Native Method)
    java.lang.Thread.start(Thread.java:714)
    java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1587)
    java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
    java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:729)
    org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker$ProcessContext.onClaimed(OutputAndTimeBoundedSplittableProcessElementInvoker.java:265)
    org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.tryClaim(RestrictionTracker.java:75)

and

java.lang.NullPointerException
    org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:96)
    org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:216)
    org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:369)

The old pipeline I'm trying to optimize looks like this:

.apply(GroupByKey.create())
.apply(ParDo.of(new DoFn<KV<String, Iterable<Object>>, KV<String, KV<String, String>>>() {
  @ProcessElement
  public void process(ProcessContext context) {
    Iterable<Object> groupedValues = context.element().getValue();
    // Quadratic pass over the grouped values.
    for (final Object o1 : groupedValues) {
      for (final Object o2 : groupedValues) {
        ...
      }
    }
  }
}))

The optimization I'm trying with an SDF looks roughly like this:

@ProcessElement
public void processElement(ProcessContext context, OffsetRangeTracker tracker) {
  final Iterable<Object> groupedValues = context.element().getValue();
  final Iterator<Object> it = groupedValues.iterator();

  // Skip ahead to the start of this restriction.
  long index = tracker.currentRestriction().getFrom();
  Iterators.advance(it, Math.toIntExact(index));

  // Claim one offset per outer element; stop when the tracker refuses a claim.
  for (; it.hasNext() && tracker.tryClaim(index); ++index) {
    final Object o1 = it.next();
    for (final Object o2 : groupedValues) {
      ... same old logic ...
    }
  }
}

@GetInitialRestriction
public OffsetRange getInitialRestriction(final KV<String, Iterable<Object>> groupedValues) {
  final long size = Iterables.size(groupedValues.getValue());
  return new OffsetRange(0, size);
}

@SplitRestriction
public void splitRestriction(final KV<String, Iterable<Object>> groupedValues,
                             final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
  final long size = Iterables.size(groupedValues.getValue());
  for (final OffsetRange p : range.split(1000000 / size, 10)) {
    receiver.output(p);
  }
}

@NewTracker
public OffsetRangeTracker newTracker(OffsetRange range) {
  return new OffsetRangeTracker(range);
}
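One thing I noticed while writing this up: 1000000 / size is integer division, so a group with more than a million elements would make the desired chunk size 0, and an empty group would divide by zero. A guarded variant of splitRestriction I'm considering (a sketch, same names as above):

@SplitRestriction
public void splitRestriction(final KV<String, Iterable<Object>> groupedValues,
                             final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
  final long size = Iterables.size(groupedValues.getValue());
  // Keep the desired offsets-per-split at least 1; for an empty group the
  // range is [0, 0) and split() emits nothing anyway.
  final long perSplit = Math.max(1L, 1000000 / Math.max(1L, size));
  for (final OffsetRange p : range.split(perSplit, 10)) {
    receiver.output(p);
  }
}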

Jiayuan




On Wed, Apr 11, 2018 at 3:54 PM, Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote:
Hi! This looks concerning. Can you show a full code example, please? Does it run on the direct runner?

On Tue, Apr 10, 2018 at 3:13 PM Jiayuan Ma <jiayuanmark@xxxxxxxxx> wrote:
Hi all,

I'm trying to use the ReplicateFn mentioned in this doc in my pipeline to speed up a nested for-loop. The use case is exactly the same as the "Counting friends in common (cross join by key)" section. However, I'm having trouble making it work with the Beam 2.4.0 SDK.

I'm implementing @SplitRestriction as follows:

@SplitRestriction
public void splitRestriction(A element, OffsetRange range, OutputReceiver<OffsetRange> out) {
  for (final OffsetRange p : range.split(1000, 10)) {
    out.output(p);
  }
}

The Dataflow runner throws an exception like this:

java.util.NoSuchElementException
    com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)
    com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
    com.google.cloud.dataflow.worker.repackaged.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
    com.google.cloud.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:97)
    com.google.cloud.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:71)
    com.google.cloud.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:61)
    com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
    com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
    com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
    com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:200)
    com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
    com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1211)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:959)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

I also tried the following, as suggested by the Javadoc, but it fails during pipeline construction.

@SplitRestriction
public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
  return range.split(1000, 10);
}
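(My guess is that the List-returning form comes from the original SDF design doc, and the 2.4.0 signature analysis only accepts the void form taking an OutputReceiver, which would explain the construction-time rejection; I'd appreciate confirmation.)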

Without implementing @SplitRestriction, my pipeline runs without any errors. However, I believe the restriction is then not actually split, which defeats the purpose of improving performance.

Does anyone know whether @SplitRestriction is currently supported by the Dataflow runner? How can I write a working version with the latest SDK?

Thanks,
Jiayuan