Re: Beam application upgrade on Flink crashes


Unfortunately, there are currently no compatibility guarantees between different Beam versions. Beam itself doesn't have the required interfaces or procedures in place for supporting backwards compatibility of state, and there have been quite a few changes in the internals between Flink 1.4 and Flink 1.5 that made larger changes necessary in how the Beam Flink runner handles operator state.


On 22. Aug 2018, at 12:14, Jozef Vilcek <jozo.vilcek@xxxxxxxxx> wrote:

Hm, I am sorry to hear this. I must have missed in the docs that Beam version upgrades can break Flink state. That is important information for anyone wanting to use Beam on Flink in production.

So I guess there is no guarantee that another bump of the Flink version will not break things until it reaches 1.7.
Even then, things can maybe still break? Is there a plan to make the Flink runner more robust and to catch compatibility issues early with tests?

I am just trying to figure out my options for upgrades. Do other runners suffer from the same weak guarantees?

On Tue, Aug 21, 2018 at 9:25 PM Stephan Ewen <sewen@xxxxxxxxxx> wrote:
Flink 1.7 will change the way the "restore serializer" is handled, which should make it much easier to handle such cases.
In particular, breaking the Java class version format will no longer be an issue.

That should make it easier to give the Beam-on-Flink runner cross-version compatibility.

On Mon, Aug 20, 2018 at 6:46 PM, Maximilian Michels <mxm@xxxxxxxxxx> wrote:
AFAIK the serializer used here is the CoderTypeSerializer which may not
be recoverable because of changes to the contained Coder
(TaggedKvCoder). It doesn't currently have a serialVersionUID, so even
small changes could break serialization backwards-compatibility.
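
To illustrate the serialVersionUID point, here is a minimal sketch in plain Java. The class names are hypothetical stand-ins, not the actual Beam or Flink classes. When a Serializable class declares no serialVersionUID, Java serialization computes one as a hash over the class name, interfaces, fields and methods, so editing the class can change the value and make previously written bytes unreadable. Declaring the field pins the value. Pinning alone only helps with changes that Java serialization treats as compatible, so it is not a complete fix.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidSketch {

    // Hypothetical coder-like class that relies on the computed default UID.
    // Adding or removing a field or method may change that computed value.
    public static class UnpinnedCoder implements Serializable {
        int tag;
    }

    // Same shape, but with an explicitly declared UID that stays fixed
    // across edits to the class.
    public static class PinnedCoder implements Serializable {
        private static final long serialVersionUID = 1L;
        int tag;
    }

    // Returns the UID that Java serialization records for the given class.
    public static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        // The unpinned value depends on the class shape, so no fixed value is shown.
        System.out.println("unpinned: " + uidOf(UnpinnedCoder.class));
        // The pinned value is the declared constant.
        System.out.println("pinned:   " + uidOf(PinnedCoder.class));
    }
}
```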

As of now Beam doesn't offer the same upgrade guarantees as Flink [1].
This should be improved for the next release.



On 20.08.18 17:46, Stephan Ewen wrote:
> Hi Jozef!
> When restoring state, the serializer that created the state must still
> be available, so the state can be read.
> It looks like some serializer classes were removed between Beam versions
> (or changed in an incompatible manner).
> Backwards compatibility of an operator implementation needs cooperation
> from the operator. Within Flink itself, when we change the way an
> operator uses state, we keep the old codepath and classes in a
> "backwards compatibility restore" that takes the old state and brings it
> into the shape of the new state. 
> I am not deeply into the details of how Beam and the Flink runner implement
> their use of state, but it looks like this part is not present, which could
> mean that savepoints taken from Beam applications are not backwards
> compatible.
> On Mon, Aug 20, 2018 at 4:03 PM, Jozef Vilcek <jozo.vilcek@xxxxxxxxx
> <mailto:jozo.vilcek@xxxxxxxxx>> wrote:
>     Hello,
>     I am attempting to upgrade a Beam app from 2.5.0 running on Flink
>     1.4.0 to Beam 2.6.0 running on Flink 1.5.0. I am not aware of any
>     state migration changes needed for Flink 1.4.0 -> 1.5.0, so I am just
>     starting a new app with updated libs from a Flink savepoint captured
>     by the previous version of the app.
>     There is no change in topology. The job is accepted without error by
>     the new cluster, which suggests that all operators are matched with
>     state based on IDs. However, the app runs for only a few seconds and
>     then crashes with:
>     java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:745)
>     Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
>       ... 5 more
>     Caused by: java.io.IOException: Unable to restore operator state [bundle-buffer-tag]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
>       at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
>       at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:63)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>       ... 7 more
>     Does this mean anything to anyone? Am I doing anything wrong, or did
>     FlinkRunner change in some way? The mentioned "bundle-buffer-tag"
>     seems to be too deep inside the runner internals for me to reach.
>     Any help is much appreciated.
>     Best,
>     Jozo