
Re: CEP & checkpoints/savepoints

Hi Ron,

Fabian is absolutely right. The CEP library uses Flink's standard checkpointing mechanisms. You do not need any additional configuration.

The only thing to keep in mind is that if you change a condition in your Pattern and then restart from a checkpoint/savepoint, you might get some outdated results, because the state machine may already have progressed under the old condition. Consider the pattern A B: if some event was matched to A under the old condition, we are already looking for B. So if we take a savepoint, change the condition for A, and restart, there might be matches whose A event does not satisfy the new A condition, because the state machine is already in the B state.
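
To make the A B case concrete, here is a toy sketch in plain Java. It is not Flink's CEP implementation and uses no Flink API: the Matcher and Snapshot classes, the integer events, and the conditions (A as "value > 0", later tightened to "value > 10", and B as "value > 100") are all made up for illustration, and events that are not B are simply skipped while waiting for B. It only shows why restoring saved state under a changed A condition can still emit a match that started under the old one.

```java
import java.util.function.IntPredicate;

// Toy model of the two-step pattern "A followed by B".
// Hypothetical names throughout; this is NOT how Flink CEP is implemented.
final class PatternRestoreSketch {

    /** Saved matcher state: the value accepted as A, or null if still waiting for A. */
    static final class Snapshot {
        final Integer pendingA;

        Snapshot(Integer pendingA) {
            this.pendingA = pendingA;
        }
    }

    static final class Matcher {
        private final IntPredicate condA;
        private final IntPredicate condB;
        private Integer pendingA; // non-null means "A already seen, now looking for B"

        Matcher(IntPredicate condA, IntPredicate condB, Snapshot restored) {
            this.condA = condA;
            this.condB = condB;
            this.pendingA = restored.pendingA;
        }

        /** Feeds one event; returns a match description, or null if no match completed. */
        String onEvent(int value) {
            if (pendingA == null) {
                // Waiting for A: only here is the A condition evaluated.
                if (condA.test(value)) {
                    pendingA = value;
                }
                return null;
            }
            // Waiting for B: the A condition is never re-checked for the stored event.
            if (condB.test(value)) {
                String match = "A=" + pendingA + ",B=" + value;
                pendingA = null;
                return match;
            }
            return null; // not a B, keep waiting
        }

        Snapshot snapshot() {
            return new Snapshot(pendingA);
        }
    }

    static String demo() {
        // Old job: A is "value > 0", B is "value > 100".
        Matcher oldJob = new Matcher(v -> v > 0, v -> v > 100, new Snapshot(null));
        oldJob.onEvent(5); // 5 satisfies the old A condition
        Snapshot savepoint = oldJob.snapshot();

        // New job: A is tightened to "value > 10", state is restored from the savepoint.
        Matcher newJob = new Matcher(v -> v > 10, v -> v > 100, savepoint);
        return newJob.onEvent(200); // completes the match that started under the old A
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints A=5,B=200
    }
}
```

The match contains A=5 even though 5 does not satisfy the new A condition. A matcher started from empty state with the new conditions would not produce it.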



On 17/09/18 22:24, Fabian Hueske wrote:
Hi Ron,

The CEP library is built on top of the DataStream / ProcessFunction API and holds all necessary state (the state of the pattern matching state machine) in regular keyed MapState.
Hence, CEP does not require a dedicated configuration for checkpoints and savepoints, besides the regular application checkpoint configuration.

That's also why there's no dedicated documentation about this subject.

@Dawid or Klou, please correct me if I'm wrong.

Best, Fabian

2018-09-17 19:09 GMT+02:00 Ron Crocker <rcrocker@xxxxxxxxxxxx.invalid>:
I’m working with CEP to detect when something stops reporting (which is very simple), but I need to show the team that the jobs will survive being shut down and restarted without either a) declaring that everything stopped reporting (false positives) or b) missing things that have indeed stopped reporting (false negatives).

There seems to be NO documentation regarding CEP and checkpoints/savepoints. Am I just missing it? Or is it something so simple that it should be obvious?

Our graph is fairly straightforward - keyed stream using event time and a Pattern that is essentially a report followed by a report within a time window, and we use the timed-out side output as the “events” indicating “thing stopped reporting”. It seems that we need to checkpoint/savepoint the pattern state along with the normal things checkpointed (e.g., Kafka offsets).

For now, I should be able to sell an assertion from you that it works, but official documentation would help.


Ron Crocker
Distinguished Engineer & Architect
( ( •)) New Relic
M: +1 630 363 8835
