osdir.com


Re: Different result on running Flink in local mode and Yarn cluster


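Only the anonymous FlatMapFunction instance is serialized and sent to the TaskManagers. Static fields of the enclosing class are not shipped, and the code in main() runs only in the client JVM. Move the field into that class as an instance (non-static) field so it travels with the function.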
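The sketch below shows why the location of the field matters. It uses only the JDK and relies on plain Java serialization as a stand-in for how Flink ships a user function to the TaskManagers. The names ShipFunctionDemo and ParseFunction are hypothetical, and clearing the static map simulates a fresh TaskManager JVM in which main() never ran.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ShipFunctionDemo {
    // Static state belongs to the class in each JVM; ObjectOutputStream never writes it
    static Map<Integer, Object> staticConfig = new HashMap<>();

    // Hypothetical stand-in for the user function: only its instance fields travel with it
    static class ParseFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        final HashMap<Integer, Object> config; // instance field: serialized with the object

        ParseFunction(HashMap<Integer, Object> config) {
            this.config = config;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Client" side: main() fills both the static map and the function's own map
        staticConfig.put(1, "schema-v1");
        HashMap<Integer, Object> own = new HashMap<>();
        own.put(1, "schema-v1");
        byte[] shipped = serialize(new ParseFunction(own));

        // "TaskManager" side: a fresh JVM never ran main(), so its static map is empty.
        // Clearing it here simulates that inside a single JVM.
        staticConfig.clear();
        ParseFunction received = (ParseFunction) deserialize(shipped);

        System.out.println("instance config: " + received.config); // {1=schema-v1}
        System.out.println("static config:   " + staticConfig);    // {}
    }
}
```

Anything the function needs at run time (and every value stored in the map) therefore has to be reachable from the function object and be serializable itself.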

Michael


On Apr 25, 2018, at 10:42 PM, Soheil Pourbafrani <soheil.ir08@xxxxxxxxx> wrote:

I'm running code written with the Flink Java API that reads bytes from Kafka, parses them, and then inserts the results into a Cassandra database through a static method of another library (the library does both the parsing and the inserting). Running the code locally in the IDE gives the expected result, but on a YARN cluster the parse method doesn't work as expected.

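import java.nio.ByteBuffer;
import java.util.HashMap;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Test {
    // Static field: each JVM has its own copy, and it is not shipped with the job
    static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // stream: the DataStream<byte[]> read from Kafka; its definition is omitted in the original post

        // These calls run only in the client JVM that submits the job
        CassandraConnection.connect();
        Parser.setInsert(true);

        stream.flatMap(new FlatMapFunction<byte[], Void>() {
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                // Executed on the TaskManagers
                Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
                // Earlier variant: Parser.parse(ByteBuffer.wrap(value));
            }
        });
        env.execute();
    }
}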

The class Parser has a static HashMap field holding the configuration that parsing depends on, and entries are inserted into it during execution. On YARN the problem was that this data was not available to the TaskManagers, which just printed "config is not available".

So I redefined that HashMap as a parameter of the parse method, but the results did not change.
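Passing ConfigHashMap as an argument changes nothing because the anonymous class still reads the static field when flatMap runs, so it sees whatever that JVM's copy holds. The JDK-only sketch below contrasts that with copying the map into a local variable, which the anonymous class captures as a serialized field. The names CaptureDemo and SerializableFn are hypothetical, SerializableFn stands in for FlatMapFunction, and a serialize/deserialize round trip stands in for job submission.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class CaptureDemo {
    // Mirrors Test.ConfigHashMap from the question
    static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();

    // Hypothetical stand-in for FlatMapFunction: a serializable single-method interface
    interface SerializableFn extends Serializable {
        Object apply(int key);
    }

    // Serialize and deserialize, roughly what happens to the function on submission
    static SerializableFn roundTrip(SerializableFn fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableFn) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ConfigHashMap.put(7, "config-7"); // filled on the client, as in the question's main()

        // Reads the static field when called: nothing about the map is captured
        SerializableFn viaStatic = new SerializableFn() {
            @Override
            public Object apply(int key) {
                return ConfigHashMap.get(key);
            }
        };

        // Copies the map into a local: the anonymous class captures it as a serialized field
        final HashMap<Integer, Object> localCopy = new HashMap<>(ConfigHashMap);
        SerializableFn viaCapture = new SerializableFn() {
            @Override
            public Object apply(int key) {
                return localCopy.get(key);
            }
        };

        SerializableFn shippedStatic = roundTrip(viaStatic);
        SerializableFn shippedCapture = roundTrip(viaCapture);

        // Simulate a TaskManager JVM, where main() never filled the static map
        ConfigHashMap.clear();

        System.out.println(shippedStatic.apply(7));  // prints "null"
        System.out.println(shippedCapture.apply(7)); // prints "config-7"
    }
}
```

A named function class holding the map in an instance field achieves the same thing and is easier to read than relying on captured locals.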

How can I fix the problem?