osdir.com



Re: Taskmanager times out continuously for registration with Jobmanager


We were able to fix it by passing the IP address instead of the hostname as the actor system's listen address when starting the TaskManager (the taskManagerHostname argument below):

def runTaskManager(
    taskManagerHostname: String,   // we now pass the pod IP here instead of the hostname
    resourceID: ResourceID,
    actorSystemPort: Int,
    configuration: Configuration,
    highAvailabilityServices: HighAvailabilityServices)
  : Unit = {
  // ...
}
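A minimal sketch of the workaround, assuming you control the code that calls runTaskManager: resolve the pod's hostname to its numeric IP and pass that string as taskManagerHostname. The object and helper names (ListenAddress, resolveListenAddress, akkaAddress) are hypothetical, not Flink API, and the actual runTaskManager call is left out because it needs a full Flink 1.4 setup.

```scala
import java.net.InetAddress

object ListenAddress {
  // Hypothetical helper (not Flink API): resolve a hostname to the numeric IP
  // that the TaskManager's actor system would then bind to and advertise.
  def resolveListenAddress(hostname: String): String =
    InetAddress.getByName(hostname).getHostAddress

  // Formats an Akka remote address the way it appears in the Flink logs,
  // e.g. akka.tcp://flink@192.168.83.51:6123
  def akkaAddress(systemName: String, host: String, port: Int): String =
    s"akka.tcp://$systemName@$host:$port"

  def main(args: Array[String]): Unit = {
    // getLocalHost throws UnknownHostException if the pod's own hostname
    // does not resolve, which would itself point at a DNS problem.
    val local = InetAddress.getLocalHost
    val ip = resolveListenAddress(local.getHostName)
    println(akkaAddress("flink", local.getHostName, 8070)) // hostname-based address
    println(akkaAddress("flink", ip, 8070))                // IP-based address (the workaround)
  }
}
```

Run inside a pod, main prints two addresses for the same actor system: one hostname-based (like akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070 in the logs) and one IP-based. Akka classic remoting compares addresses literally, so the two are not interchangeable; that is consistent with, but does not prove, the hostname theory in this thread.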

The following log message on the JobManager gave us a clue:

{"timeMillis":1539297842333,"thread":"jobmanager-future-thread-2","level":"DEBUG","loggerName":"org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher","message":"Could not retrieve QueryServiceGateway.","thrown":{"commonElementCount":0,"localizedMessage":"akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070/), Path(/user/MetricQueryService_5261ccab66b86b53a4edd64f26c1f282)]"...

...


We suspect there is a problem with hostname resolution after the actor system is quarantined. Do you know why this happens? Could it be a caching problem in the Flink or Akka code the JobManager is using?
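To check the hostname-resolution theory from inside the JobManager container, a small diagnostic like the following may help. It is a sketch under stated assumptions: the object name DnsCheck is hypothetical, and it only inspects JVM-level DNS caching (the standard networkaddress.cache.ttl and networkaddress.cache.negative.ttl security properties). Whether Flink 1.4 or its Akka version keeps its own cache of resolved addresses is not established in this thread.

```scala
import java.net.{InetAddress, UnknownHostException}
import java.security.Security

object DnsCheck {
  // Resolve a hostname to all of its addresses, or return an error message.
  def resolve(host: String): Either[String, Seq[String]] =
    try Right(InetAddress.getAllByName(host).toSeq.map(_.getHostAddress))
    catch {
      case e: UnknownHostException => Left(s"cannot resolve $host: ${e.getMessage}")
    }

  // JVM DNS cache TTLs in seconds; None means the property is unset and the
  // JVM's built-in default applies.
  def cacheSettings: Map[String, Option[String]] =
    Seq("networkaddress.cache.ttl", "networkaddress.cache.negative.ttl")
      .map(k => k -> Option(Security.getProperty(k)))
      .toMap

  def main(args: Array[String]): Unit = {
    // Pass the TaskManager hostname, e.g. taskmgr-6b59f97748-fmgwn
    val host = args.headOption.getOrElse("localhost")
    println(resolve(host))
    cacheSettings.foreach { case (k, v) => println(s"$k = ${v.getOrElse("<JVM default>")}") }
  }
}
```

If the old pod's hostname still resolves to a stale IP (or fails) after the container restart, that would support the DNS explanation rather than a Flink-internal cache.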

On Fri, Oct 12, 2018 at 1:05 AM Till Rohrmann <trohrmann@xxxxxxxxxx> wrote:
It is hard to tell without the full logs, but it could easily be a K8s setup problem. It is also problematic that you are running a Flink version which is no longer actively supported. At the very least, try the latest bug fix release of 1.4.

Cheers,
Till

On Fri, Oct 12, 2018, 09:43 Abdul Qadeer <quadeer.leo@xxxxxxxxx> wrote:
Hi Till,

A few more data points:

In a rerun of the same versions with a fresh deployment, I see the log.debug(s"RegisterTaskManager: $msg") output in the JobManager; however, the AcknowledgeRegistration/AlreadyRegistered messages are never sent. I took a tcpdump of the TaskManager that doesn't recover and compared it with another TaskManager that recovers after a restart (i.e. receives the AcknowledgeRegistration message).

Restarting the Docker container of the bad TaskManager doesn't help. The only workaround right now is to delete the Kubernetes pod holding the bad TaskManager container. Does it have something to do with the Akka address the JobManager stores for a TaskManager? The only difference I see between restarting the container and restarting the pod is the change in the Akka address.

Also, the endless registration retries start after the TaskManager container restarts with the JobManager actor system quarantined:

{"timeMillis":1539282282329,"thread":"flink-akka.actor.default-dispatcher-3","level":"ERROR","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"The actor system akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070 has quarantined the remote actor system akka.tcp://flink@192.168.83.52:6123. Shutting the actor system down to be able to reestablish a connection!","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":49,"threadPriority":5}


A manual restart (via docker restart or by killing the JVM) doesn't reproduce this problem.


On Thu, Oct 11, 2018 at 11:15 AM Abdul Qadeer <quadeer.leo@xxxxxxxxx> wrote:
Hi Till,

I didn't try newer versions, as it is not possible to update the Flink version at the moment.
Any pointers for debugging would be great.

On Thu, Oct 11, 2018 at 2:44 AM Till Rohrmann <trohrmann@xxxxxxxxxx> wrote:
Hi Abdul,

have you tried whether this problem also occurs with newer Flink versions (1.5.4 or 1.6.1)?

Cheers,
Till

On Thu, Oct 11, 2018 at 9:24 AM Dawid Wysakowicz <dwysakowicz@xxxxxxxxxx> wrote:

Hi Abdul,

I've added Till and Gary to cc, who might be able to help you.

Best,

Dawid


On 11/10/18 03:05, Abdul Qadeer wrote:

Hi,


We are facing an issue in standalone HA mode in Flink 1.4.0 where a TaskManager restarts and is then unable to register with the JobManager. It times out awaiting the AcknowledgeRegistration/AlreadyRegistered message from the JobManager actor and keeps sending RegisterTaskManager messages. The JobManager logs show nothing about the registration request or any failure; they don't contain the log.debug(s"RegisterTaskManager: $msg") output (from JobManager.scala) either. The network connection between the TaskManager and the JobManager seems fine: tcpdump shows the message being sent to the JobManager and a TCP ACK received from it. Note that the communication is happening between Docker containers.


Following are the logs from the TaskManager:



{"timeMillis":1539189572438,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying to register at JobManager akka.tcp://flink@192.168.83.51:6123/user/jobmanager (attempt 1400, timeout: 30000 milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}

{"timeMillis":1539189580229,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got ping response for sessionid: 0x10000260ea5002d after 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}

{"timeMillis":1539189600247,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got ping response for sessionid: 0x10000260ea5002d after 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}

{"timeMillis":1539189602458,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying to register at JobManager akka.tcp://flink@192.168.83.51:6123/user/jobmanager (attempt 1401, timeout: 30000 milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}

{"timeMillis":1539189620251,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got ping response for sessionid: 0x10000260ea5002d after 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}

{"timeMillis":1539189632478,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying to register at JobManager akka.tcp://flink@192.168.83.51:6123/user/jobmanager (attempt 1402, timeout: 30000 milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
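The logs show registration attempts 1400, 1401 and 1402 roughly 30 seconds apart, each with a 30000 ms timeout. A hypothetical sketch of a capped, doubling per-attempt timeout that would produce this pattern follows; it assumes the timeout starts small and doubles until it reaches the 30 s ceiling seen in the logs. The 500 ms starting value and the name RegistrationBackoff are illustrative assumptions, not taken from Flink's source.

```scala
import scala.concurrent.duration._

object RegistrationBackoff {
  // Hypothetical: timeout for a given registration attempt (numbered from 1),
  // starting at `initial`, doubling per attempt, capped at `max`.
  def timeoutFor(attempt: Int, initial: FiniteDuration, max: FiniteDuration): FiniteDuration = {
    require(attempt >= 1, "attempts are numbered from 1")
    var t = initial
    var i = 1
    // Stop doubling once the cap is reached, so large attempt numbers cannot overflow.
    while (i < attempt && t < max) {
      t = t * 2
      i += 1
    }
    if (t > max) max else t
  }

  def main(args: Array[String]): Unit = {
    val initial = 500.millis
    val max = 30.seconds
    for (a <- Seq(1, 2, 6, 7, 1400))
      println(s"attempt $a -> ${timeoutFor(a, initial, max).toMillis} ms")
  }
}
```

Under these assumptions, every attempt from the seventh onward waits the full 30000 ms, matching the steady cadence above. The backoff only explains the retry rhythm, not why the JobManager never answers.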