Re: Cancel flink job occur exception.


Hi Devin,

Why do you trigger cancel with savepoint immediately after the job status
changes to Deployed? A safer approach is to wait until the job is running,
and has been running for a while, before triggering it.
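
As a rough sketch of that waiting step (not code from Flink itself): the
helper below polls a caller-supplied function until the job has reported
RUNNING continuously for some time. The names wait_until_stable_running,
get_state, stable_for, timeout and poll_interval are all hypothetical, and
get_state is assumed to return the job status as a string such as "RUNNING",
however you choose to obtain it.

```python
import time
from typing import Callable


def wait_until_stable_running(
    get_state: Callable[[], str],
    stable_for: float = 30.0,
    timeout: float = 300.0,
    poll_interval: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Return True once get_state() has reported "RUNNING" for at least
    stable_for seconds without interruption, or False on timeout.

    get_state is a hypothetical callback supplied by the caller; this
    helper does not talk to Flink or YARN itself.
    """
    deadline = clock() + timeout
    running_since = None  # time at which the current RUNNING streak began

    while True:
        now = clock()
        if get_state() == "RUNNING":
            if running_since is None:
                running_since = now
            if now - running_since >= stable_for:
                return True
        else:
            # Any other status resets the streak.
            running_since = None

        if now >= deadline:
            return False
        sleep(poll_interval)
```

Only once this returns True would the client go on to trigger cancel with
savepoint. The clock and sleep parameters exist only so the loop can be
exercised without real waiting.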

We have run into this before as well: after RestClient calls cancel with
savepoint, the client may time out or keep trying to connect to a JM that
has already shut down.
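
One way a calling script could cope with this, sketched under assumptions
rather than offered as a fix: if the cancel call fails with a connection
error or timeout, check independently whether the savepoint was written and
the job is gone before treating it as a failure. The callables cancel,
savepoint_completed and job_is_gone are hypothetical hooks you would
implement yourself (for example by looking at HDFS and the YARN application
status); none of them is a Flink API.

```python
from typing import Callable


def cancel_and_verify(
    cancel: Callable[[], None],
    savepoint_completed: Callable[[], bool],
    job_is_gone: Callable[[], bool],
) -> str:
    """Run cancel() and, if the client loses its connection, decide from
    independent checks whether the cancellation still went through.

    All three callables are hypothetical and supplied by the caller.
    """
    try:
        cancel()
        return "cancelled"
    except (ConnectionError, TimeoutError):
        # The client may fail only because the JM shut down after it had
        # finished its work, so verify before reporting an error.
        if savepoint_completed() and job_is_gone():
            return "cancelled-unconfirmed"
        raise
```

If either check fails, the original exception is re-raised, so a genuinely
failed cancel is still reported as an error.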

Thanks, vino.

devinduan(段丁瑞) <devinduan@xxxxxxxxxxx> wrote on Tue, Sep 4, 2018 at 6:22 PM:

> Hi all,
>       I submitted a Flink job in yarn-cluster mode and cancelled it with the
> savepoint option immediately after the job status changed to deployed.
> Sometimes I get this error:
>
> org.apache.flink.util.FlinkException: Could not cancel job xxxx.
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
>         at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
>         at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>         ... 1 more
> Caused by: java.util.concurrent.CompletionException:
> java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx
>         at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>         at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>         at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>         at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>         ... 16 more
> Caused by: java.net.ConnectException: Connect refuse:
> xxx/xxx.xxx.xxx.xxx:xxx
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
>         ... 7 more
>
>     I checked the JobManager log and found no errors. The savepoint was saved
> correctly in HDFS. The YARN application status changed to FINISHED and the
> FinalStatus changed to KILLED.
>     I think this issue occurs because RestClusterClient cannot find the
> JobManager address after the JobManager (AM) has shut down.
>     My Flink version is 1.5.3.
>     Could anyone help me resolve this issue? Thanks!
>
> devin.