[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Use event time

Hi again!

Flink doesn’t order/sort the records according to event time. The preveiling idea is:
- records will be arriving out of order, operators should handle that
- watermarks are used for indicators of the current lower bound of the event time “clock”

For examples windowed joins/aggregations  assign records to one or more time windows, collect all of the data belonging to a window and when watermark exceeds/overtakes the window that when that window is being evaluated.


On 7 Dec 2018, at 09:22, min.tan@xxxxxxx wrote:



I am new to Flink. 


I have the following small code to use the event time. I did not get the result expected, i.e. it print out events in the order of event time.


Did I miss something here?







--------------Event time------------------

   public static void main(String[] args) throws Exception  {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
long start =System.currentTimeMillis();
        DataStream<Event> stream = env.fromElements(
new Event(0,start,start),
new Event(1,start+10,start+10), new Event(2,start+20,start-20),
new Event(3,start+30,start-30), new Event(4,start+40,start-40));

        stream.map(event -> 
"RAW order " + event.toString()).print();

        stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
@Override public long extractAscendingTimestamp(Event element) { return element.time1; } })
                .map(event -> 
"time1 order:: " + event.toString()).print();

        stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
@Override public long extractAscendingTimestamp(Event element) { return element.time2; } })
                .map(event -> 
"time2 order:: " + event.toString()).print();

env.execute("event time ........");

static public class Event {
int id;
long time1;
long time2;

int id, long time1, long time2){
this.id =id;

public String toString() {
return "id=" + id + "; time1=" + time1 + "; time2=" + time2;


Check out our new brand campaign: www.ubs.com/together
E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails. 
The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/global/en/legalinfo2/privacy.html