Cannot submit Flink job via IP or VIP of jobmanager


Hello Flink team,

We use Flink on DCOS and have problems submitting a Flink job from within a container to the Flink cluster. Both the container and the Flink cluster are running inside DCOS, on different nodes.

 

We have the following setup: Flink was installed on DCOS using the package from the catalog. According to the Flink UI ([DCOS-URL]/service/flink/) the Flink job manager settings are:

 

    jobmanager.rpc.address        ip-10-0-1-95.eu-central-1.compute.internal
    jobmanager.rpc.port           14503
    jobmanager.web.port           14502
    mesos.artifact-server.port    14505

 

where "ip-10-0-1-95.eu-central-1.compute.internal" is the host name of the DCOS node with IP 10.0.1.95 on which the container with the job manager is running.

 

Furthermore, a VIP is configured for both the job manager RPC port and the job manager web port:

 

job manager RPC port: flink.marathon.l4lb.thisdcos.directory:6123

job manager Web port: flink.marathon.l4lb.thisdcos.directory:8081

 

 

If we submit a Flink job to the job manager via the Flink CLI with the following steps:

1) log into the DCOS master node:

    dcos node ssh --leader --master-proxy

2) start an interactive session inside a Docker container using the Mesosphere Flink image:

    docker run --rm -it mesosphere/dcos-flink:1.4.2-1.0 /bin/bash

3) submit a Flink job to the Flink job manager:

    cd /flink-1.4.2

    ./bin/flink run -m ip-10-0-1-95.eu-central-1.compute.internal:14503 examples/streaming/WordCount.jar

               

everything works fine. The job appears as an entry within the Flink UI and we get the results we expect.

 

But if we try to submit the same job via the VIP of the job manager (flink.marathon.l4lb.thisdcos.directory:6123):

 

    ./bin/flink run -m flink.marathon.l4lb.thisdcos.directory:6123 examples/streaming/WordCount.jar

 

or if we try to submit the job to the job manager using the IP of the DCOS node instead of its host name:

 

    ./bin/flink run -m 10.0.1.95:14503 examples/streaming/WordCount.jar

 

the job cannot be submitted. Apparently the connection to the job manager cannot be established, and nothing appears in the Flink UI. You can find the output in the attachment.

Submitting to the job manager using the URL from Mesos DNS does not work either.
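
To tell a plain network problem apart from something at the Flink level, a TCP check along the following lines could be run from inside the same container. This is only a sketch under assumptions: check_tcp is a hypothetical helper name, and it assumes that bash (for its /dev/tcp redirection) and the coreutils timeout command are available in the image. We have not included any output from it here.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flink or DCOS): report whether a TCP
# connection to host:port can be opened within 3 seconds.
check_tcp() {
  local host="$1" port="$2"
  # bash opens a TCP socket when redirecting to /dev/tcp/<host>/<port>;
  # a failed name lookup, a refused connection, or a timeout yields a
  # non-zero exit status.
  if timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null; then
    echo "reachable ${host}:${port}"
  else
    echo "unreachable ${host}:${port}"
  fi
}
```

It would be called once per address form used above, for example:

    check_tcp ip-10-0-1-95.eu-central-1.compute.internal 14503
    check_tcp 10.0.1.95 14503
    check_tcp flink.marathon.l4lb.thisdcos.directory 6123

If all three report "reachable", the ports themselves are open and the difference between the working and failing submissions would have to lie above the TCP layer.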

 

Why is this not working? Why can we only submit jobs using the host name (ip-10-0-1-95.eu-central-1.compute.internal) of the job manager, and not the IP or the VIP?


Thank you!


Best regards

Wei


Cluster configuration: Standalone cluster with JobManager at flink.marathon.l4lb.thisdcos.directory/***.***.***.***:6123
Using address flink.marathon.l4lb.thisdcos.directory:6123 to connect to JobManager.
JobManager web interface address http://flink.marathon.l4lb.thisdcos.directory:8081
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: 22d5a45ee33edf71792c6bde8fc75211. Waiting for job completion.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
	at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
	... 21 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
	at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
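
The trace above nests several exceptions, and the innermost "Caused by:" line names the actual failure. A small filter like the following can pull that line out of a saved copy of the output. It is a sketch only: innermost_cause is a hypothetical helper name, and submit.log in the usage line is an assumed file name for the saved CLI output.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a Java stack trace on stdin and print the last
# "Caused by:" line, which is the innermost cause in the chain.
innermost_cause() {
  grep '^Caused by:' | tail -n 1
}
```

Usage, assuming the output was saved to submit.log:

    innermost_cause < submit.log

For the output above, this selects the JobClientActorConnectionTimeoutException line ("Lost connection to the JobManager.").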