osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: How to submit flink job on yarn by java code


Sorry, I don't know why the code and error are not visible.
The error is:
 The program finished with the following exception:

/org.apache.flink.client.deployment.ClusterDeploymentException: Could not
deploy Yarn job cluster.
	at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
	at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
	at flink.SubmitDemo.submit(SubmitDemo.java:75)
	at flink.SubmitDemo.main(SubmitDemo.java:50)
Caused by:
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment. 
Diagnostics from YARN: Application application_1526888270443_0090 failed 2
times due to AM Container for appattempt_1526888270443_0090_000002 exited
with  exitCode: -1000
For more detailed output, check application tracking
page:http://cluster1:8088/cluster/app/application_1526888270443_0090Then,
click on links to logs of each attempt.
Diagnostics: File
file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
does not exist
java.io.FileNotFoundException: File
file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
does not exist
	at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
	at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further
investigate the issue:
yarn logs -applicationId application_1526888270443_0090
	at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
	at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
	at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
	... 5 more/

and my code looks like this:

/public class SubmitDemo {


    /** Hadoop/YARN client configuration directory; exported as HADOOP_CONF_DIR and YARN_CONF_DIR. */
    private static final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
    /** Directory from which the Flink configuration is loaded. */
    private static final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
    /** Job jar passed as the last argument of the "run" command. */
    private static final String JAR_FILE =
            "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";


    /**
     * Entry point: prepares the Hadoop environment, then hands a fixed
     * "run" command line (detached, yarn-cluster mode) to {@code submit}.
     * Any exception raised by the submission is printed, not rethrown.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final SubmitDemo submitter = new SubmitDemo();
        submitter.before();

        // Same arguments, in the same order, as the Flink CLI would receive.
        final String[] cliArgs = {
            "run",
            "-d",
            "-m", "yarn-cluster",
            "-ynm", "lz_test_alone",
            "-yn", "4",
            "-ytm", "4096",
            "-yjm", "1024",
            "-c", "flink.Demo",
            JAR_FILE
        };

        try {
            submitter.submit(cliArgs);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    public void submit(String[] args) throws Exception {

        final String configurationDirectory = ENV_CONF;

        File configFIle = new File(FLINK_CONF);

        final Configuration flinkConfiguration =
GlobalConfiguration.loadConfiguration(configFIle.getAbsolutePath());

        FlinkYarnSessionCli cli = new
FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "y",
"yarn");

        final List<CustomCommandLine&lt;?>> customCommandLines =
CliFrontend.loadCustomCommandLines(
                flinkConfiguration,
                configurationDirectory);

        CliFrontend testFrontend = new CliFrontend(flinkConfiguration,
customCommandLines);
        //submit
        testFrontend.parseParameters(args);
        CommandLine commandLine = CliFrontendParser.parse(
                CliFrontendParser.getRunCommandOptions(),
                args,
                true);
        final ApplicationId clusterId = cli.getClusterId(commandLine);
        System.out.println("ApplicationId=" + clusterId.toString());
    }

    // SET HADOOP ENV
    /**
     * Injects HADOOP_CONF_DIR and YARN_CONF_DIR (both set to {@code ENV_CONF})
     * into the environment of the running JVM by reflecting on JDK internals.
     *
     * <p>First tries the {@code theEnvironment} / {@code theCaseInsensitiveEnvironment}
     * fields of {@code java.lang.ProcessEnvironment}; if either field is missing,
     * falls back to patching the map behind {@code System.getenv()}.
     *
     * @throws IllegalStateException if the reflective access fails
     */
    private void before() {
        Map<String, String> newenv = Maps.newHashMap();
        newenv.put("HADOOP_CONF_DIR", ENV_CONF);
        newenv.put("YARN_CONF_DIR", ENV_CONF);
        try {
            try {
                patchProcessEnvironment(newenv);
            } catch (NoSuchFieldException e) {
                // This JDK does not have (both of) the ProcessEnvironment fields.
                patchUnmodifiableEnvironmentView(newenv);
            }
        } catch (ReflectiveOperationException e) {
            // The original neither caught nor declared these checked exceptions.
            throw new IllegalStateException(
                    "Could not set HADOOP_CONF_DIR/YARN_CONF_DIR in the process environment", e);
        }
    }

    /** Adds the entries to the static environment maps of java.lang.ProcessEnvironment. */
    @SuppressWarnings("unchecked") // the reflected fields are cast to the map type the original code assumed
    private static void patchProcessEnvironment(Map<String, String> newenv)
            throws ReflectiveOperationException {
        Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");

        Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
        theEnvironmentField.setAccessible(true);
        Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
        env.putAll(newenv);

        Field theCaseInsensitiveEnvironmentField =
                processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
        theCaseInsensitiveEnvironmentField.setAccessible(true);
        Map<String, String> cienv =
                (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
        cienv.putAll(newenv);
    }

    /** Adds the entries to the map wrapped by the unmodifiable view returned by System.getenv(). */
    @SuppressWarnings("unchecked") // field "m" of UnmodifiableMap is cast to the map type the original code assumed
    private static void patchUnmodifiableEnvironmentView(Map<String, String> newenv)
            throws ReflectiveOperationException {
        Map<String, String> env = System.getenv();
        for (Class<?> cl : Collections.class.getDeclaredClasses()) {
            if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
                Field field = cl.getDeclaredField("m");
                field.setAccessible(true);
                Map<String, String> map = (Map<String, String>) field.get(env);
                // Deliberately no map.clear(): clearing would drop every other
                // environment variable of the process, not just add the two new ones.
                map.putAll(newenv);
            }
        }
    }


}/


The error says that the file
"/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp"
was not found, but I can find this file.
Previously, I thought it was an environment variable problem, so I added the
"before()" method. The same error is still reported with this method in place.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/