Re: Different result on running Flink in local mode and Yarn cluster


Without knowing the library or what the config needs, I do not have a specific suggestion. If the config is accumulated from the inputs and needs to see all of them, I would suggest setting the parallelism to 1 as an experiment; running it in parallel would then need a redesign.

Michael


On Apr 26, 2018, at 12:50 AM, Soheil Pourbafrani <soheil.ir08@xxxxxxxxx> wrote:

Thanks. So what is your suggestion to solve the problem? Is it possible to use Broadcast Variables for this scenario?

On Thu, Apr 26, 2018 at 10:57 AM, TechnoMage <mlatta@xxxxxxxxxxxxxx> wrote:
What parallelism are you using? If it is > 1, you cannot rely on the config value being passed to each of the parsing functions, as they run on separate threads or even separate machines.

Michael
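
[Editor's illustration] The point above can be sketched in plain Java, without Flink. This is only a simulation under stated assumptions: each `Subtask` object stands in for one parallel instance with its own private map (as a static field would be private to each JVM), and records are handed out round-robin purely for illustration. The names `PartitionedStateDemo`, `Subtask`, and `entriesSeenPerSubtask` are hypothetical and do not come from the thread or from Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionedStateDemo {

    // Stands in for one parallel subtask. Its map is private to it, just as
    // a static HashMap is private to each JVM in a cluster.
    static final class Subtask {
        final Map<String, String> config = new HashMap<>();

        // Accumulates one "key=value" record into this subtask's own map.
        void accept(String record) {
            String[] kv = record.split("=", 2);
            config.put(kv[0], kv[1]);
        }
    }

    // Hands records out round-robin (an assumption for this sketch) and
    // returns how many config entries each subtask ended up seeing.
    static List<Integer> entriesSeenPerSubtask(List<String> records, int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % parallelism).accept(records.get(i));
        }
        List<Integer> sizes = new ArrayList<>();
        for (Subtask s : subtasks) {
            sizes.add(s.config.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a=1", "b=2", "c=3", "d=4");
        System.out.println(entriesSeenPerSubtask(records, 1)); // prints [4]
        System.out.println(entriesSeenPerSubtask(records, 2)); // prints [2, 2]
    }
}
```

With one subtask, the single map sees all four entries; with two, each map sees only half of them, which is one way a job could give different answers locally and on a cluster.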


On Apr 26, 2018, at 12:24 AM, Soheil Pourbafrani <soheil.ir08@xxxxxxxxx> wrote:

As I said, in the first version of the code I didn't pass any argument to the parse function and the HashMap was a static field of the Parser class, but it didn't give the desired answer. I then tested passing the HashMap as an argument to the parse method, but I am still not getting the desired answers. The code is something like the following:

public class Test {

    public static void main(String[] args) throws Exception {

        CassandraConnection.connect();
        Parser.setInsert(true);

        stream.flatMap(new FlatMapFunction<byte[], Void>() {
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                // Parser.parse(ByteBuffer.wrap(value));
            }
        });
        env.execute();
    }
}




In summary, I want the HashMap to be shared among the TaskManagers.