
Re: RichInputFormat working differently in eclipse and in flink cluster


Hi Teena,

Which Flink version are you using? Have you checked whether this also happens with the latest release, 1.6.2?

Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <teena.kappen@xxxxxxxxxx> wrote:

Hi all,

 

I have implemented a RichInputFormat for reading the results of aggregation queries in Elasticsearch. There are around 100,000 buckets, each of which is a JSON array. Note: this is a one-time response.

 

My idea is to iterate over these arrays in parallel. Here is the pseudocode.

 

public void configure(Configuration parameters) {
    System.out.println("configure");
}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
}

public ResponseInputSplit[] createInputSplits(int minNumSplits) {
    System.out.println("createInputSplits");

    // read from elastic
    // add buckets to array
}

public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] inputSplits) {
    // this is default
    System.out.println("getInputSplitAssigner");
    return new DefaultInputSplitAssigner(inputSplits);
}

public void open(ResponseInputSplit split) {
    // read buckets
}

public boolean reachedEnd() {
    System.out.println("reachedEnd");
}

public Bounce nextRecord(Bounce reuse) {
}

public void close() {
}
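[Editor's sketch] The "add buckets to array" step in createInputSplits could be sketched as below, assuming each split carries its own slice of the buckets so that whatever task later opens the split has the data it needs. This is plain Java with no Flink dependency. BucketSplit and partition are hypothetical names standing in for ResponseInputSplit and the body of createInputSplits, and buckets are represented as strings only for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class BucketSplitSketch {

    /** Hypothetical stand-in for ResponseInputSplit: each split owns its buckets. */
    static final class BucketSplit implements Serializable {
        private static final long serialVersionUID = 1L;

        final int splitNumber;
        final ArrayList<String> buckets;

        BucketSplit(int splitNumber, List<String> buckets) {
            this.splitNumber = splitNumber;
            // Copy the slice so the split does not hold a view onto the full list.
            this.buckets = new ArrayList<>(buckets);
        }
    }

    /** Divides the buckets into contiguous, nearly equal-sized splits. */
    static List<BucketSplit> partition(List<String> allBuckets, int minNumSplits) {
        // Never create more splits than buckets, and always create at least one.
        int numSplits = Math.max(1, Math.min(minNumSplits, allBuckets.size()));
        int base = allBuckets.size() / numSplits;
        int remainder = allBuckets.size() % numSplits;

        List<BucketSplit> splits = new ArrayList<>(numSplits);
        int start = 0;
        for (int i = 0; i < numSplits; i++) {
            // The first `remainder` splits take one extra bucket each.
            int size = base + (i < remainder ? 1 : 0);
            splits.add(new BucketSplit(i, allBuckets.subList(start, start + size)));
            start += size;
        }
        return splits;
    }
}
```

With this layout, open(split) would read only from the split it receives and would not rely on fields that createInputSplits set on the format object itself.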

 

// my main method,
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());

 

When I run the job in Eclipse, createInputSplits is executed and the results look fine. The logs are given below.

Output:

configure

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...

configure

createInputSplits

 

When I submit the job to a Flink cluster, it does not seem to execute the 'configure' and 'createInputSplits' methods. Instead, it goes directly to the nextRecord function. The logs are given below.

Output:

Starting execution of program

configure

Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job completion.

Connected to JobManager at Actor[akka.tcp://flink@xxxx:xxx /user/jobmanager#1219973491] with leader session id...

10/26/2018 15:05:57     Job execution switched to status RUNNING.

10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED

10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING

10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING

10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED

java.lang.NullPointerException

        at com.xxx.test.MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)

 

Regards,

Teena
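
[Editor's sketch] One possible reading of the logs above, not confirmed anywhere in this thread: on a cluster, createInputSplits runs in the JobManager process, while open and nextRecord run in TaskManager processes, each on its own deserialized copy of the format. System.out output from those processes goes to their own .out files rather than to the client console, which would explain the missing "createInputSplits" line. If nextRecord depends on a field that was only populated inside createInputSplits, that field would still be null on the TaskManager's copy. The plain-Java simulation below illustrates that effect with ordinary Java serialization. FakeFormat and all method names are hypothetical stand-ins, not Flink classes and not the author's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class SerializedCopySketch {

    /** Hypothetical stand-in for the input format; not a Flink class. */
    static final class FakeFormat implements Serializable {
        private static final long serialVersionUID = 1L;

        List<String> buckets; // populated only by createInputSplits()

        void createInputSplits() {
            buckets = new ArrayList<>();
            buckets.add("bucket-0");
        }

        String nextRecord() {
            // Throws NullPointerException if buckets was never set on this copy.
            return buckets.get(0);
        }
    }

    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static FakeFormat deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (FakeFormat) in.readObject();
        }
    }

    /** Returns true if the "worker" copy can see state set on the "master" copy. */
    static boolean workerCopySeesBuckets() {
        try {
            // The client serializes the format once when the job is submitted.
            byte[] shipped = serialize(new FakeFormat());

            // The "master" copy creates the splits and fills its own field.
            FakeFormat masterCopy = deserialize(shipped);
            masterCopy.createInputSplits();

            // The "worker" copy comes from the same bytes and never saw that call.
            FakeFormat workerCopy = deserialize(shipped);
            return workerCopy.buckets != null;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("worker copy sees buckets: " + workerCopySeesBuckets());
    }
}
```

If this is what is happening, checking the JobManager and TaskManager .out files for the println output, and moving the bucket data into the split objects so that open(split) receives it, would be reasonable next steps.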