osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: How to submit flink job on yarn by java code


Hi,

Is this path accessible on the container? If not, use some distributed file system, nfs or -yt —yarnship option of the cli.


Piotrek

On 16 Aug 2018, at 11:05, spoon_lz <971066723@xxxxxx> wrote:

Sorry, I don't know why the code and error are not visible.
The error is :
The program finished with the following exception:

/org.apache.flink.client.deployment.ClusterDeploymentException: Could not
deploy Yarn job cluster.
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at flink.SubmitDemo.submit(SubmitDemo.java:75)
at flink.SubmitDemo.main(SubmitDemo.java:50)
Caused by:
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment.
Diagnostics from YARN: Application application_1526888270443_0090 failed 2
times due to AM Container for appattempt_1526888270443_0090_000002 exited
with  exitCode: -1000
For more detailed output, check application tracking
page:http://cluster1:8088/cluster/app/application_1526888270443_0090Then,
click on links to logs of each attempt.
Diagnostics: File
file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
does not exist
java.io.FileNotFoundException: File
file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
does not exist
at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further
investigate the issue:
yarn logs -applicationId application_1526888270443_0090
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
... 5 more/

and my code like :

/public class SubmitDemo {


   private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
   private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
   private static final String JAR_FILE =
"/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";


   public static void main(String[] args) {

       SubmitDemo demo = new SubmitDemo();
       demo.before();
       List<String> parameters = new ArrayList<>();
       parameters.add("run");
       parameters.add("-d");
       parameters.add("-m");
       parameters.add("yarn-cluster");
       parameters.add("-ynm");
       parameters.add("lz_test_alone");
       parameters.add("-yn");
       parameters.add("4");
       parameters.add("-ytm");
       parameters.add("4096");
       parameters.add("-yjm");
       parameters.add("1024");
       parameters.add("-c");
       parameters.add("flink.Demo");
       parameters.add(JAR_FILE);

       try {
           demo.submit(parameters.toArray(new String[parameters.size()]));
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

   public void submit(String[] args) throws Exception {

       final String configurationDirectory = ENV_CONF;

       File configFIle = new File(FLINK_CONF);

       final Configuration flinkConfiguration =
GlobalConfiguration.loadConfiguration(configFIle.getAbsolutePath());

       FlinkYarnSessionCli cli = new
FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "y",
"yarn");

       final List<CustomCommandLine&lt;?>> customCommandLines =
CliFrontend.loadCustomCommandLines(
               flinkConfiguration,
               configurationDirectory);

       CliFrontend testFrontend = new CliFrontend(flinkConfiguration,
customCommandLines);
       //submit
       testFrontend.parseParameters(args);
       CommandLine commandLine = CliFrontendParser.parse(
               CliFrontendParser.getRunCommandOptions(),
               args,
               true);
       final ApplicationId clusterId = cli.getClusterId(commandLine);
       System.out.println("ApplicationId=" + clusterId.toString());
   }

   // SET HADOOP ENV
   private void before() {
       Map<String, String> newenv = Maps.newHashMap();
       newenv.put("HADOOP_CONF_DIR", ENV_CONF);
       newenv.put("YARN_CONF_DIR", ENV_CONF);
          try {
           Class<?> processEnvironmentClass =
Class.forName("java.lang.ProcessEnvironment");
           Field theEnvironmentField =
processEnvironmentClass.getDeclaredField("theEnvironment");
           theEnvironmentField.setAccessible(true);
           Map<String, String> env = (Map<String, String>)
theEnvironmentField.get(null);
           env.putAll(newenv);
           Field theCaseInsensitiveEnvironmentField =
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
           theCaseInsensitiveEnvironmentField.setAccessible(true);
           Map<String, String> cienv = (Map<String, String>)
theCaseInsensitiveEnvironmentField.get(null);
           cienv.putAll(newenv);
     } catch (NoSuchFieldException e) {
           Class[] classes = Collections.class.getDeclaredClasses();
           Map<String, String> env = System.getenv();
           for (Class cl : classes) {
               if
("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
                   Field field = cl.getDeclaredField("m");
                   field.setAccessible(true);
                   Object obj = field.get(env);
                   Map<String, String> map = (Map<String, String>) obj;
                   map.clear();
                   map.putAll(newenv);
               }
           }
       }
   }


}/


the error  is file not found
"/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
"
but I can foud this file .
Previously, I thought it was an environment variable problem and added "
before() ". This method still reported an error




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/