[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Starting a seperate Java process within a Flink cluster


I am currently working on my masters and I encountered a difficult problem.

Background (for context): I am trying to connect different data stream processors. Therefore i am using Flink's internal mechanisms of creating custom sinks and sources to receive from and send to different data stream processors. I am starting a separate 

process (message-buffer-process) in those custom sinks and sources to communicate and buffer data into that message-buffer-process.  My implementation is created with Maven and it could potentially be added as an dependency. 

Problem: I already tested my implementation by adding it as an dependency to a simple Flink word-count example. The test was within an IDE which works perfectly fine. But when i package that Flink work-count example and try

to run it with "./flink run " or by uploading and submitting it as a job, it tells me that my buffer-process-class could not be found:

In German: "Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht gefunden oder geladen werden"

Roughly translated: "Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be found or loaded"

Code snipplets:

Example - Adding my custom sink to send data to another data stream processor:

			new DSPConnectorConfig
				.Builder("localhost", 9656)

The way i am trying to start the separate buffer-process: JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);
JavaProcessBuilder.exec looks like:
public static Process exec(Class javaClass, String connectionString, boolean addSentMessagesFrame) throws IOException, InterruptedException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome +
File.separator + "bin" +
File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = javaClass.getCanonicalName();

System.out.println("Trying to build process " + classpath + " " + className);

ProcessBuilder builder = new ProcessBuilder(
javaBin, "-cp", classpath, className, connectionString, Boolean.toString(addSentMessagesFrame));


Process process = builder.start();
return process;

I also tried running that message-buffer process separately in another maven project and its packaged .jar file. That worked perfectly fine too. That is why I am assuming that my approach is not appropriate for running in Flink.
Did I miss something and starting my approach doesn't actually work within Flink's context? I hope the information I gave you is sufficient to help understanding my issue. If you need any more information feel free to message me!

Thanks for any help!

With best regards