How to anchor a job level singleton object as part of TaskManager JVM?


Hi,

I'm using Flink 1.4.2 release.

My Flink streaming job needs a way to make a singleton object available throughout the job graph, which consists of a Kafka source, ProcessFunction, RichAsyncFunction, and RichSinkFunction. What's the best way to achieve this?

As an attempt, I tried anchoring my object in a jar that is on the TaskManager JVM class path. But it does not seem to work in the following scenario:
1. The job manager process was restarted (due to a pod failure) while my job was running.
2. When the job manager came back, it attempted to restart my job, which seems a bit odd.
3. My job then kept failing with a ClassCastException at the point where it retrieves the singleton from the TaskManager class loader.

2018-06-02 01:58:56,614 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: monstor -> Process -> async wait operator -> Process -> Sink: deadletter (10/12) (289e6ee24be520d81890dfe5a1a164fa) switched from RUNNING to FAILED.
java.lang.ClassCastException: org.monstor.flink.coordination.Coordinator cannot be cast to org.monstor.flink.coordination.Coordinator
        at org.monstor.flink.coordination.CoordinatorHolder.<init>(CoordinatorHolder.java:48)
        at org.monstor.myjob.flink.MyAsyncWriter.open(GsiStoreAsyncWriter.java:102)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:164)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
2018-06-02 01:58:56,614 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: monstor -> Process -> async wait operator -> Process -> Sink: deadletter (2/12) (e55b878f5c9d91a0dc557dd60b80e867) switched from RUNNING to CANCELING.
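
For context, a ClassCastException of the form "X cannot be cast to X" generally means that two class loaders have each defined their own copy of class X. That may be what is happening here, for example if the Coordinator class is visible both on the TaskManager class path and inside the job jar. Below is a minimal sketch of the effect in plain Java, with no Flink involved. The names ClassLoaderDemo, IsolatingLoader and newIsolatedCoordinator are hypothetical, the nested Coordinator is only a stand-in for the real class, and the sketch assumes the compiled .class file can be read as a resource from the parent loader.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

final class ClassLoaderDemo {

    /** Hypothetical stand-in for the shared singleton type. */
    public static class Coordinator {
        public Coordinator() {}
    }

    /** Loader that defines Coordinator itself instead of delegating to its parent. */
    static final class IsolatingLoader extends ClassLoader {
        private final String isolated = Coordinator.class.getName();

        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolated)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] b = readClassBytes(name);
                    c = defineClass(name, b, 0, b.length); // second, distinct copy of the class
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        /** Reads the class file bytes through the parent loader (assumed to be readable as a resource). */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                for (int n; (n = in.read(buf)) != -1; ) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Creates a Coordinator whose class was defined by a separate loader. */
    static Object newIsolatedCoordinator() {
        try {
            ClassLoader loader = new IsolatingLoader(ClassLoaderDemo.class.getClassLoader());
            Class<?> c = loader.loadClass(Coordinator.class.getName());
            return c.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object foreign = newIsolatedCoordinator();
        System.out.println("same name:  "
                + foreign.getClass().getName().equals(Coordinator.class.getName()));
        System.out.println("same class: " + (foreign.getClass() == Coordinator.class));
        try {
            Coordinator c = (Coordinator) foreign; // fails: the two classes have different defining loaders
            System.out.println("cast succeeded: " + c);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

The two classes share a fully qualified name, but the JVM treats them as unrelated types because a class is identified by its name together with its defining class loader. If this is the cause, the class would need to be loaded by exactly one loader for the cast in CoordinatorHolder to succeed.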

Thanks,
Connie