[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Event Time Session Window does not trigger..

Hi Hequn, 

Thanks for link. Looks like I better use ProcessingTime instead of EventTime especially because of the 4th reason you listed..
"Data should cover a longer time span than the window size to advance the event time." 
I need the trigger when the data stops.

I have 1 more question.
Can I set the TimeCharacteristic to the stream level instead on the application level?
Can I use both TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in an application.
Thank you 

On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <chenghequn@xxxxxxxxx> wrote:
Hi shyla,

I answered a similar question on stackoverflow[1], you can take a look first.

Best, Hequn

On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <deshpandeshyla@xxxxxxxxx> wrote:
I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as the basis. I made very minor changes 
and the session window is not triggered. If I use ProcessingTime instead of EventTime it works. Here is my code.
Appreciate any help. Thanks
object KafkaEventTimeWindow {

private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
private val LOCAL_KAFKA_BROKER = "localhost:9092"
private val CON_GROUP = "KafkaEventTimeSessionWindow"
private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 seconds

def main(args: Array[String]) {

val env = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaProps = new Properties
kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
kafkaProps.setProperty("group.id", CON_GROUP)
kafkaProps.setProperty("auto.offset.reset", "earliest")

val consumer = new FlinkKafkaConsumer011[PositionEventProto](
new PositionEventProtoSchema,
consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)

val posstream = env.addSource(consumer)

def convtoepochmilli(cdt: String): Long = {
val odt:OffsetDateTime = OffsetDateTime.parse(cdt);
val i:Instant = odt.toInstant();
val millis:Long = i.toEpochMilli();

val outputstream = posstream
.mapWith{case(p) => (p.getConsumerUserId, convtoepochmilli(p.getCreateDateTime.getInIso8601Format))}
.reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) }


// execute the transformation pipeline
env.execute("Output Stream")

class PositionEventProtoTSAssigner
extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) {

override def extractTimestamp(pos: PositionEventProto): Long = {
val odt:OffsetDateTime = OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format);
val i:Instant = odt.toInstant();
val millis:Long = i.toEpochMilli();