[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Migration to Flip6 Kubernetes

Is anyone actively working on direct Kubernetes support?

I'd be excited to see this get in sooner rather than later, I'd be happy to start a PR.

On 3/22/18 10:37 AM, Till Rohrmann wrote:
Hi Edward and Eron,

you're right that there is currently no JobClusterEntrypoint implementation for Kubernetes. How this entrypoint looks like mostly depends on how the job is stored and retrieved. There are multiple ways conceivable:

- The entrypoint connects to an external system from which it fetches the JobGraph
- The entrypoint contains the serialized JobGraph similar to how the YarnJobClusterEntrypoint works, but this would mean that you have a separate image per job
- The entrypoint actually executes a user jar which generates the JobGraph similar to what happens on the client when you submit a job

I'm not a Kubernetes expert and therefore I don't know what's the most idiomatic approach to it. But once we have figured this out, it should not be too difficult to write the Kubernetes JobClusterEntrypoint.

If we say that Kubernetes is responsible for assigning new resources, then we need a special KubernetesResourceManager which automatically assigns all registered slots to the single JobMaster. This JobMaster would then accept all slots and scale the job to how many slots it got offered. That way we could easily let K8 control the resources.

If there is a way to communicate with K8 from within Flink, then we could also implement a mode which is similar to Flink's Yarn integration. The K8RM would then ask for new pods to be started if the JM needs more slots.

The per-job mode on K8 won't unfortunately make it into Flink 1.5. But I'm confident that the community will address this issue with Flink 1.6.


On Wed, Mar 21, 2018 at 4:08 PM, Eron Wright <eronwright@xxxxxxxxx> wrote:
It would be helpful to expand on how, in job mode, the job graph would be produced.  The phrase 'which contains the single job you want to execute' has a few meanings; I believe Till means a serialized job graph, not an executable JAR w/ main method.  Till is that correct?

On Tue, Mar 20, 2018 at 2:16 AM, Till Rohrmann <trohrmann@xxxxxxxxxx> wrote:
Hi Edward,

you're right that Flink's Kubernetes documentation has not been updated with respect to Flip-6. This will be one of the tasks during the Flink 1.5 release testing and is still pending.

A Flink cluster can be run in two modes: session mode vs per-job mode. The former starts a cluster to which you can submit multiple jobs. The cluster shares the same ResourceManager and a Dispatcher which is responsible for spawning JobMasters which execute a single job each. The latter starts a Flink cluster which is pre-initialized with a JobGraph and only runs this job. Here we also start a ResourceManager and a MiniDispatcher whose job it is to simply start a single JobMaster with the pre-initialized JobGraph.

StandaloneSessionClusterEntrypoint is the entrypoint for the session mode.

The JobClusterEntrypoint is the entrypoint for the per-job mode. Take a look at YarnJobClusterEntrypoint to see how the entrypoint retrieves the JobGraph from HDFS and then automatically starts executing it. There is no script which directly starts this entrypoint, but the YarnClusterDescriptor uses it when `deployJobCluster` is called.

Depending on what you want to achieve: Either building generic K8 images to which you can submit any number of Flink jobs or having a special image which contains the single job you want to exeucte, you either have to call into the SessionClusterEntrypoint or the JobClusterEntrypoint. When starting a session cluster, then you can use bin/flink run to submit a job to this cluster.

Let me know if you have other questions.


On Thu, Mar 15, 2018 at 7:53 PM, Edward Rojas <edward.rojascl@xxxxxxxxx> wrote:

Currently I have a Flink 1.4 cluster running on kubernetes based on the
configuration describe on
with additional config for HA with Zookeeper.

With this I have several Taskmanagers, a single Jobmanager and I create a
container for each job to perform the Job submission and manage Job updates
with savepoints.

I'm looking into what would be needed to migrate to the new architecture on
FLIP6 as we are planning to use Flink 1.5 once it's ready.

If I understand correctly from
and the current code on master:

* Taskmanagers would continue the same, i.e they will execute the
taskmanager.sh start-foreground  script, which with the flip6 mode activated
will execute the new taskexecutor.TaskManagerRunner.

* We will have now one Job Manager per Job which is really good; but I don't
fully understand how this would be started.

I notice that the jobmanager.sh with flip6 mode activated will execute
entrypoint.StandaloneSessionClusterEntrypoint but I don't see how we could
pass the job jar and parameters (?)

So I think the other possibility to start the job would be via the /flink
run/ command with maybe an option to tell that we are creating a job with
job manager or would be this the default behaviour ?

Or would be this the role of the JobMaster ? I didn't take a look to its
code but it's mentioned on the flip6 page. (however I don't see an
entrypoint from the scripts (?))

Could you help me to understand how this is expected to be done ?

* Also I'm not sure to understand whether it would be better to have a
ResourceManager per job or a single ResourceManager per cluster, as in the
page is stated that there is a ResourceManager for
Self-contained-single-job, but it seems to me that it needs to have the
information about all JobManagers and TaskManagers (?)

Thanks in advance for the help you could provide.

I'm interested in using Flip6 on kubernetes when it will be ready, so I
could help with some testing if needed.


Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/