Re: Help with OneInputStreamOperatorTestHarness


Hi Fabian,

I created FLINK-9262.

FYI,

- Chris

On Apr 26, 2018, at 3:07 AM, Fabian Hueske <fhueske@xxxxxxxxx> wrote:

Thanks for reporting the issue Chris!
Would you mind opening a JIRA issue [1] to track the bug for Flink 1.4?

Thank you, Fabian

[1] https://issues.apache.org/jira/browse/FLINK

2018-04-25 21:11 GMT+02:00 Chris Schneider <cschneider@xxxxxxxxxxxxxxxxxx>:
Hi Gang,

FWIW, the code below works just fine using Flink 1.5-SNAPSHOT. I also tried cherry-picking the commit that fixed FLINK-8268 onto Flink 1.4.0, but that resulted in the same failure mode.

I guess the takeaway is that the streaming test harness support (which, everyone should note, is not yet part of the public Flink API) was apparently fragile in 1.4.0.

FYI,

- Chris


On Apr 18, 2018, at 8:07 PM, Chris Schneider <CSchneider@xxxxxxxxxxxxxxxxxx> wrote:

Hi Ted,

I should have written that we’re using Flink 1.4.0.

Thanks for the suggestion re: FLINK-8268; it could well be the issue (though the pull request appears fairly complex so I’ll need some time to study it).

Best Regards,

- Chris

On Apr 18, 2018, at 6:33 PM, Ted Yu <yuzhihong@xxxxxxxxx> wrote:

Which release are you using?

See if the workaround from FLINK-8268 helps.

Cheers

On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider <cschneider@xxxxxxxxxxxxxxxxxx> wrote:
Hi Gang,

I’m having trouble getting my streaming unit test to work. The following code:

    @Test
    public void testDemo() throws Throwable {
        OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
                new StreamFlatMap<>(new DomainDBFunction()),
                new PldKeySelector<CrawlStateUrl>(),
                BasicTypeInfo.STRING_TYPE_INFO,
                1,
                1,
                0);
        testHarness.setup();
        testHarness.open();

        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
            testHarness.processElement(new StreamRecord<>(url));
        }
        testHarness.snapshot(0L, 0L);
    }


generates the following exception:

DomainDBFunctionTest.testDemo
testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
    at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
    ... 26 more
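
For anyone reading the trace: the outer "Could not complete snapshot" exception only wraps the real failure, which is the NullPointerException raised by a null check inside snapshotFunctionState. The trace does not say which argument was null. Below is a minimal plain-Java sketch of that wrapping pattern, not Flink code. The names SnapshotFailureSketch, MockOperator, requiredArgument and rootCause are hypothetical, and java.util.Objects.requireNonNull stands in for Flink's Preconditions.checkNotNull.

```java
import java.util.Objects;

public class SnapshotFailureSketch {

    /** Hypothetical stand-in for an operator; this is not a Flink class. */
    public static final class MockOperator {
        // Assumption for illustration: some object the snapshot needs
        // that was never initialized and is therefore still null.
        private Object requiredArgument;

        public void initialize() {
            requiredArgument = new Object();
        }

        public void snapshot(long checkpointId) throws Exception {
            try {
                // Stand-in for the null check seen in the trace.
                Objects.requireNonNull(requiredArgument);
            } catch (Exception e) {
                // The original failure is kept as the cause of the wrapper.
                throw new Exception("Could not complete snapshot " + checkpointId
                        + " for operator MockTask (1/1).", e);
            }
        }
    }

    /** Walks the cause chain down to the innermost throwable. */
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        MockOperator operator = new MockOperator();
        try {
            operator.snapshot(0L); // fails: initialize() was never called
        } catch (Exception e) {
            System.out.println(e.getMessage());
            System.out.println(rootCause(e).getClass().getName());
        }
    }
}
```

In the sketch, calling initialize() before snapshot() makes the failure go away. Whether an analogous initialization step is what the 1.4.0 harness is missing is not something the trace alone establishes.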

I tried explicitly calling testHarness.setStateBackend(new MemoryStateBackend()), but that didn’t seem to help. I could provide more of my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl, etc.), but that doesn’t seem like it would have much to do with the problem.

Any advice would be most welcome.

Thanks,

- Chris

-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------
