
Re: Accessing keyed state in portable timer callbacks


Yes, I was hoping to merge the logic in SplittableProcessElementsRunner back into FnApiDoFnRunner, since there is some duplication between the two and that duplication is the original reason FnApiStateAccessor exists. Previously, the class simply referenced the correct object directly, based on the current context (StartBundle/FinishBundle/ProcessElement/OnTimer).
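The context-based lookup described above can be sketched roughly as follows. This is a hypothetical illustration, not Beam's code: the Context enum (including the IDLE value added here), the Runner class, and keySupplier are invented names, and keys are plain values rather than encoded bytes. The idea is that the supplier handed to state reads whichever key belongs to the callback that is currently running.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch (not the actual FnApiDoFnRunner code): the runner records
 * which callback is active, and the key supplier handed to state reads the key
 * of the object that belongs to that callback.
 */
final class ContextKeySketch {
  /** The callbacks named in the thread, plus IDLE (added for this sketch). */
  enum Context { IDLE, START_BUNDLE, PROCESS_ELEMENT, ON_TIMER, FINISH_BUNDLE }

  static final class Runner<K> {
    private Context context = Context.IDLE;
    private K elementKey;
    private K timerKey;

    /** What a state accessor would call to build a state key. */
    final Supplier<K> keySupplier = this::currentKey;

    private K currentKey() {
      switch (context) {
        case PROCESS_ELEMENT:
          return elementKey;
        case ON_TIMER:
          return timerKey;
        default:
          // Bundle-level callbacks have no key, so keyed state is unavailable.
          throw new IllegalStateException("No key available in " + context);
      }
    }

    <T> T startBundle(Supplier<T> userCode) {
      return runIn(Context.START_BUNDLE, null, userCode);
    }

    <T> T processElement(K key, Supplier<T> userCode) {
      return runIn(Context.PROCESS_ELEMENT, key, userCode);
    }

    <T> T onTimer(K key, Supplier<T> userCode) {
      return runIn(Context.ON_TIMER, key, userCode);
    }

    <T> T finishBundle(Supplier<T> userCode) {
      return runIn(Context.FINISH_BUNDLE, null, userCode);
    }

    private <T> T runIn(Context ctx, K key, Supplier<T> userCode) {
      context = ctx;
      if (ctx == Context.ON_TIMER) {
        timerKey = key;
      } else {
        elementKey = key;
      }
      try {
        return userCode.get();
      } finally {
        // Reset so a stale key can never leak into the next callback.
        elementKey = null;
        timerKey = null;
        context = Context.IDLE;
      }
    }
  }

  public static void main(String[] args) {
    Runner<String> runner = new Runner<>();
    System.out.println(runner.processElement("user-1", runner.keySupplier));
    System.out.println(runner.onTimer("user-2", runner.keySupplier));
  }
}
```

The cost of this style is that every new callback kind needs its own branch in currentKey(), which is part of why a single currentElementOrTimer slot was discussed later in the thread.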

On Thu, Nov 1, 2018 at 2:43 AM Maximilian Michels <mxm@xxxxxxxxxx> wrote:
Hi Lukasz,

Thanks for promptly fixing this [1]. I saw that the current element was
not set correctly when timers are processed, but wanted to make sure any
changes would be aligned with the harness processing model.

I favor the currentElementOrTimer approach because it makes things more
explicit, but the current fix is fine for now.

Thanks,
Max

[1] https://github.com/apache/beam/pull/6902
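A minimal sketch of the currentElementOrTimer idea, assuming elements and timers both expose their key through a common interface. Keyed, Element, Timer, and Runner are hypothetical names chosen for illustration; the actual FnApiDoFnRunner fields and types may differ.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the currentElementOrTimer idea: one slot holds whatever
 * is being processed, and both kinds of input expose their key the same way.
 */
final class ElementOrTimerSketch {
  /** Anything the runner can be processing; both inputs carry the user key. */
  interface Keyed<K> {
    K key();
  }

  static final class Element<K, V> implements Keyed<K> {
    private final K key;
    private final V value;

    Element(K key, V value) {
      this.key = key;
      this.value = value;
    }

    @Override
    public K key() {
      return key;
    }

    V value() {
      return value;
    }
  }

  static final class Timer<K> implements Keyed<K> {
    private final K key;
    private final long firingTimestampMillis;

    Timer(K key, long firingTimestampMillis) {
      this.key = key;
      this.firingTimestampMillis = firingTimestampMillis;
    }

    @Override
    public K key() {
      return key;
    }

    long firingTimestampMillis() {
      return firingTimestampMillis;
    }
  }

  static final class Runner<K> {
    // A single field replaces separate currentElement / currentTimer fields.
    private Keyed<K> currentElementOrTimer;

    /** The accessor's callback no longer needs to know which kind of input is active. */
    final Supplier<K> keySupplier = this::currentKey;

    private K currentKey() {
      if (currentElementOrTimer == null) {
        throw new IllegalStateException("No element or timer is being processed");
      }
      return currentElementOrTimer.key();
    }

    <T> T process(Keyed<K> elementOrTimer, Supplier<T> userCode) {
      currentElementOrTimer = elementOrTimer;
      try {
        return userCode.get();
      } finally {
        currentElementOrTimer = null;
      }
    }
  }

  public static void main(String[] args) {
    Runner<String> runner = new Runner<>();
    System.out.println(runner.process(new Element<>("user-1", 42), runner.keySupplier));
    System.out.println(runner.process(new Timer<>("user-2", 1_000L), runner.keySupplier));
  }
}
```

The state accessor's callback depends on one field only, so it cannot silently return null just because a timer, rather than an element, is being processed.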

On 31.10.18 19:09, Lukasz Cwik wrote:
> I filed https://issues.apache.org/jira/browse/BEAM-5930.
>
> On Wed, Oct 31, 2018 at 10:22 AM Lukasz Cwik <lcwik@xxxxxxxxxx
> <mailto:lcwik@xxxxxxxxxx>> wrote:
>
>     That looks like a bug in FnApiDoFnRunner.java.
>
>     The FnApiStateAccessor is given a callback to get the current
>     element and it is not handling the case where the current element is
>     a timer.
>
>     callback:
>     https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L212
>     where the current "element" gets set:
>     https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L220
>     where the current "timer" gets set:
>     https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L237
>
>     The easiest fix would be to have the callback return the first
>     non-null value of currentElement/currentTimer, but longer term I
>     think we'll want a different solution. Alternatively, we could
>     collapse currentElement and currentTimer into a single
>     currentElementOrTimer field, which would solve the accessor issue.
>
>     On Wed, Oct 31, 2018 at 9:50 AM Maximilian Michels <mxm@xxxxxxxxxx
>     <mailto:mxm@xxxxxxxxxx>> wrote:
>
>         Hi,
>
>         I have a question regarding user state in timer callbacks in
>         the FnApiDoFnRunner (Java SDK Harness).
>
>         I've started implementing timers for the portable Flink Runner.
>         I can register a timer via the timer output collection and fire
>         the timer via the timer input of the SDK Harness. But when I try
>         to access state in the Timer callback, I get the exception below.
>
>         Is this a bug? If not, how is the timer's key supposed to be
>         set? I assume that it should be set from the timer element,
>         which contains the key.
>
>         Thanks,
>         Max
>
>
>         Caused by: java.util.concurrent.ExecutionException:
>         java.lang.RuntimeException: Error received from SDK harness for
>         instruction 72: java.util.concurrent.ExecutionException:
>         java.lang.NullPointerException
>                  at
>         java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>                  at
>         java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>                  at
>         org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
>                  at
>         org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
>                  at
>         org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
>                  at
>         org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
>                  at
>         org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
>                  at
>         org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
>                  at
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>                  at
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>                  at java.lang.Thread.run(Thread.java:748)
>         Caused by: java.lang.NullPointerException
>                  at
>         org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
>                  at
>         org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
>                  at
>         org.apache.beam.fn.harness.state.FnApiStateAccessor.bindBag(FnApiStateAccessor.java:244)
>                  at
>         org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:487)
>                  at
>         org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:477)
>                  at
>         org.apache.beam.fn.harness.FnApiDoFnRunner$OnTimerContext.state(FnApiDoFnRunner.java:671)
>                  at
>         StateTest$5$OnTimerInvoker$expiry$ZXhwaXJ5.invokeOnTimer(Unknown
>         Source)
>                  at
>         org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:187)
>                  at
>         org.apache.beam.fn.harness.FnApiDoFnRunner.processTimer(FnApiDoFnRunner.java:244)
>                  at
>         org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.lambda$createRunnerForPTransform$0(DoFnPTransformRunnerFactory.java:134)
>                  at
>         org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>                  at
>         org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>                  at
>         org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>                  at
>         org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
>                  at
>         org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver.onNext(ForwardingClientResponseObserver.java:50)
>                  at
>         org.apache.beam.vendor.grpc.v1.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:407)
>                  at
>         org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
>                  at
>         org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
>                  at
>         org.apache.beam.vendor.grpc.v1.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:519)
>                  at
>         org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>                  at
>         org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>
>
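The root cause in the trace is BagUserState.Builder.setKey receiving null: generated protobuf setters throw NullPointerException for null arguments, and the accessor's callback did not handle the case where a timer, rather than an element, is being processed. The quick fix Lukasz suggests (return the first non-null of currentElement/currentTimer) can be sketched as below. KV, Runner, elementOnlyKey, firstNonNullKey, and createBagUserStateKey are simplified, hypothetical stand-ins, not the FnApiStateAccessor API; the real accessor encodes the key to bytes before building the state key.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of the "first non-null" quick fix. Simplified stand-ins:
 * the real accessor receives the current element and encodes its key into bytes.
 */
final class FirstNonNullKeySketch {
  /** Minimal key/value pair standing in for a keyed element or timer. */
  static final class KV<K, V> {
    final K key;
    final V value;

    KV(K key, V value) {
      this.key = key;
      this.value = value;
    }
  }

  static final class Runner<K> {
    KV<K, ?> currentElement;  // set while processing an element
    KV<K, Long> currentTimer; // set while firing a timer (value: timestamp)

    /** Mirrors the reported bug: ignores the timer, so it yields null in OnTimer. */
    K elementOnlyKey() {
      return currentElement == null ? null : currentElement.key;
    }

    /** Quick fix: use whichever of currentElement / currentTimer is set. */
    K firstNonNullKey() {
      if (currentElement != null) {
        return currentElement.key;
      }
      if (currentTimer != null) {
        return currentTimer.key;
      }
      throw new IllegalStateException("Neither an element nor a timer is being processed");
    }
  }

  /** Stand-in for building a bag user state key; like a protobuf setter, it rejects null. */
  static String createBagUserStateKey(String stateId, Object key) {
    Objects.requireNonNull(key, "key");
    return stateId + "/" + key;
  }

  public static void main(String[] args) {
    Runner<String> runner = new Runner<>();
    runner.currentTimer = new KV<>("user-1", 1_000L); // simulate an OnTimer callback
    System.out.println(createBagUserStateKey("bag", runner.firstNonNullKey()));
    try {
      createBagUserStateKey("bag", runner.elementOnlyKey());
    } catch (NullPointerException e) {
      System.out.println("NullPointerException, as in the reported stack trace");
    }
  }
}
```

This keeps the two separate fields and only changes the callback, which is why it was described as the easiest short-term fix rather than the long-term design.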