OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Savepoint failed with error "Checkpoint expired before completing"


Thanks Yun for your inputs. Yes, increasing checkpoint helps and we are able to save save points now. In our case we wanted to increase parallelism so I believe savepoint is the only option as checkpoint doesn't support code/parallelism changes.

Gagan

On Wed, Oct 31, 2018 at 8:46 PM Yun Tang <myasuka@xxxxxxxx> wrote:
Hi Gagan

Savepoint would generally takes more time than usual incremental checkpoint, you could try to increase checkpoint timeout time [1]
   env.getCheckpointConfig().setCheckpointTimeout(900000);

If you just want to resume from previous job without change the state-backend, I think you could also try to resume from a retained checkpoint without trigger savepoint [2].


Deployment & Operations; State & Fault Tolerance; Checkpoints; Checkpoints. Overview; Retained Checkpoints. Directory Structure; Difference to Savepoints; Resuming from a retained checkpoint

Best
Yun Tang


From: Gagan Agrawal <agrawalgagan@xxxxxxxxx>
Sent: Wednesday, October 31, 2018 19:03
To: happydexutao@xxxxxxxxx
Cc: user@xxxxxxxxxxxxxxxx
Subject: Re: Savepoint failed with error "Checkpoint expired before completing"
 
Hi Henry,
Thanks for your response. However we don't face this issue during normal run as we have incremental checkpoints. Only when we try to take savepoint (which tries to save entire state in one go), we face this problem.

Gagan

On Wed, Oct 31, 2018 at 11:41 AM 徐涛 <happydexutao@xxxxxxxxx> wrote:
Hi Gagan,
        I have met with the error the checkpoint timeout too.
        In my case, it is not due to big checkpoint size,  but due to slow sink then cause high backpressure to the upper operator. Then the barrier may take a long time to arrive to sink.
        Please check if it is the case you have met.

Best
Henry

> 在 2018年10月30日,下午6:07,Gagan Agrawal <agrawalgagan@xxxxxxxxx> 写道:
>
> Hi,
> We have a flink job (flink version 1.6.1) which unions 2 streams to pass through custom KeyedProcessFunction with RocksDB state store which final creates another stream into Kafka. Current size of checkpoint is around ~100GB and checkpoints are saved to s3 with 5 mins interval and incremental checkpoint enabled. Checkpoints mostly finish in less than 1 min. We are running this job on yarn with following parameters
>
> -yn 10  (10 task managers)
> -ytm 2048 (2 GB each)
> - Operator parallelism is also 10.
>
> While trying to run savepoint on this job, it runs for ~10mins and then throws following error. Looks like checkpoint default timeout of 10mins is causing this. What is recommended way to run savepoint for such job? Should we increase checkpoint default timeout of 10mins? Also currently our state size is 100GB but it is expected to grow unto 1TB. Is flink good for usecases with that much of size? Also how much time savepoint is expected to take with such state size and parallelism on Yarn? Any other recommendation would be of great help.
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 434398968e635a49329f59a019b41b6f failed.
>       at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
>       at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
>       at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
>       at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
>       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
>       at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
>       at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
>       at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>       at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
>       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>       at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>       at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)