
Re: Very slow checkpoints occasionally occur


Hi Stefan,

Thanks a lot for shedding light on this!

> it seems to me that checkpoint execution (the times of the sync and async part) are always reasonably fast once they are executed on the task
Okay, HDFS clients seem to have nothing to do with it. Thanks a lot for pointing this out.

> Could it be that, as time progresses or jumps, there is a spike in session window triggering?
As you said, there were spikes right after the period, as shown in [1].

> When time moves, it could be possible that suddenly a lot of windows are triggered, and when a checkpoint barrier arrives after the firing was triggered, it will have to wait until all window firing is completed for consistency reasons. This would also explain the backpressure that you observe during this period, coming from a lot of / expensive window firing; future events/checkpoints can only proceed when the firing is done.
Shouldn't the cluster load be high if Flink spends most of its time firing windows? However, as shown in [2], cluster load, disk throughput, and network throughput are all low during that period.
In addition, we've observed this problem even at night, when user requests are much lower than this, and even when message rates are decreasing.

> You could investigate if that is what is happening and maybe take measures to avoid this, but that is highly dependent on your job logic.
I’ve implemented a custom trigger for the session window [3] to fire early, as depicted in [4].
Could a custom trigger implementation be a source of the problem?
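
For context, here is a minimal sketch of what such an early-firing trigger for event-time session windows can look like. It is not the actual implementation from [3]; the class name and the fixed early-firing interval are assumptions for illustration only.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Illustrative trigger: fires the session window early at a fixed event-time
// interval and fires a final time when the watermark passes the session end.
public class EarlyFiringTrigger extends Trigger<Object, TimeWindow> {

    private final long intervalMs;  // early-firing interval in ms (assumed)

    public EarlyFiringTrigger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        // Final firing when the watermark passes the end of the session.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Next early firing, aligned to the interval.
        long nextFire = timestamp - (timestamp % intervalMs) + intervalMs;
        if (nextFire < window.maxTimestamp()) {
            ctx.registerEventTimeTimer(nextFire);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        if (time < window.maxTimestamp()) {
            // Early firing: emit current contents, keep the window state,
            // and schedule the next early firing.
            long nextFire = time + intervalMs;
            if (nextFire < window.maxTimestamp()) {
                ctx.registerEventTimeTimer(nextFire);
            }
        }
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;  // session windows merge, so the trigger must support merging
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }
}

A trigger like this would be attached with something like .window(EventTimeSessionWindows.withGap(...)).trigger(new EarlyFiringTrigger(...)). Note that when the watermark jumps, many sessions can reach their early-firing timers at once, and all of those firings have to complete before a pending checkpoint barrier can proceed, which matches the behavior described above.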

Thanks a lot for taking a look at it :-)

Best,

- Dongwon


[1] 

[2]


On Tue, Dec 11, 2018 at 7:54 PM Stefan Richter <s.richter@xxxxxxxxxxxxxxxxx> wrote:
>
> Hi,
>
> Looking at the numbers, it seems to me that checkpoint execution (the times of the sync and async part) are always reasonably fast once they are executed on the task, but there are changes in the alignment time and in the time from triggering a checkpoint to executing a checkpoint. As you are using windows, and looking at the way the state size behaves before and after the problem, I might have a suggestion about what could cause the problem. Before and during the problematic checkpoints, the state size is rising. After the problem is gone, the state size is significantly smaller. Could it be that, as time progresses or jumps, there is a spike in session window triggering? When time moves, it could be possible that suddenly a lot of windows are triggered, and when a checkpoint barrier arrives after the firing was triggered, it will have to wait until all window firing is completed for consistency reasons. This would also explain the backpressure that you observe during this period, coming from a lot of / expensive window firing; future events/checkpoints can only proceed when the firing is done. You could investigate if that is what is happening and maybe take measures to avoid this, but that is highly dependent on your job logic.
>
> Best,
> Stefan
>
> On 11. Dec 2018, at 10:26, Dongwon Kim <eastcirclek@xxxxxxxxx> wrote:
>
> Hi all,
>
> We're facing the same problem mentioned in [1]: very slow checkpoint attempts of a few tasks cause checkpoint failures and, furthermore, incur high back pressure.
> We're running our Flink jobs on a cluster with the following setup:
> - 2 masters + 8 worker nodes
> - all nodes, even the masters, are equipped with SSDs
> - we have a separate cluster for Kafka
> - we depend largely on Hadoop 2.7.3: YARN for deploying per-job clusters and HDFS for storing checkpoints and savepoints
> - all SSDs of each node serve as local-dirs for the YARN NM and data-dirs for the HDFS DN
> - we use the RocksDB state backend
> - we use the latest version, Flink 1.7.0
> - we trigger checkpoints every 30 minutes, and the state size is not that large, as shown in the attached screenshot.
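
(For context, a setup like the one in the list above is typically wired up roughly as follows. This is only an illustrative sketch: the class name, the HDFS path, the incremental-checkpointing flag, and the explicit 10-minute timeout are assumptions, not taken from the job itself; the timeout simply matches Flink's default and the "expired before completing" messages that appear 10 minutes after triggering in the log below.)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 30 minutes, as described in the list above.
        env.enableCheckpointing(30 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);

        // Flink's default checkpoint timeout is 10 minutes, which matches the
        // "expired before completing" entries 10 minutes after each trigger.
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);

        // RocksDB state backend with checkpoints stored on HDFS; the path and
        // the incremental flag are illustrative, not from the original mail.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // ... sources, session windows, and sinks would be defined here ...
        env.execute("session-window-job");
    }
}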
>
> The job itself recovers from checkpoint failures and back pressure after a while; [2] shows that the job recovers after three failed checkpoints.
>
> Below is part of the JM log:
>
> 2018-12-10 17:24:36,150 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 14 @ 1544430276096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 17:24:57,912 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 14 for job e0cf3843cba85e8fdd5570ba18970d33 (43775252946 bytes in 21781 ms).
> 2018-12-10 17:54:36,133 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 15 @ 1544432076096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 18:04:36,134 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 15 of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 18:24:36,156 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 16 @ 1544433876096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 18:34:36,157 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 16 of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 18:54:36,138 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 17 @ 1544435676096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:04:36,139 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 17 of job e0cf3843cba85e8fdd5570ba18970d33 expired before completing.
> 2018-12-10 19:15:44,849 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 15 from e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:16:37,822 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 16 from e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:17:12,974 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 17 from e81a7fb90d05da2bcec02a34d6f821e3 of job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:24:36,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 18 @ 1544437476096 for job e0cf3843cba85e8fdd5570ba18970d33.
> 2018-12-10 19:32:05,869 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 18 for job e0cf3843cba85e8fdd5570ba18970d33 (52481833829 bytes in 449738 ms).
>
> Checkpoints #15, #16, and #17 failed due to a single task, e81a7fb90d05da2bcec02a34d6f821e3, which is a session-window task.
>
> As shown in [3], during that period, the average checkpoint end-to-end duration for the window operation increased as follows:
> - #15 : 37s
> - #16 : 1m 4s
> - #17 : 1m 25s
> However, the average end-to-end duration for normal situations is not that long (less than 30s).
> During that period, back pressure affects the throughput a lot, which is very frustrating.
>
> How can I get rid of checkpoint failures and back pressure?
> Isn't it somewhat related to HDFS clients?
>
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-td12762.html
> [2]
> <failures.png>
> [3]
> <screenshot 2018-12-11 4.25.54 PM.png>
>
>