OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Never gets into ProcessWindowFunction.process()


Hi,
Running in IntelliJ IDE on a Mac with 4 vProcessors.
Code compiles fine. It never gets into Window5SecProcessing's process(). I am able to get data from the Kinesis Consumer, and it is deserialized properly when I debug the code. It does get into the Window5SecProcessing.open() method for initialization.

Not sure if it is failing because no slots are available?
In main():
........ //trimmed a lot of code
// Pipeline overview: Kinesis source -> key by (component, instance, operation)
// -> 5-second time window -> Window5SecProcessing -> logging sink.

// Build the Kinesis consumer that produces Monitoring records (remaining arguments elided in the post).
FlinkKinesisConsumer<Monitoring> kinesisConsumer = getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);

// Attach the consumer as the job's source and give the operator a uid derived from the job name.
DataStream<Monitoring> kinesisStream = env
                .addSource(kinesisConsumer)
                .uid(jobName + "KinesisSource");
// Partition the stream by the (component, instance, operation) triple of each record.
KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
                .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
                    public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                        return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
                    }});
            
        // Group each key's records into 5-second time windows.
        // NOTE(review): whether these windows fire on processing time or event time depends on the
        // environment's time characteristic, which is not shown here. Under event time, windows only
        // fire once watermarks advance past the window end, so a timestamp/watermark assigner would
        // be required on the source stream — confirm, since none is visible in this snippet.
        WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

        // Hand each fired window to Window5SecProcessing, which emits MonitoringGrouping results.
        DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
                .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
                .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);
// Sink: currently only logs every MonitoringGrouping it receives (at debug level).
enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
            @Override
            public void invoke(MonitoringGrouping mg, Context context) throws Exception {
                //TODO call ES
                logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
            }
        });
Window processing class:
/**
 * Window function applied to the keyed 5-second windows of {@code Monitoring} records.
 * It consumes the records of one window for one (String, String, String) key and emits
 * {@code MonitoringGrouping} results. Parts of the class are elided in the post.
 */
private static class Window5SecProcessing extends ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
// NOTE(review): this field is never assigned in the visible code; open() declares a local
// variable with the same name that shadows it.
private transient Histogram fiveSecHist;
        // NOTE(review): the five-minute histogram fields are not initialised in the visible code;
        // the elided sections may do so — confirm.
        private transient Histogram fiveMinHist;
        private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
        private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
        // Keyed state handle and its descriptor, both created in open().
        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;

        /** Constructor; body elided in the post. */
        public Window5SecProcessing(String gameId, String interval, String keyType) {
            ...
        }

        /**
         * Initialises the 5-second histogram wrapper and the "total5SecCount" value state.
         *
         * @param parameters configuration passed through to {@code super.open}
         * @throws Exception declared by the overridden method
         */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Window5SecProcessing -Entered open - parameters:{}", parameters);//gets here
            // Local variable: shadows the field fiveSecHist, which therefore stays unassigned.
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            // NOTE(review): the wrapper is created here but no metric-group registration is
            // visible in this snippet — confirm it is registered elsewhere if it should be reported.
            this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
            // Value state named "total5SecCount" holding a Long, with 0L passed as the default value.
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
            total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }
......
       
        /**
         * Handles the records of one window for one key; body elided in the post.
         *
         * @param currentKey1 the key of the window being processed
         * @param ctx         the window function context
         * @param input       the Monitoring records assigned to this window
         * @param out         collector for the MonitoringGrouping results
         * @throws Exception declared by the overridden method
         */
        public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
            logger.debug("Window5SecProcessing - Entered process ");//never gets here
            // The cast is redundant: currentKey1 is already declared with this exact type.
            Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
            ....
        }

    }
At one point in the logs, I seem to see that there are no slots available. Is that the problem? If so, how can I fix it so that I can test locally on my Mac?
Log:
flink-akka.actor.default-dispatcher-71 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Slot Pool Status:
status: connected to akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
available slots: []
allocated slots: [[AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
pending requests: []
sharing groups: {
-------- 5a0ae59368145d715b3cc0d39ba6c05a --------
{
groupId=5a0ae59368145d715b3cc0d39ba6c05a
unresolved={}
resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}]}
all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}, SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}}
} }

TIA,