OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[jira] [Created] (FLINK-9547) CEP pattern not called on windowed stream


Lucas Resch created FLINK-9547:
----------------------------------

             Summary: CEP pattern not called on windowed stream
                 Key: FLINK-9547
                 URL: https://issues.apache.org/jira/browse/FLINK-9547
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.5.0, 1.3.2
            Reporter: Lucas Resch


When trying to match a pattern on a stream that was windowed the pattern will not be called. The following shows example code where the issue was noticed:
{code:java}
// Set up stream
SingleOutputStreamOperator<ForceZ> forces = ...
        .filter(new FilterForcesFunction())
        .process(new ProcessForcesFunction());

// Define mock pattern
Pattern<ForceZ, ?> forcesMock = Pattern.<ForceZ>begin("start").where(new SimpleCondition<ForceZ>() {
    @Override
    public boolean filter(ForceZ value) {
        // This is called as expected
        return true;
    }
});

// Print pattern results
// This actually prints all incoming events as expected
CEP.pattern(forcesMock, mock)
        .select(new PatternSelectFunction<ForceZ, ForceZ>() {
            @Override
            public ForceZ select(Map<String, List<ForceZ>> pattern){
                return pattern.get("start").get(0);
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Interval> intervals = forces
        .countWindowAll(2, 1)
        .process(new ForceWindowFunction());

// Define mock pattern
Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
    @Override
    public boolean filter(Interval value) throws Exception {
        // This is never called
        return true;
    }
});

// Print pattern results
// Doesn't print anything since the mock condition is never called
CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Interval, Interval>() {
            @Override
            public Interval select(Map<String, List<Interval>> pattern) throws Exception {
                return pattern.get("start").get(0);
            }
        }).print();
{code}
Either I'm doing something wrong or this is a major bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)