
Re: Windows aligned with EDT ( with day light saving )

Hey Fabian, it seems to be as simple as this?

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;

/**
 * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
 * elements and the time zone from which the offset is computed. Windows cannot overlap.
 *
 * <p>For example, in order to window into windows of 1 minute:
 * <pre> {@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *   keyed.window(TimeZoneAwareTumblingEventTimeWindows.of(Time.minutes(1), ZoneId.of(AMERICA_NEW_YORK_ZONE)));
 * } </pre>
 */
public class TimeZoneAwareTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private final long size;

    private final ZoneId zoneID;

    protected TimeZoneAwareTumblingEventTimeWindows(long size, ZoneId zoneID) {
        this.size = size;
        this.zoneID = zoneID;
    }

    /**
     * Creates a new {@code TimeZoneAwareTumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp and the given time zone.
     *
     * @param size The size of the generated windows.
     * @param zoneID The time zone whose UTC offset is used to align the windows.
     * @return The window assigner.
     */
    public static TimeZoneAwareTumblingEventTimeWindows of(Time size, ZoneId zoneID) {
        return new TimeZoneAwareTumblingEventTimeWindows(size.toMilliseconds(), zoneID);
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present.
            ZonedDateTime dateTime = Instant.ofEpochMilli(timestamp).atZone(zoneID);
            // Flink's window offset is the negated UTC offset of the zone (UTC+08:00 needs
            // Time.hours(-8)), so the zone offset in effect at this timestamp is negated here.
            long offset = -dateTime.getOffset().getTotalSeconds() * 1000L;
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TimeZoneAwareTumblingEventTimeWindows(" + size + ", " + zoneID + ")";
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
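
To sanity-check the alignment without a Flink job, the window-start arithmetic can be tried with java.time alone. This is only a sketch: the class and method names (WindowStartCheck, windowStart, zonedWindowStart) are made up for illustration, and windowStart re-implements what Flink's TimeWindow.getWindowStartWithOffset is assumed to compute, so that nothing but the JDK is needed.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class WindowStartCheck {

    // Assumed to mirror Flink's TimeWindow.getWindowStartWithOffset; re-implemented
    // here (hypothetical helper) so the sketch runs without Flink on the classpath.
    public static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Start of the window containing 'timestamp', aligned to wall-clock time in 'zone'.
    // The zone's UTC offset at that instant is negated before it is used as window offset.
    public static long zonedWindowStart(long timestamp, long size, ZoneId zone) {
        long zoneOffsetMillis =
            Instant.ofEpochMilli(timestamp).atZone(zone).getOffset().getTotalSeconds() * 1000L;
        return windowStart(timestamp, -zoneOffsetMillis, size);
    }

    public static void main(String[] args) {
        ZoneId newYork = ZoneId.of("America/New_York");
        long sixHours = 6 * 60 * 60 * 1000L;

        // 14:30 local time, once during EDT and once during EST.
        long summer = ZonedDateTime.of(2018, 7, 1, 14, 30, 0, 0, newYork).toInstant().toEpochMilli();
        long winter = ZonedDateTime.of(2018, 1, 15, 14, 30, 0, 0, newYork).toInstant().toEpochMilli();

        // Local hour at which the enclosing 6-hour window starts.
        System.out.println(
            Instant.ofEpochMilli(zonedWindowStart(summer, sixHours, newYork)).atZone(newYork).getHour()); // 12
        System.out.println(
            Instant.ofEpochMilli(zonedWindowStart(winter, sixHours, newYork)).atZone(newYork).getHour()); // 12
    }
}
```

One caveat with this approach: the offset is looked up per element, so elements on either side of a DST switch can land in windows that overlap. With 6-hour windows on 2018-03-11, an element at 01:30 EST gets a window starting at 05:00 UTC, while one at 03:30 EDT gets a window starting at 04:00 UTC.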


On Wed, May 2, 2018 at 9:31 AM, Vishal Santoshi <vishal.santoshi@xxxxxxxxx> wrote:
True that. Thanks. Wanted to be sure before I go down that path.

On Wed, May 2, 2018 at 9:19 AM, Fabian Hueske <fhueske@xxxxxxxxx> wrote:
Hi Vishal,

AFAIK it is not possible with Flink's default time windows.
However, it should be possible to implement a custom WindowAssigner for your use case.
I'd have a look at the TumblingEventTimeWindows class and copy/modify it to your needs.

Best, Fabian

2018-05-02 15:12 GMT+02:00 Vishal Santoshi <vishal.santoshi@xxxxxxxxx>:
This does not seem possible, but I need some confirmation. Anyone?

On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <vishal.santoshi@xxxxxxxxx> wrote:
How do I align a window with EDT, including the daylight saving correction? The offset takes a hardcoded value. I need 6-hour windows aligned to 00, 06, 12, 18 and so on, but in EDT.
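
For reference, a hardcoded offset falls short because the UTC offset of the zone is not constant over the year. A minimal java.time sketch, where the class name NewYorkOffsets and the helper offsetAt are made up for illustration, prints the offset of America/New_York in winter and in summer:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class NewYorkOffsets {

    // Hypothetical helper: the UTC offset that 'zone' uses at the given ISO-8601 instant.
    public static ZoneOffset offsetAt(String isoInstant, ZoneId zone) {
        return zone.getRules().getOffset(Instant.parse(isoInstant));
    }

    public static void main(String[] args) {
        ZoneId newYork = ZoneId.of("America/New_York");
        System.out.println(offsetAt("2018-01-15T12:00:00Z", newYork)); // -05:00 (EST)
        System.out.println(offsetAt("2018-07-15T12:00:00Z", newYork)); // -04:00 (EDT)
    }
}
```

Any single fixed offset therefore matches local time for only part of the year.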