


[jira] [Created] (FLINK-10381) concurrent submit job get ProgramAbortException


Youjun Yuan created FLINK-10381:
-----------------------------------

             Summary: concurrent submit job get ProgramAbortException
                 Key: FLINK-10381
                 URL: https://issues.apache.org/jira/browse/FLINK-10381
             Project: Flink
          Issue Type: Bug
          Components: JobManager
    Affects Versions: 1.6.0, 1.5.1, 1.4.0
         Environment: Flink 1.4.0, standalone cluster.
            Reporter: Youjun Yuan
             Fix For: 1.7.0
         Attachments: image-2018-09-20-22-40-31-846.png

If multiple jobs are submitted concurrently, some of them are likely to fail with the following exception: 

_java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar._ 
_at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)_ 
_at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler$$Lambda$47/713642705.get(Unknown Source)_ 
_at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)_ 
_at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)_ 
_at java.util.concurrent.FutureTask.run(FutureTask.java:266)_ 
_at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)_ 
_at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)_ 
_at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)_ 
_at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)_ 
_at java.lang.Thread.run(Thread.java:745)_
_Caused by: org.apache.flink.util.FlinkException: Could not run the jar._
_... 10 more_

_Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:_ 
_at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)_ 
_at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)_ 
_at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)_ 
_at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)_ 
_... 9 more_

_Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException_ 
_at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:72)_ 
_..._ 
_at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)_ 
_at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)_ 
_at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)_ 
_at java.lang.reflect.Method.invoke(Method.java:497)_ 
_at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)_ 
_at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)_ 
_at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)_

 
h2. Possible Cause:

In OptimizerPlanEnvironment.getOptimizedPlan(), setAsContext() sets a static variable named contextEnvironmentFactory in ExecutionEnvironment. As a result, ExecutionEnvironment.getExecutionEnvironment() returns the current OptimizerPlanEnvironment instance, which captures the optimizerPlan and saves it to an instance variable of OptimizerPlanEnvironment.
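To make this mechanism concrete, here is a minimal single-threaded Java model of it. It is a sketch under stated assumptions: PlanCaptureModel, PlanEnv, ProgramAbort and the String "plans" are hypothetical stand-ins, not Flink classes; only the names setAsContext, getExecutionEnvironment, getOptimizedPlan and contextEnvironmentFactory are borrowed from the description above.

```java
import java.util.function.Supplier;

// Hypothetical, simplified model of the plan-capture mechanism.
// Names mirror the issue text but this is NOT Flink's real API.
class PlanCaptureModel {

    // Stand-in for the static contextEnvironmentFactory in ExecutionEnvironment.
    static Supplier<PlanEnv> contextEnvironmentFactory = null;

    // Stand-in for ExecutionEnvironment.getExecutionEnvironment():
    // returns whatever environment the installed factory produces.
    static PlanEnv getExecutionEnvironment() {
        return contextEnvironmentFactory.get();
    }

    // Plays the role of ProgramAbortException: thrown once the plan is captured.
    static class ProgramAbort extends RuntimeException {}

    static class PlanEnv {
        String optimizerPlan; // captured plan, null until execute() runs on this instance

        void setAsContext() {
            // From now on, every getExecutionEnvironment() call yields this instance.
            contextEnvironmentFactory = () -> this;
        }

        void execute(String plan) {
            optimizerPlan = plan;     // capture the plan ...
            throw new ProgramAbort(); // ... then abort the user program on purpose
        }

        String getOptimizedPlan(Runnable userMain) {
            setAsContext();
            try {
                userMain.run();
            } catch (ProgramAbort expected) {
                // the invocation gets aborted once the plan has been captured
            }
            if (optimizerPlan != null) {
                return optimizerPlan;
            }
            throw new IllegalStateException("The program caused an error");
        }
    }

    public static void main(String[] args) {
        PlanEnv env = new PlanEnv();
        // The "user program" fetches the context environment and calls execute().
        String plan = env.getOptimizedPlan(() -> getExecutionEnvironment().execute("plan-A"));
        System.out.println(plan); // prints plan-A
    }
}
```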

However, if multiple jobs are submitted at the same time, the static variable contextEnvironmentFactory in ExecutionEnvironment is overwritten by a subsequent job, forcing ExecutionEnvironment.getExecutionEnvironment() to return a different OptimizerPlanEnvironment instance. The first OptimizerPlanEnvironment instance therefore never captures its optimizerPlan and throws a ProgramInvocationException. The relevant code is copied below for convenience:
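The interleaving described above can be replayed deterministically on a single thread. This is again a hypothetical, simplified model (ContextRaceReplay and its PlanEnv are not Flink classes, and the abort exception is left out): job A installs its factory, job B overwrites the shared static field before A's main() runs, and A's call to getExecutionEnvironment() therefore hands the plan to B's instance.

```java
import java.util.function.Supplier;

// Hypothetical, simplified model (not Flink's API) used to replay the race.
class ContextRaceReplay {

    // Shared static "context", like contextEnvironmentFactory.
    static Supplier<PlanEnv> contextEnvironmentFactory;

    static class PlanEnv {
        String optimizerPlan; // stays null unless execute() runs on this instance

        void setAsContext() { contextEnvironmentFactory = () -> this; }

        void execute(String plan) { optimizerPlan = plan; }
    }

    // Replays: A installs its context, B overwrites it, then A's main() runs.
    static PlanEnv[] replay() {
        PlanEnv a = new PlanEnv();
        PlanEnv b = new PlanEnv();
        a.setAsContext();               // job A: setAsContext()
        b.setAsContext();               // job B: overwrites the static factory
        contextEnvironmentFactory.get() // job A's main(): getExecutionEnvironment()
                .execute("plan-A");     // ... but the plan lands in B's environment
        return new PlanEnv[] {a, b};
    }

    public static void main(String[] args) {
        PlanEnv[] envs = replay();
        System.out.println("A captured: " + envs[0].optimizerPlan); // A captured: null
        System.out.println("B captured: " + envs[1].optimizerPlan); // B captured: plan-A
    }
}
```

With A's optimizerPlan still null, A's getOptimizedPlan() would take the else branch and throw ProgramInvocationException, matching the reported stack trace.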

setAsContext();
try {
    prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
    throw e;
}
catch (Throwable t) {
    // the invocation gets aborted with the preview plan
    if (optimizerPlan != null) {
        return optimizerPlan;
    } else {
        throw new ProgramInvocationException("The program caused an error: ", t);
    }
}
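One possible direction for a fix, sketched here purely as an assumption and not as what Flink actually changed for 1.7.0, is to keep the context factory per thread rather than in a single static field. The sketch below (ThreadLocalContext and its members are hypothetical) relies on the user's main() being invoked on the same thread as getOptimizedPlan(), as the stack trace shows, and runs several plan extractions concurrently on a thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical mitigation sketch: a per-thread context instead of one static field.
// Names are illustrative; this is neither Flink's API nor its actual fix.
class ThreadLocalContext {

    // Each submitting thread sees only the factory it installed itself.
    static final ThreadLocal<Supplier<PlanEnv>> CONTEXT = new ThreadLocal<>();

    static PlanEnv getExecutionEnvironment() {
        return CONTEXT.get().get();
    }

    static class PlanEnv {
        String optimizerPlan;

        void execute(String plan) { optimizerPlan = plan; }

        String getOptimizedPlan(Runnable userMain) {
            CONTEXT.set(() -> this); // like setAsContext(), but confined to this thread
            try {
                userMain.run();      // the user's main() runs on this same thread
            } finally {
                CONTEXT.remove();    // reset so the context doesn't leak into the pooled thread's next task
            }
            if (optimizerPlan == null) {
                throw new IllegalStateException("The program caused an error");
            }
            return optimizerPlan;
        }
    }

    // Runs `jobs` plan extractions concurrently and collects the plans in submission order.
    static List<String> submitConcurrently(int jobs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < jobs; i++) {
                String plan = "plan-" + i;
                futures.add(pool.submit(() -> new PlanEnv()
                        .getOptimizedPlan(() -> getExecutionEnvironment().execute(plan))));
            }
            List<String> plans = new ArrayList<>();
            for (Future<String> f : futures) {
                plans.add(f.get());
            }
            return plans;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitConcurrently(4)); // [plan-0, plan-1, plan-2, plan-3]
    }
}
```

A ThreadLocal would not help if a user program obtained its environment from a thread it spawned itself. A blunter alternative is to serialize plan extraction behind a single lock, which is simpler but prevents concurrent submissions from overlapping at all.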


