[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Multiple env.execute() into one Flink batch job

We solved this issue (of reading the value of an accumulator) by calling a REST endpoint after the job ends, in order to store the value associated with the accumulator in some database.
This is quite awful, but I didn't find any better solution..

This is the code that runs the job (of course it's not complete, but it could help you get some insight):

You need to import the following Java lib first:

-------------------- FlinkSshJobRun.java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.config.keys.FilePasswordProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSshJobRun implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(FlinkSshJobRun.class);

  private final Integer sshPort;
  private final String sshUser;
  private final String sshKeyPassword;
  private final String command;
  private final Integer sshTimeoutMillis;
  private final String targetHost;
  private final Charset encoding;

  public FlinkSshJobRun(FlinkProperties flinkProps,
      String jarFile, String entryClass, String argsStr, Integer parallelism)
      throws MalformedURLException {
    final URL jobManagerUrl = new URL(flinkProps.getJobManagerUrl());
    this.targetHost = jobManagerUrl.getHost();
    this.sshUser = flinkProps.getSsh().getUser();
    this.sshPort = flinkProps.getSsh().getPort();
    this.sshKeyPassword = flinkProps.getSsh().getKeyPassword();
    this.sshTimeoutMillis = flinkProps.getSsh().getTimeoutMs();
    String flinkBinClient = flinkProps.getSsh().getFlinkDistDir() + "/bin/flink";
    if (parallelism != null) {
      flinkBinClient += " -p " + parallelism;
    final String flinkUploadDir = flinkProps.getSsh().getFlinkJarUploadDir();
    final Path jarPathOnServer = Paths.get(flinkUploadDir, jarFile);//
    this.command = flinkBinClient + " run -c " + entryClass //
        + " " + jarPathOnServer //
        + " " + (argsStr == null ? "" : argsStr);
    this.encoding = Charset.forName(StandardCharsets.UTF_8.name());

  public void run() {
    ClientSession session = null;
    final ByteArrayOutputStream stdErr = new ByteArrayOutputStream();
    final ByteArrayOutputStream stdOut = new ByteArrayOutputStream();
    try (SshClient cl = SshClient.setUpDefaultClient();) {

      if (sshKeyPassword != null) {
        cl.setFilePasswordProvider(file -> sshKeyPassword);
      session = cl.connect(sshUser, targetHost, sshPort)//
      session.auth().verify(Math.multiplyExact(sshTimeoutMillis, 4));
      LOG.info("Executing SSH: {}@{}:{} -> {}", sshUser, targetHost, sshPort, command);
      session.executeRemoteCommand(command, stdOut, stdErr, encoding);
      LOG.info("SSH successfully executed {}@{}:{} -> {}", sshUser, targetHost, sshPort, command);
      final String stdOutTxt = getOsContentOnException(true, stdOut);
      LOG.info("SSH stdout for {}@{}:{} -> {}\n{}", sshUser, targetHost, sshPort, command,
    } catch (IOException ex) {
      final String stdOutTxt = getOsContentOnException(true, stdOut);
      final String stdErrTxt = getOsContentOnException(false, stdErr);
      final String errorMsg = String.format(
          "Error during SSH execution  %s@%s:%s -> %s%nSTDOUT:%s%n%nSTDERR:%s", sshUser, targetHost,
          sshPort, command, stdOutTxt, stdErrTxt);
      LOG.error(errorMsg, ex);
    } finally {
      try {
      } catch (IOException ex) {
        LOG.error("Error during STDERR buffer close", ex);
      try {
      } catch (IOException ex) {
        LOG.error("Error during STDOUT buffer close", ex);
      if (session != null) {
        try {
        } catch (IOException ex) {
          LOG.error("Error during SSH session close", ex);

  private String getOsContentOnException(boolean stdOut, final ByteArrayOutputStream os) {
    final String targetOutStream = stdOut ? "STDOUT" : "STDERR";
    String ret = String.format("<Error while retrieving %s>", targetOutStream);
    try {
      ret = os.toString(StandardCharsets.UTF_8.name());
    } catch (UnsupportedEncodingException ex2) {
      LOG.error("Error while reading " + targetOutStream + " after exception", ex2);
    return ret;


package it.okkam.datalinks.job.api.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "flink", ignoreUnknownFields = true)
public class FlinkProperties {
  private String jobManagerUrl;
  private final Ssh ssh = new Ssh();

  public String getJobManagerUrl() {
    return jobManagerUrl;

  public void setJobManagerUrl(String jobManagerUrl) {
    this.jobManagerUrl = jobManagerUrl;

  public Ssh getSsh() {
    return ssh;

  public static class Async {

    private Integer corePoolSize;
    private Integer maxPoolSize;
    private Integer queueCapacity;

    public Integer getCorePoolSize() {
      return corePoolSize;

    public void setCorePoolSize(final Integer corePoolSize) {
      this.corePoolSize = corePoolSize;

    public Integer getMaxPoolSize() {
      return maxPoolSize;

    public void setMaxPoolSize(final Integer maxPoolSize) {
      this.maxPoolSize = maxPoolSize;

    public Integer getQueueCapacity() {
      return queueCapacity;

    public void setQueueCapacity(final Integer queueCapacity) {
      this.queueCapacity = queueCapacity;

  public static class Ssh {
    private Integer port;
    private Integer timeoutMs;
    private String user;
    private String keyPassword;
    private String flinkDistDir;
    private String flinkJarUploadDir;

    private final Async async = new Async();

    public Async getAsync() {
      return async;

    public Integer getPort() {
      return port;

    public void setPort(Integer port) {
      this.port = port;

    public String getUser() {
      return user;

    public void setUser(String user) {
      this.user = user;

    public Integer getTimeoutMs() {
      return timeoutMs;

    public void setTimeoutMs(Integer timeoutMs) {
      this.timeoutMs = timeoutMs;

    public String getFlinkDistDir() {
      return flinkDistDir;

    public void setFlinkDistDir(String flinkDistDir) {
      this.flinkDistDir = flinkDistDir;

    public String getFlinkJarUploadDir() {
      return flinkJarUploadDir;

    public void setFlinkJarUploadDir(String flinkJarUploadDir) {
      this.flinkJarUploadDir = flinkJarUploadDir;

    public String getKeyPassword() {
      return keyPassword;

    public void setKeyPassword(String keyPassword) {
      this.keyPassword = keyPassword;

------------------- Rest service (via Spring boot)

private TaskExecutor taskExecutor;
  @GetMapping(METHOD_JOB_RUN + "/{" + PARAMS_JARFILE + ":.+}")
  public JobRun runJob(//
      @PathVariable(name = PARAMS_JARFILE) String jarFile, //
      @RequestParam(name = "entry-class", required = false) String entryClass,
      @RequestParam(name = "program-args", required = false) String argsStr,
      @RequestParam(name = "parallelism", required = false) Integer parallelism)
      throws IOException {

    final long start = System.currentTimeMillis();
    final String jobId = "OK"; //it could be set if Flink REST API would work as expected..
    taskExecutor.execute(new FlinkSshJobRun(flinkProperties, jarFile, entryClass, argsStr, parallelism));    
    final long elapsed = System.currentTimeMillis() - start;
    return new JobRun(elapsed, null, jobId);

On Fri, Nov 23, 2018 at 4:47 PM bastien dine <bastien.dine@xxxxxxxxx> wrote:
Oh god, if we have some code with Accumulator after the env.execute(), this will not be executed on the JobManager too ?
Thanks, I would be interested indeed !


Bastien DINE
Data Architect / Software Engineer / Sysadmin

Le ven. 23 nov. 2018 à 16:37, Flavio Pompermaier <pompermaier@xxxxxxxx> a écrit :
The problem is that the REST API blocks on env.execute().
If you want to run your Flink job you have to submit it using the CLI client.
As a workaround we wrote a Spring REST API that, in order to run a job, opens an SSH connection to the job manager and executes the bin/flink run command..

If you're interested in I can share some code..

On Fri, Nov 23, 2018 at 4:32 PM bastien dine <bastien.dine@xxxxxxxxx> wrote:

I need to chain processing in DataSet API, so I am launching severals jobs, with multiple env.execute() :



This is working fine when I am running it within IntelliJ.
But when I am deploying it into my cluster, it only launch the first topology..

Could you please shed some light on this issue?


Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809