osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

RE: How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?


Hi,

 

Without any other solution, I made a shell script that copies the original content of FLINK_CONF_DIR in a temporary rep, modify flink-conf.yaml to set yarn.properties-file.location, and change FLINK_CONF_DIR to that temp rep before executing flink.

I am now able to select the container I want, but I think it should be made simpler…

I’ll open a Jira.

 

Best regards,

Arnaud

 

 

De : LINZ, Arnaud
Envoyé : jeudi 1 février 2018 16:23
À : user@xxxxxxxxxxxxxxxx
Objet : How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?

 

Hello,

 

I am using Flink 1.3.2 and I’m struggling to achieve something that should be simple.

For isolation reasons, I want to start multiple long living yarn session containers (with the same user) and choose at run-time, when I start a HA streaming app, which container will hold it.

 

I start my yarn session with the command line option : -Dyarn.properties-file.location=mydir

The session is created and a .yarn-properties-$USER file is generated.

 

And I’ve tried the following to submit my job:

 

CASE 1

flink-conf.yaml : yarn.properties-file.location: mydir

flink run options : none

  • Uses zookeeper and works  – but I cannot choose the container as the property file is global.

 

CASE 2

flink-conf.yaml : nothing

flink run options : -yid applicationId

  • Do not use zookeeper, tries to connect to yarn job manager but fails in “Job submission to the JobManager timed out” error

 

CASE 3

flink-conf.yaml : nothing

flink run options : -yid applicationId and -yD with all dynamic properties found in the “dynamicPropertiesString” of .yarn-properties-$USER file

  • Same as case 2

 

CASE 4

flink-conf.yaml : nothing

flink run options : -yD yarn.properties-file.location=mydir

  • Tries to connect to local (non yarn) job manager (and fails)

 

CASE 5

Even weirder:

flink-conf.yaml : yarn.properties-file.location: mydir

flink run options : -yD yarn.properties-file.location=mydir

  • Still tries to connect to local (non yarn) job manager!

 

What am I doing wrong?

 

Logs extracts :

CASE 1:

2018:02:01 15:43:20 - Waiting until all TaskManagers have connected

2018:02:01 15:43:20 - Starting client actor system.

2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:20 - Trying to select the network interface and address to use by connecting to the leading JobManager.

2018:02:01 15:43:20 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics

2018:02:01 15:43:21 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.

2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:21 - Slf4jLogger started

2018:02:01 15:43:21 - Starting remoting

2018:02:01 15:43:21 - Remoting started; listening on addresses :[akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:36340]

2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:21 - TaskManager status (2/1)

2018:02:01 15:43:21 - All TaskManagers are connected

2018:02:01 15:43:21 - Submitting job with JobID: f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.

2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.

2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager yet.

2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77).

2018:02:01 15:43:21 - Disconnect from JobManager null.

2018:02:01 15:43:21 - Connect to JobManager Actor[akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager#-1554418245].

2018:02:01 15:43:21 - Connected to JobManager at Actor[akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager#-1554418245] with leader session id 388af5b8-5555-4923-8ee4-8a4b9bfbb0b9.

2018:02:01 15:43:21 - Sending message to JobManager akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and wait for progress

2018:02:01 15:43:21 - Upload jar files to job manager akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager.

2018:02:01 15:43:21 - Blob client connecting to akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager

2018:02:01 15:43:22 - Submit job to the job manager akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager.

2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was successfully submitted to the JobManager akka://flink/deadLetters.

2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to status RUNNING.

 

CASE 2:

2018:02:01 15:48:43 - Waiting until all TaskManagers have connected

2018:02:01 15:48:43 - Starting client actor system.

2018:02:01 15:48:43 - Trying to select the network interface and address to use by connecting to the leading JobManager.

2018:02:01 15:48:43 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics

2018:02:01 15:48:43 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.

2018:02:01 15:48:43 - Slf4jLogger started

2018:02:01 15:48:43 - Starting remoting

2018:02:01 15:48:43 - Remoting started; listening on addresses :[akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:34140]

2018:02:01 15:48:43 - TaskManager status (2/1)

2018:02:01 15:48:43 - All TaskManagers are connected

2018:02:01 15:48:43 - Submitting job with JobID: cd3e0e223c57d01d415fe7a6a308576c. Waiting for job completion.

2018:02:01 15:48:43 - Received SubmitJobAndWait(JobGraph(jobId: cd3e0e223c57d01d415fe7a6a308576c)) but there is no connection to a JobManager yet.

2018:02:01 15:48:43 - Received job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c).

2018:02:01 15:48:43 - Disconnect from JobManager null.

2018:02:01 15:48:43 - Connect to JobManager Actor[akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager#-1554418245].

2018:02:01 15:48:43 - Connected to JobManager at Actor[akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager#-1554418245] with leader session id 00000000-0000-0000-0000-000000000000.

2018:02:01 15:48:43 - Sending message to JobManager akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager to submit job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c) and wait for progress

2018:02:01 15:48:43 - Upload jar files to job manager akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager.

2018:02:01 15:48:43 - Blob client connecting to akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager

2018:02:01 15:48:45 - Submit job to the job manager akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager.

2018:02:01 15:49:45 - Terminate JobClientActor.

2018:02:01 15:49:45 - Disconnect from JobManager Actor[akka.tcp://flink@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:33970/user/jobmanager#-1554418245].

 

Then

Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.

        at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)

        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)

        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)

        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)

        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)

        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)

        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)

        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

        at akka.actor.ActorCell.invoke(ActorCell.scala:487)

        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)

        at akka.dispatch.Mailbox.run(Mailbox.scala:220)

        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)

        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

CASE 3,4

 

2018:02:01 15:35:14 - Starting client actor system.

2018:02:01 15:35:14 - Trying to select the network interface and address to use by connecting to the leading JobManager.

2018:02:01 15:35:14 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics

2018:02:01 15:35:14 - Retrieved new target address localhost/127.0.0.1:6123.

2018:02:01 15:35:15 - Trying to connect to address localhost/127.0.0.1:6123

2018:02:01 15:35:15 - Failed to connect from address 'elara-edge-u2-n01/10.136.170.196': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.225': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)

2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.196': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.225': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)

2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.196': Connexion refusée (Connection refused)

2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)

 

 

 

 




L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.