


Re: error while joining two datastreams


Hi,

I assume that 

withTimestampsAndWatermarks1.print();
withTimestampsAndWatermarks2.print();

actually prints what you expected?

If so, the problem might be that:
a) time/watermarks are not progressing (watermarks are what trigger the output of your `TumblingEventTimeWindows.of(Time.seconds(15))` windows)
b) data are not being joined, because:
  - there are no matching elements (based on your KeySelectors) between those two streams
  - elements are out of sync with respect to the window length (within a given 15-second tumbling window, there are no elements to join)
c) the streams are producing different event times/watermarks (for example, one is far ahead of the other). A windowed join will produce results only once both of their watermarks have caught up/synced.
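To make (a) to (c) concrete, here is a small plain-Java model of an event-time tumbling-window join. It is not Flink code and not how Flink implements the operator; `WindowedJoinModel`, `Event` and `fire` are hypothetical names for illustration. It assumes tumbling windows with zero offset, a window that fires once the watermark reaches its last timestamp (end - 1), and a two-input operator whose watermark is the minimum of its two inputs' watermarks.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of an event-time tumbling-window join.
final class WindowedJoinModel {

    // Hypothetical keyed event with an event-time timestamp in milliseconds.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Start of the tumbling window (offset 0) that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Returns "key@windowStart" for each left/right pair sharing a key and a window,
    // but only for windows whose last timestamp (end - 1) the operator watermark has reached.
    static List<String> fire(List<Event> left, List<Event> right, long size,
                             long leftWatermark, long rightWatermark) {
        // (c) a two-input operator only advances as far as its slowest input
        long watermark = Math.min(leftWatermark, rightWatermark);
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            long start = windowStart(l.timestamp, size);
            if (start + size - 1 > watermark) {
                continue; // (a) window not complete yet: nothing is emitted
            }
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);                        // (b) keys must match
                boolean sameWindow = windowStart(r.timestamp, size) == start; // (b) same window
                if (sameKey && sameWindow) {
                    out.add(l.key + "@" + start);
                }
            }
        }
        return out;
    }
}
```

With one event per side, key "a" at 1,000 ms and 14,000 ms, and 15-second windows, the model emits "a@0" once both watermarks reach 14,999. If either input's watermark is still below that, if the keys differ, or if the second event falls at 16,000 ms (the next window), nothing is emitted, which is the "no output" symptom.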
  
Piotrek 

On 23 Nov 2018, at 08:50, Abhijeet Kumar <abhijeet.kumar@xxxxxxxxxxxx> wrote:

DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>> withTimestampsAndWatermarks1 = formatStream1
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>(
                Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

            private static final long serialVersionUID = 1L;

            @Override
            public long extractTimestamp(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> element) {
                return element.f10; // last field carries the event timestamp (Flink expects milliseconds)
            }
        });

DataStream<Tuple7<String, String, String, String, String, String, Long>> withTimestampsAndWatermarks2 = formatStream2
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, String, String, String, Long>>(
                Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

            private static final long serialVersionUID = 1L;

            @Override
            public long extractTimestamp(
                    Tuple7<String, String, String, String, String, String, Long> element) {
                return element.f6; // last field carries the event timestamp (Flink expects milliseconds)
            }
        });

withTimestampsAndWatermarks1.print();
withTimestampsAndWatermarks2.print();

DataStream<Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>> joined = withTimestampsAndWatermarks1
    .join(withTimestampsAndWatermarks2)
    .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(
                Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1)
                throws Exception {
            return t1.f0; // join key of the first stream
        }
    })
    .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Tuple7<String, String, String, String, String, String, Long> t1)
                throws Exception {
            return t1.f0; // join key of the second stream
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                Tuple7<String, String, String, String, String, String, Long> second) {
            // first.f0..f9, then second.f1..f6, then first.f10 (17 fields in total)
            return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(
                    first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9,
                    second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
        }
    });

joined.print();

OK, so now I did it like this. The errors are resolved, but now I don't see any output when I print the joined datastream.

On 23-Nov-2018, at 12:42 PM, Nagarjun Guraja <nagarjun@xxxxxxxxx> wrote:

It looks like you need to assign timestamps and emit watermarks for both streams, viz. formatStream1 and formatStream2, as described at
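For reference, the watermark rule behind a bounded-out-of-orderness assigner can be modelled in a few lines of plain Java. This sketch assumes it matches what `BoundedOutOfOrdernessTimestampExtractor` does (largest timestamp seen so far minus the allowed out-of-orderness, never moving backwards); `BoundedOutOfOrdernessModel` is a hypothetical name, not a Flink class. Each stream needs its own generator, and a join over both only advances as far as the slower one. One thing worth checking: in Flink 1.x releases before 1.12 the default time characteristic is ProcessingTime, and periodic watermarks are only emitted when the auto-watermark interval is non-zero, which `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` takes care of.

```java
// Plain-Java model (not the Flink class) of a bounded-out-of-orderness watermark generator.
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long maxTimestamp;
    private long lastWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start low enough that the subtraction below cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Called for every record with the timestamp extracted from it (e.g. element.f10).
    void onRecord(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    // Called periodically; in Flink this only happens when the auto-watermark interval is > 0.
    long currentWatermark() {
        long candidate = maxTimestamp - maxOutOfOrderness;
        if (candidate > lastWatermark) {
            lastWatermark = candidate; // watermarks never move backwards
        }
        return lastWatermark;
    }
}
```

With a 5-second bound, a record at 20,000 ms moves the watermark to 15,000; a late record at 12,000 ms leaves it there; a record at 30,000 ms moves it to 25,000. Until any record arrives, the watermark stays at `Long.MIN_VALUE`, so no window can fire.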

On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar <abhijeet.kumar@xxxxxxxxxxxx> wrote:
Hello Team,

I'm new to Flink and coming from a Spark background. I need help completing this streaming job. I'm reading data from two different Kafka topics and I want to join them.

My code:

formatStream1.join(formatStream2)
    .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
        @Override
        public String getKey(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1) throws Exception {
            return t1.f0;
        }
    })
    .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
        @Override
        public String getKey(Tuple7<String, String, String, String, String, String, Long> t1) throws Exception {
            return t1.f0;
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

        @Override
        public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                Tuple7<String, String, String, String, String, String, Long> second) {
            return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(
                    first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9,
                    second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
        }
    }).print();


Error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
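The exception above comes from window assignment: Flink uses `Long.MIN_VALUE` as the "no timestamp" marker, and an event-time window assigner has no window to put such a record in. A minimal plain-Java sketch of that check, assuming tumbling windows with zero offset (`TumblingAssignerModel` and `assignWindowStart` are hypothetical names, not Flink API):

```java
// Plain-Java model (not Flink code) of a tumbling event-time window assigner.
final class TumblingAssignerModel {
    // Flink's marker for "this record carries no timestamp".
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Returns the start of the window [start, start + size) containing the timestamp.
    static long assignWindowStart(long timestamp, long sizeMillis) {
        if (timestamp == NO_TIMESTAMP) {
            // Same situation as the RuntimeException in the stack trace.
            throw new IllegalStateException(
                    "Record has Long.MIN_VALUE timestamp: assign timestamps and watermarks first");
        }
        return timestamp - Math.floorMod(timestamp, sizeMillis);
    }
}
```

For 15-second windows, 14,999 ms maps to the window starting at 0 and 16,000 ms to the one starting at 15,000, while a record that never went through `assignTimestampsAndWatermarks(...)` is rejected.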

Data is arriving in formatStream1 and formatStream2; I checked by printing them. So the issue must be in the code I shared. Thanks in advance!

Thanks,


Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !

-- 
Regards,
Nagarjun
