Re: Missing MapState when Timer fires after restored state


Hi Juho,
I tried several times with the simple code you provided, but I still can't reproduce the bug you hit. There's one more question I'd like to confirm with you: is stateRetentionMillis a fixed (final) field, or can it change under some conditions?

Best, Sihua
On 05/19/2018 08:19, sihua zhou <summerleafs@xxxxxxx> wrote:
Sorry for the incorrect information, that's not the case.

Best, Sihua



On 05/19/2018 07:58, sihua zhou <summerleafs@xxxxxxx> wrote:
Hi Juho & Stefan,
just a quick summary: the good news is that I think I found the bug; the bad news is that we are currently at RC4...
The bug is here.

try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

    int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
    byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
    for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
        startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
    }

    iterator.seek(startKeyGroupPrefixBytes);

    while (iterator.isValid()) {

        int keyGroup = 0;
        for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
        }

        if (stateBackend.keyGroupRange.contains(keyGroup)) {
            stateBackend.db.put(targetColumnFamilyHandle,
                iterator.key(), iterator.value());
        }

        iterator.next();
    }
}

For every state handle, to get the target data we _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be INVALID immediately if the state handle's start key group is bigger than _state.keyGroupRange.getStartKeyGroup()_. Then the data is lost...
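
To make the loop above easier to reason about, here is a minimal, self-contained sketch of the same steps (encode the start key group as a big-endian prefix, seek, scan, decode the key group of each key). It is only a model under stated assumptions, not the Flink code: a TreeMap ordered by unsigned byte comparison stands in for the RocksDB column family, tailMap(..., true) stands in for iterator.seek() (position at the first key greater than or equal to the target), and the class and method names are made up for illustration. Masking each byte with 0xFF when decoding is a choice of this sketch; the quoted snippet adds the raw byte.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class KeyGroupPrefixSketch {

    /** Big-endian encoding of a key group into prefixBytes bytes, mirroring the quoted loop. */
    static byte[] encodePrefix(int keyGroup, int prefixBytes) {
        byte[] out = new byte[prefixBytes];
        for (int j = 0; j < prefixBytes; ++j) {
            out[j] = (byte) (keyGroup >>> ((prefixBytes - j - 1) * Byte.SIZE));
        }
        return out;
    }

    /** Decodes the prefix; each byte is masked with 0xFF so it is read as unsigned (sketch's choice). */
    static int decodePrefix(byte[] key, int prefixBytes) {
        int keyGroup = 0;
        for (int j = 0; j < prefixBytes; ++j) {
            keyGroup = (keyGroup << Byte.SIZE) | (key[j] & 0xFF);
        }
        return keyGroup;
    }

    /** Unsigned lexicographic byte order, used here to model a bytewise key ordering. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    /** Key groups visited when "seeking" to startKeyGroup and scanning to the end of the map. */
    static List<Integer> scanFrom(TreeMap<byte[], String> db, int startKeyGroup, int prefixBytes) {
        List<Integer> seen = new ArrayList<>();
        // tailMap(target, true) models seek(): start at the first key >= target.
        for (byte[] key : db.tailMap(encodePrefix(startKeyGroup, prefixBytes), true).keySet()) {
            seen.add(decodePrefix(key, prefixBytes));
        }
        return seen;
    }

    public static void main(String[] args) {
        int prefixBytes = 2; // assumed prefix width for this example
        TreeMap<byte[], String> db = new TreeMap<>(KeyGroupPrefixSketch::compareUnsigned);
        for (int kg : new int[] {5, 130, 200}) {
            db.put(encodePrefix(kg, prefixBytes), "value-" + kg);
        }
        System.out.println(scanFrom(db, 0, prefixBytes));   // target below the smallest stored key
        System.out.println(scanFrom(db, 150, prefixBytes)); // target between stored keys
        System.out.println(scanFrom(db, 300, prefixBytes)); // target above every stored key
    }
}
```

In this model, seeking to a prefix below the smallest stored key still lands on the first entry; the scan starts out empty only when every stored key sorts before the target.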

@Stefan, this still needs your double check; please correct me if I'm wrong.

Best, Sihua

On 05/18/2018 17:29, sihua zhou <summerleafs@xxxxxxx> wrote:
Hi Juho,
thanks for trying this out. I'm running out of ideas now... Let's do a brief summary.

- Have we reached a consensus that the map state gets lost when restoring? (Because you can restore successfully without rescaling.)
- The timer state is restored correctly, because for timers, restoring from a checkpoint is no different from restoring from a savepoint. (@Stefan, please correct me if I'm wrong.)

And @Juho, have you tried rescaling the job with a different parallelism (not always 16)?

Best, Sihua




On 05/18/2018 17:14, Juho Autio <juho.autio@xxxxxxxxx> wrote:

It hits the same problem.

Btw, why is this error logged on INFO level?

2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
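
For readers following the trace: a NumberFormatException thrown from Long.parseLong with a "null" message is what Java produces when the argument itself is null, which is consistent with the timer firing for a key whose map entry is missing. The sketch below illustrates that in plain Java. It is not the EnrichEventProcessFunction code (which is not shown in this thread); a HashMap stands in for the MapState, and the class, method and field names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

public class MissingStateSketch {

    /** Returns true if parsing a null string throws NumberFormatException. */
    static boolean parseNullThrows() {
        try {
            Long.parseLong((String) null);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    /** Defensive read: an absent entry yields an empty result instead of an exception. */
    static OptionalLong readTimestamp(Map<String, String> state, String field) {
        String raw = state.get(field); // null when the entry is missing
        if (raw == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(Long.parseLong(raw));
    }

    public static void main(String[] args) {
        Map<String, String> restoredState = new HashMap<>(); // models state that came back empty
        System.out.println(parseNullThrows());
        System.out.println(readTimestamp(restoredState, "firstTimestamp").isPresent());

        restoredState.put("firstTimestamp", "1526634232595");
        System.out.println(readTimestamp(restoredState, "firstTimestamp").getAsLong());
    }
}
```

A guard like readTimestamp only avoids the crash; it does not explain why the entry is missing after the restore.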

On Fri, May 18, 2018 at 11:06 AM, Juho Autio <juho.autio@xxxxxxxxx> wrote:
Thanks Sihua, I'll give that RC a try.

On Fri, May 18, 2018 at 10:58 AM, sihua zhou <summerleafs@xxxxxxx> wrote:
Hi Juho,
would you like to try out the latest RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) and rescale the job from the "problematic" checkpoint? The latest RC includes a fix for a potential silent data loss. If that is the cause, you will see a different exception when you try to recover your job.

Best, Sihua



On 05/18/2018 15:02, Juho Autio <juho.autio@xxxxxxxxx> wrote:
I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.

As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.

As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?

On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s.richter@xxxxxxxxxxxxxxxxx> wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would prefer to keep this at the level of a "hidden feature" until it has had some more exposure and some questions about the future differences between savepoints and checkpoints are resolved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan