Starting a separate Java process within a Flink cluster


Hello,


I am currently working on my master's and have run into a difficult problem.


Background (for context): I am trying to connect different data stream processors. To do so, I am using Flink's mechanisms for creating custom sinks and sources, which receive data from and send data to other data stream processors. Inside those custom sinks and sources I start a separate process (the message-buffer-process), which they communicate with and buffer data into. My implementation is built with Maven and could be added to other projects as a dependency.


Problem: I have already tested my implementation by adding it as a dependency to a simple Flink word-count example. Running that test inside an IDE works perfectly fine. But when I package the Flink word-count example and try to run it with "./flink run " or by uploading and submitting it as a job, it tells me that my buffer-process class could not be found:

In German: "Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht gefunden oder geladen werden"

Roughly translated: "Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be found or loaded"


Code snippets:

Example - Adding my custom sink to send data to another data stream processor:

dataStream.addSink(
    (SinkFunction) DSPConnectorFactory
        .getInstance()
        .createSinkConnector(
            new DSPConnectorConfig
                .Builder("localhost", 9656)
                .withDSP("flink")
                .withBufferConnectorString("buffer-connection-string")
                .withHWM(20)
                .withTimeout(10000)
                .build()));



This is how I am trying to start the separate buffer process:

JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);

This is what JavaProcessBuilder.exec looks like:

public static Process exec(Class javaClass, String connectionString, boolean addSentMessagesFrame) throws IOException, InterruptedException {
    String javaHome = System.getProperty("java.home");
    String javaBin = javaHome +
            File.separator + "bin" +
            File.separator + "java";
    String classpath = System.getProperty("java.class.path");
    String className = javaClass.getCanonicalName();

    System.out.println("Trying to build process " + classpath + " " + className);

    ProcessBuilder builder = new ProcessBuilder(
            javaBin, "-cp", classpath, className, connectionString, Boolean.toString(addSentMessagesFrame));

    builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
    builder.redirectError(ProcessBuilder.Redirect.INHERIT);

    Process process = builder.start();
    return process;
}
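
A minimal diagnostic sketch, not part of the implementation above: the exec method hands the child JVM whatever is in java.class.path. If the packaged job jar is loaded by a separate class loader rather than from the JVM's class path (an assumption that would need to be checked on the cluster), java.class.path would not list it, and the child process could not find the main class. The hypothetical helper ClasspathProbe below prints where a class was actually loaded from and builds a class path that also includes that location. Only standard JDK calls are used; the class and method names are made up for illustration.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of the original project.
class ClasspathProbe {

    // Returns the jar or directory a class was loaded from,
    // or null if that cannot be determined (e.g. for JDK classes).
    static String codeSourceOf(Class<?> javaClass) {
        CodeSource source = javaClass.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return null;
        }
        URL location = source.getLocation();
        if (!"file".equals(location.getProtocol())) {
            return null; // only local files can go on a -cp argument
        }
        try {
            return new File(location.toURI()).getAbsolutePath();
        } catch (URISyntaxException | IllegalArgumentException e) {
            return null;
        }
    }

    // Returns java.class.path, extended by the class's own origin
    // when that origin is known and not already listed.
    static String childClasspath(Class<?> javaClass) {
        String classpath = System.getProperty("java.class.path");
        String origin = codeSourceOf(javaClass);
        if (origin == null) {
            return classpath;
        }
        boolean alreadyListed =
                Arrays.asList(classpath.split(File.pathSeparator)).contains(origin);
        return alreadyListed ? classpath : classpath + File.pathSeparator + origin;
    }

    public static void main(String[] args) {
        System.out.println("java.class.path:  " + System.getProperty("java.class.path"));
        System.out.println("loaded from:      " + codeSourceOf(ClasspathProbe.class));
        System.out.println("child class path: " + childClasspath(ClasspathProbe.class));
    }
}
```

Calling codeSourceOf(MessageBufferProcess.class) inside the sink and comparing the result with the printed java.class.path should show whether the jar is missing from the class path handed to the child process. If it is, childClasspath(javaClass) could stand in for classpath in exec. Separately, javaClass.getName() may be the safer choice over getCanonicalName() when launching, since the two differ for nested classes.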

I also tried running the message-buffer process on its own, both from another Maven project and from its packaged .jar file. That worked perfectly fine too, which is why I assume that my approach is not suitable for running inside Flink.
Did I miss something, or does starting a process this way simply not work within Flink's context? I hope the information I have given is sufficient to understand my issue. If you need any more information, feel free to message me!

Thanks for any help!

With best regards