osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Accessing keyed state in portable timer callbacks


Hi Lukasz,

Thanks for promptly fixing this [1]. I saw that the current element was not set correctly when timers are processed, but wanted to make sure any changes would be aligned with the harness processing model.

I think I favor the currentElementOrTimer approach because it makes things more explicit, but the solution is fine for now.

Thanks,
Max

[1] https://github.com/apache/beam/pull/6902

On 31.10.18 19:09, Lukasz Cwik wrote:
I filed https://issues.apache.org/jira/browse/BEAM-5930.

On Wed, Oct 31, 2018 at 10:22 AM Lukasz Cwik <lcwik@xxxxxxxxxx <mailto:lcwik@xxxxxxxxxx>> wrote:

    That looks like a bug in the FnApiDoFnRunner.java

    The FnApiStateAccessor is given a callback to get the current
    element and it is not handling the case where the current element is
    a timer.

    callback:
    https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L212
    where the current "element" gets set:
    https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L220
    where the current "timer" gets set:
    https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L237

    The easiest fix would be to have the callback return the first non
    null from currentElement/currentTimer but longer term I think we'll
    want a different solution. Alternatively, we could collapse
    currentElement and currentTimer to be currentElementOrTimer which
    would solve the accessor issue.

    On Wed, Oct 31, 2018 at 9:50 AM Maximilian Michels <mxm@xxxxxxxxxx
    <mailto:mxm@xxxxxxxxxx>> wrote:

        Hi,

        I have a question regarding user state during timer callback in
        the FnApiDoFnRunner (Java SDK Harness).

        I've started implementing Timers for the portable Flink Runner.
        I can register a timer via the timer output collection and fire
        the timer via the timer input of the SDK Harness. But when I try
        to access state in the Timer callback, I get the exception below.

        Is this a bug or if not, how is the timer's key supposed to be
        set? I assume that it should be set from the timer element which
        contains the key.

        Thanks,
        Max


        Caused by: java.util.concurrent.ExecutionException:
        java.lang.RuntimeException: Error received from SDK harness for
        instruction 72: java.util.concurrent.ExecutionException:
        java.lang.NullPointerException
                 at
        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
                 at
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
                 at
        org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
                 at
        org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
                 at
        org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
                 at
        org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
                 at
        org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
                 at
        org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
                 at
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                 at
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                 at java.lang.Thread.run(Thread.java:748)
        Caused by: java.lang.NullPointerException
                 at
        org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
                 at
        org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
                 at
        org.apache.beam.fn.harness.state.FnApiStateAccessor.bindBag(FnApiStateAccessor.java:244)
                 at
        org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:487)
                 at
        org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:477)
                 at
        org.apache.beam.fn.harness.FnApiDoFnRunner$OnTimerContext.state(FnApiDoFnRunner.java:671)
                 at
        StateTest$5$OnTimerInvoker$expiry$ZXhwaXJ5.invokeOnTimer(Unknown
        Source)
                 at
        org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:187)
                 at
        org.apache.beam.fn.harness.FnApiDoFnRunner.processTimer(FnApiDoFnRunner.java:244)
                 at
        org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.lambda$createRunnerForPTransform$0(DoFnPTransformRunnerFactory.java:134)
                 at
        org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
                 at
        org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
                 at
        org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
                 at
        org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
                 at
        org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver.onNext(ForwardingClientResponseObserver.java:50)
                 at
        org.apache.beam.vendor.grpc.v1.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:407)
                 at
        org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
                 at
        org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
                 at
        org.apache.beam.vendor.grpc.v1.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:519)
                 at
        org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
                 at
        org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)