Re: JobManager did not respond within 60000 ms
Glad that you solved your problem :)
Splitting code into smaller functions has its advantages, but more operators/tasks mean more overhead for the JobManager and TaskManagers, which have to manage them. Usually that's not a big issue, but as I said, you were running your cluster on extremely low memory settings.
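To illustrate the trade-off, here is a minimal sketch of keeping two checks readable while evaluating them in a single filter. The `Event` type, the two checks (`hasKnownType`, `isRecent`) and the class name are hypothetical, and plain `java.util.function.Predicate` stands in for Flink's `FilterFunction` so that the example runs without Flink on the classpath. In a real job, the combined condition would presumably go into the `filter` method of one `FilterFunction`, giving one filter operator in the job graph instead of two.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class CombinedFilterSketch {

    // Hypothetical record type, used only for illustration.
    static final class Event {
        final String type;
        final long timestamp;

        Event(String type, long timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    // Two checks with distinct functional meaning, kept as named methods for readability.
    static boolean hasKnownType(Event e) {
        return e.type != null && !e.type.isEmpty();
    }

    static boolean isRecent(Event e, long minTimestamp) {
        return e.timestamp >= minTimestamp;
    }

    // One combined predicate: both checks are evaluated in a single filter step.
    // (Assumption: in a Flink job this would be the body of a single FilterFunction.)
    static Predicate<Event> combinedFilter(long minTimestamp) {
        return e -> hasKnownType(e) && isRecent(e, minTimestamp);
    }

    // Applies the combined predicate to a list of events.
    static List<Event> applyFilter(List<Event> events, long minTimestamp) {
        return events.stream()
                .filter(combinedFilter(minTimestamp))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("click", 100L),  // passes both checks
                new Event("", 200L),       // fails hasKnownType
                new Event("view", 5L));    // fails isRecent
        System.out.println(applyFilter(events, 50L).size()); // prints 1
    }
}
```

The functional meaning of each check stays visible in the method names, so merging the operators does not have to cost readability.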
> On 9 Oct 2018, at 18:09, jpreisner@xxxxxxx wrote:
> Hi Piotrek,
> Thank you for your answer. In fact, I needed to increase the memory of the JobManager (I had tried that before, but I had not restarted Flink...).
> I will also work on optimization. I thought it was good practice to create as many functions as possible, split by their functional meaning (for example, two FilterFunctions that each have a different functional meaning). So I will try to have fewer functions (for example, merge my two FilterFunctions into one).
> Thanks again, Piotrek!
> ----- Original Message -----
> From: "Piotr Nowojski" <piotr@xxxxxxxxxxxxxxxxx>
> To: jpreisner@xxxxxxx
> Cc: user@xxxxxxxxxxxxxxxx
> Sent: Tuesday, 9 October 2018 10:37:58
> Subject: Re: JobManager did not respond within 60000 ms
> You have quite a complicated job graph and very low memory settings for the job manager and task manager. It might be that long GC pauses are causing this problem.
> Secondly, quite a few Google search results for this error point toward high-availability issues. Have you read those previously reported problems?
> Thanks, Piotrek
> On 9 Oct 2018, at 09:57, jpreisner@xxxxxxx wrote:
> I have a streaming job that runs on a standalone cluster. The Flink version is 1.4.1. Everything was working so far, but since I added new processing steps, I cannot start my job anymore. I get this exception:
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms
> at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
> at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms
> at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
> at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
> ... 11 more
> Caused by: java.util.concurrent.TimeoutException
> at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
> ... 12 more
> I see a very strange behavior when I comment out a function (any one, for example a FilterFunction, whether it was present before or after my modification).
> I tried to change the configuration (akka.client.timeout and akka.framesize) without success.
> This is my flink-conf.yaml:
> jobmanager.rpc.address: myhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 128
> taskmanager.heap.mb: 1024
> taskmanager.numberOfTaskSlots: 100
> taskmanager.memory.preallocate: false
> taskmanager.data.port: 6121
> parallelism.default: 1
> taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr
> blob.storage.directory: /dohdev/flink/tmp/blob
> jobmanager.web.port: -1
> high-availability: zookeeper
> high-availability.zookeeper.quorum: localhost:2181
> high-availability.zookeeper.path.root: /dohdev/flink
> high-availability.cluster-id: dev
> high-availability.storageDir: file:////mnt/metaflink
> high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 1000
> restart-strategy.fixed-delay.delay: 5 s
> zookeeper.sasl.disable: true
> blob.service.cleanup.interval: 60
> And I launch the job with this command: bin/flink run -d myjar.jar
> I have attached a graph of my job from when it works (Graph.PNG).
> Do you have any idea what the problem is?