Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?


You made a small mistake when restoring from state using test harness, that I myself have also done in the past. Problem is with an ordering of those calls:

        if (savedState != null) {

Open is supposed to be called after initializeState, and if you look into the code of AbstractStreamOperatorTestHarness#open, if it is called before initialize, it will initialize harness without any state.

Unfortunate is that this is implicit behaviour that doesn’t throw any error (test harness is not part of a Flink’s public api). I will try to fix this: https://issues.apache.org/jira/browse/FLINK-10159


On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_lists@xxxxxxxxxxxx> wrote:

Hi all,

It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?

Below is the unit test, which shows how the test harness is being set up and run.

The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.


— Ken

package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

public class TimerTest {
    public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);

    private List<Long> _firedTimers = new ArrayList<Long>();

    public void setUp() throws Exception {
    public void testTimerSaving() throws Throwable {
        // This operator doesn't really do much at all, but the first element
        // it processes will create a timer for (timestamp+1).
        // Whenever that timer fires, it will create another timer for 
        // (timestamp+1).
        KeyedProcessOperator<Integer, Integer, Integer> operator = 
            new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
            makeTestHarness(operator, null);
        // We begin at time zero

        // Process some elements, which should also create a timer for time 1.
        int inputs[] = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
        // Save the state, which we assume should include the timer we set for
        // time 10.
        OperatorSubtaskState savedState = 
            testHarness.snapshot(0L, testHarness.getProcessingTime());
        // Close the first test harness
        // Create a new test harness using the saved state (which we assume
        // includes the timer for time 10).
        testHarness = makeTestHarness(operator, savedState);
        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
        // Close the second test harness and make sure all the timers we expect
        // actually fired.
        for (long i = 1; i < 20; i++) {
            // TODO This expectation currently fails, since Timers don't
            // seem to be included in the snapshot, at least the one produced by
            // the test harness.

    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) 
            throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = 
            new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                    new TimerTool.IdentityKeySelector<Integer>(),
        if (savedState != null) {
        return result;

package com.scaleunlimited.flinkcrawler.functions;

import java.util.List;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimerFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
    static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
    List<Long> _firedTimers;
    long _period;

    public TimerFunction(List<Long> firedTimers) {
        this(firedTimers, 1);

    public TimerFunction(List<Long> firedTimers, long period) {
        _firedTimers = firedTimers;
        _period = period;

    public void onTimer(long timestamp,
                        KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
                        Collector<Integer> out) throws Exception {
        super.onTimer(timestamp, context, out);
        long nextTimestamp = timestamp + _period;
        LOGGER.info("Firing at {}; Setting new timer for {}",

    public void processElement( Integer input,
                                KeyedProcessFunction<Integer, Integer, Integer>.Context context,
                                Collector<Integer> out)
        throws Exception {
        LOGGER.info("Processing input {}", input);
        if (_firedTimers.isEmpty()) {
            long firstTimestamp = 
                context.timerService().currentProcessingTime() + _period;
            LOGGER.info("Setting initial timer for {}",

