Serious stability issues when running on YARN (Flink 1.7.0)


Hi!

Since moving to the new execution mode with Flink 1.7.0, we have observed some pretty bad stability issues when running on YARN.

It's pretty hard to understand what's going on, so sorry for the vague description, but here is what seems to happen:

In some cases, when a bigger job (let's say 30 taskmanagers, 10 slots each) fails and tries to recover, we observe taskmanagers starting to fail.

The errors usually look like this:
20181220T141057.132+0100  INFO The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_000021 timed out.  [org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run() @ 1137]
20181220T141057.133+0100  INFO Closing TaskExecutor connection container_e15_1542798679751_0033_01_000021 because: The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_000021  timed out.  [org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection() @ 822]
20181220T141057.135+0100  INFO Execute processors -> (Filter config stream -> (Filter Failures, Flat Map), Filter BEA) (168/180) (3e9c164e4c0594f75c758624815265f1) switched from RUNNING to FAILED. org.apache.flink.util.FlinkException: The assigned slot container_e15_1542798679751_0033_01_000021_0 was removed.
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
	at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
	at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
	at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [org.apache.flink.runtime.executiongraph.Execution.transitionState() @ 1342]
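To see how many taskmanagers are affected and when, the heartbeat timeouts can be counted per container straight from the log. The following is only a minimal sketch: it assumes the log line layout shown above (timestamp, level, message), and the function names are made up for illustration, not part of Flink.

```python
import re
from collections import Counter
from datetime import datetime

# Matches only the ResourceManager line that reports the timeout itself.
# The follow-up "Closing TaskExecutor connection ..." line repeats the same
# text after "because:", so anchoring on "INFO The heartbeat" avoids
# counting each timeout twice.
HEARTBEAT_TIMEOUT = re.compile(
    r"^(?P<ts>\d{8}T\d{6}\.\d{3}[+-]\d{4})\s+INFO The heartbeat of TaskManager "
    r"with id (?P<container>\S+)\s+timed out\."
)


def parse_timestamp(raw):
    """Parse timestamps of the form 20181220T141057.132+0100 (assumed layout)."""
    return datetime.strptime(raw, "%Y%m%dT%H%M%S.%f%z")


def heartbeat_timeouts(lines):
    """Return (timestamp, container id) for every heartbeat timeout line."""
    events = []
    for line in lines:
        match = HEARTBEAT_TIMEOUT.match(line)
        if match:
            events.append(
                (parse_timestamp(match.group("ts")), match.group("container"))
            )
    return events


def timeouts_per_container(lines):
    """Count heartbeat timeouts per YARN container id."""
    return Counter(container for _, container in heartbeat_timeouts(lines))
```

Passing an open jobmanager log file (the file name depends on the setup) to `timeouts_per_container` gives a counter keyed by container id, which makes it easier to tell whether the same containers keep timing out or new ones fail on every restart.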

The job then goes into a restart loop where taskmanagers come and go, and the UI sometimes displays more than 30 taskmanagers and some extra slots. In some instances I have also seen "GC overhead limit exceeded" during the recovery, which is very strange.

I suspect something is going wrong here, maybe broken logic in the slot allocation or a memory leak.

Has anyone observed anything similar so far?
It seems to affect only some of our larger jobs. This wasn't a problem in previous Flink releases, where we always used the "legacy" execution mode.

Thank you!
Gyula