
Re: SDK Harness Deployment

> a Flink task manager lifecycle isn't tied to a single job (it is a worker that exists even before the pipeline was submitted). Therefore we will need to fall back to (1). Boot code would fail to connect until the job was deployed [...].

The pipeline defines which environment (= SDK harness) to use, so in general we couldn't run it before the job is submitted. If you have a more specialized scenario in mind (which I assume is the case here), where the setup supports only a fixed set of SDK harnesses (or a custom one you author), then you may also be able to use the same SDK harness to run multiple jobs. How practical that is depends on the SDK: whether it uses job-specific information, what kind of isolation between jobs is needed, and whether the artifacts are identical.

For Go, for example, the default container image is identical across jobs and the sole artifact is the user binary, so to be usable across jobs, all DoFns for all jobs must be compiled into a single binary. Nothing currently uses the job-specific information. If the SDK harness is not given persistent storage through the 'semi_persistent_path' flag, it is stateless and could serve multiple jobs in sequence if restarted, assuming the provisioning and artifact services are updated each time to serve information for the new job. An "exit" instruction might be helpful to force the SDK harness to exit gracefully. If the setup does not accept arbitrary container images (or does not use containers at all), you could also use a special container image that restarts just the inner Go process and re-pulls provisioning/artifacts instead of restarting the whole container, if that works better. Either way, you could have a setup where a fixed set of such Go containers matches the number of slots of the TM and can handle multiple jobs. If the setup needs to support arbitrary SDKs and container images, however, then you'd be back in the world of starting them dynamically somehow.
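A minimal Go sketch of the sequential, multi-job idea above, under loud assumptions: `JobInfo`, `Instruction`, `runHarness` and `supervise` are hypothetical names, the "exit" instruction is the one proposed here (not an existing Fn API message), and re-pulling provisioning/artifacts is reduced to a comment. It only shows the control flow of a supervisor that restarts the inner harness once per job while the same user binary serves every job.

```go
package main

import "fmt"

// JobInfo is a hypothetical stand-in for what the provisioning and artifact
// services serve for one job. For Go the only artifact is the user binary,
// which here is the same for every job.
type JobInfo struct {
	JobID  string
	Binary string
}

// Instruction is a hypothetical control-plane message; "exit" models the
// proposed instruction that asks the harness to shut down gracefully.
type Instruction string

// runHarness handles one job's instructions until it sees "exit". It keeps
// no state between calls, like a harness without a semi-persistent path.
func runHarness(job JobInfo, instructions []Instruction) int {
	processed := 0
	for _, ins := range instructions {
		if ins == "exit" {
			break
		}
		processed++
	}
	return processed
}

// supervise restarts only the inner harness once per job instead of
// restarting the whole container.
func supervise(jobs []JobInfo, control map[string][]Instruction) map[string]int {
	results := make(map[string]int, len(jobs))
	for _, job := range jobs {
		// A real supervisor would re-pull provisioning/artifacts here.
		results[job.JobID] = runHarness(job, control[job.JobID])
	}
	return results
}

func main() {
	jobs := []JobInfo{{"job-1", "user-binary"}, {"job-2", "user-binary"}}
	control := map[string][]Instruction{
		"job-1": {"process_bundle", "process_bundle", "exit"},
		"job-2": {"process_bundle", "exit", "process_bundle"},
	}
	fmt.Println(supervise(jobs, control)) // map[job-1:2 job-2:1]
}
```

Instructions arriving after "exit" are ignored, which is where the process would be restarted for the next job.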


On Fri, Jun 8, 2018 at 11:45 AM Thomas Weise <thw@xxxxxxxxxx> wrote:
Sounds good.

Regarding (2), the bootstrapping endpoints serve job-specific info, but a Flink task manager lifecycle isn't tied to a single job (it is a worker that exists even before the pipeline was submitted). Therefore we will need to fall back to (1). Boot code would fail to connect until the job was deployed and the Flink runner has established the endpoint. This should work fine as long as the boot code retries without causing the entire container to exit; would the only side effect be some noise in the logs?

In my scenario there won't be a second job that runs on the same task manager, since we are planning to deploy Flink along with the application. But Flink in general also supports a "session" mode where multiple jobs can share the same set of task managers. In that case it would be necessary to isolate the SDK workers because they can only serve a single job (unless what you have listed under static information is identical).

Looking at the current runner code, there will be some work in the JobResourceManager/SingletonSdkHarnessManager neighborhood that I can pick up once we have the basics working in master. Currently SDK workers can only be distinguished by the port they connect to; the runner does not look at the worker ID or make it available in any way. So support for multiplexing has to be added. Perhaps Ben/Alex can comment on this?


On Fri, Jun 8, 2018 at 10:19 AM, Henning Rohde <herohde@xxxxxxxxxx> wrote:
You're right. That is the idea.

Two comments on the executable stage not being available yet:
  (1) An SDK harness may either retry or fail (exit) if it can't connect, times out, or gets an error. If it exits, the runner/environment is responsible for restarting the process/container, so it will effectively always retry. The boot code currently used tries to connect for 2 minutes, after which it gives up (and in turn is restarted and tries again). The 2-minute limit was chosen somewhat arbitrarily, btw, so we can adjust it for the default containers.
  (2) The 2 bootstrapping endpoints serve static information (pipeline options, artifacts, and job metadata) that may not require an executable stage, for example if the artifact service just serves data from HDFS. The control endpoint is mainly driven by the runner side, so the multiplexer could allow any SDK harness to connect but simply not send any actual instructions until the executable stages were ready. So for 2nd jobs, or if we have some global hooks into the TM (or deploy a separate process; provisioning and artifacts are separate services to make this possible), it might be possible to let the SDK harness boot in parallel with the TM becoming fully ready. Disclaimer: I have a limited understanding of the Flink constraints here.


On Fri, Jun 8, 2018 at 7:49 AM Thomas Weise <thw@xxxxxxxxxx> wrote:
Yes, it did not occur to me that we have the identifier available for this. I just took a fresh look at https://s.apache.org/beam-fn-api-container-contract

So it should be possible to start a pool of containers with pre-assigned IDs in the pod, communicate the same set of IDs to the runner (via its configuration), and then come up with some mechanism to assign executable stages to worker IDs as part of the Flink operator initialization.
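One way to picture the proposed assignment, assuming the same list of worker IDs is available both to the pod and to the runner configuration. `assignStages` is a hypothetical helper and round-robin is just one possible policy; the thread leaves the actual mechanism open.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// assignStages maps each executable stage to one of the pre-assigned worker
// IDs shared between the pod spec and the runner configuration. Round-robin
// over the sorted IDs is only an illustrative policy.
func assignStages(workerIDs, stages []string) (map[string]string, error) {
	if len(workerIDs) == 0 {
		return nil, errors.New("no worker IDs configured")
	}
	ids := append([]string(nil), workerIDs...)
	sort.Strings(ids) // deterministic regardless of configuration order
	assignment := make(map[string]string, len(stages))
	for i, stage := range stages {
		assignment[stage] = ids[i%len(ids)]
	}
	return assignment, nil
}

func main() {
	workers := []string{"worker-2", "worker-1"}
	stages := []string{"stage-a", "stage-b", "stage-c"}
	a, err := assignStages(workers, stages)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, s := range stages {
		fmt.Println(s, "->", a[s])
	}
}
```

Sorting first means the runner and the pod agree on the mapping even if they list the IDs in a different order.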

By the time the SDK boot code calls the provisioning service to fetch the pipeline options, the runner wouldn't be ready (either since the TM isn't running or the executable stages were not deployed into it yet). So will that call just retry until the endpoint becomes available? On the runner side, the endpoint can only be activated (under the fixed address) when the task slots are assigned.



On Wed, Jun 6, 2018 at 3:19 PM, Henning Rohde <herohde@xxxxxxxxxx> wrote:
Thanks Thomas. The id provided to the SDK harness must be sent as a gRPC header when it connects to the TM. The TM can use a fixed port and multiplex requests based on that id - to match the SDK harness with the appropriate job/slot/whatnot. The relationship between SDK harness and TM is not limited to 1:1, but rather many:1. We'll likely need that for cross-language as well. Wouldn't multiplexing on a single port for the control plane be the easiest solution for both #1 and #2? The data plane can still use various dynamically-allocated ports.
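A sketch of the single-port multiplexing idea, reduced to the routing table. It assumes the id arrives under a metadata key named `worker_id` (treat the exact key as an assumption) and models headers as `map[string][]string`, which is the shape of gRPC metadata in grpc-go; `Multiplexer`, `Session` and their methods are hypothetical. Registering several ids for the same job illustrates the many:1 relationship.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// workerIDHeader is the metadata key assumed to carry the SDK harness id.
const workerIDHeader = "worker_id"

// Session identifies what a connecting SDK harness should be wired to.
type Session struct {
	JobID string
	Slot  int
}

// Multiplexer routes control connections arriving on one fixed port by the
// worker id header. Several ids may map to the same job (many:1).
type Multiplexer struct {
	mu       sync.Mutex
	sessions map[string]Session
}

func NewMultiplexer() *Multiplexer {
	return &Multiplexer{sessions: make(map[string]Session)}
}

// Register records which job/slot a given worker id belongs to.
func (m *Multiplexer) Register(workerID string, s Session) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.sessions[workerID] = s
}

// Route looks up the session for the headers of an incoming connection.
func (m *Multiplexer) Route(headers map[string][]string) (Session, error) {
	ids := headers[workerIDHeader]
	if len(ids) != 1 {
		return Session{}, errors.New("missing or ambiguous worker id header")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.sessions[ids[0]]
	if !ok {
		return Session{}, fmt.Errorf("unknown worker id %q", ids[0])
	}
	return s, nil
}

func main() {
	mux := NewMultiplexer()
	mux.Register("sdk-1", Session{JobID: "job-a", Slot: 0})
	mux.Register("sdk-2", Session{JobID: "job-a", Slot: 1})
	s, err := mux.Route(map[string][]string{workerIDHeader: {"sdk-2"}})
	fmt.Println(s, err) // {job-a 1} <nil>
}
```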

On Kubernetes, we're somewhat constrained by the pod lifetime and multi-job TMs might not be as natural to achieve.


On Wed, Jun 6, 2018 at 2:28 PM Thomas Weise <thw@xxxxxxxxxx> wrote:
Hi Henning,

Here is a page that explains the scheduling and overall functioning of the task manager in Flink:

Here are the 2 issues:

#1 each task manager process gets multiple units of execution assigned to its task slots. So when we deploy a Beam pipeline, we can end up with multiple executable stages running in a single TM JVM.

This is where a 1-to-1 relationship between TM and SDK harness can lead to a bottleneck (all task slots of a single TM push their work to a single SDK container).

#2 in a deployment where multiple pipelines share a Flink cluster, the SDK harness per TM approach wouldn't work logically. We would need to have multiple SDK containers, not just for efficiency reasons.

This would not be an issue for the deployment scenario I'm looking at, but it needs to be considered for general Flink runner deployment.

Regarding the assignment of fixed endpoints within the TM, that is possible but it doesn't address #1 and #2.

I hope this clarifies?


On Wed, Jun 6, 2018 at 12:31 PM, Henning Rohde <herohde@xxxxxxxxxx> wrote:
Thanks for writing down and explaining the problem, Thomas. Let me try to tease some of the topics apart.

First, the basic setup is currently as follows: there are 2 worker processes, (A) "SDK harness" and (B) "Runner harness", that need to communicate. A connects to B. B's fundamental endpoints (logging, provisioning, artifacts and control), as well as an id, are provided to A via command-line parameters. A is not expected to be able to connect to the control port without first obtaining pipeline options (from provisioning) and staged files (from artifacts). As an aside, this is where the separate boot.go code comes in handy. A can assume it will be restarted if it exits. A does not assume the given endpoints are up when started and should make blocking calls with a timeout (but if it doesn't and exits, it is restarted anyway and will retry). Note that the data plane endpoints are part of the control instructions and need not be known or allocated at startup, or even be served by the same TM.
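The startup contract described here can be sketched as flag parsing plus a fixed boot order. The flag names below are modeled on the container contract but should be checked against it; `bootConfig` and `parseBootFlags` are hypothetical, and the real provisioning/artifact/control calls are replaced by printing the endpoint each step would use.

```go
package main

import (
	"flag"
	"fmt"
	"sort"
	"strings"
)

// bootConfig holds the id and endpoints that B hands to A on the command line.
type bootConfig struct {
	ID, Logging, Provision, Artifact, Control string
}

// parseBootFlags parses the startup parameters. Flag names are assumptions
// modeled on the container contract.
func parseBootFlags(args []string) (bootConfig, error) {
	var c bootConfig
	fs := flag.NewFlagSet("boot", flag.ContinueOnError)
	fs.StringVar(&c.ID, "id", "", "worker id sent back to the runner")
	fs.StringVar(&c.Logging, "logging_endpoint", "", "logging endpoint")
	fs.StringVar(&c.Provision, "provision_endpoint", "", "provisioning endpoint")
	fs.StringVar(&c.Artifact, "artifact_endpoint", "", "artifact endpoint")
	fs.StringVar(&c.Control, "control_endpoint", "", "control endpoint")
	if err := fs.Parse(args); err != nil {
		return c, err
	}
	var missing []string
	for name, v := range map[string]string{
		"id": c.ID, "logging_endpoint": c.Logging, "provision_endpoint": c.Provision,
		"artifact_endpoint": c.Artifact, "control_endpoint": c.Control,
	} {
		if v == "" {
			missing = append(missing, name)
		}
	}
	if len(missing) > 0 {
		sort.Strings(missing) // stable error message despite map iteration order
		return c, fmt.Errorf("missing flags: %s", strings.Join(missing, ", "))
	}
	return c, nil
}

func main() {
	c, err := parseBootFlags([]string{
		"--id=1", "--logging_endpoint=localhost:8001", "--provision_endpoint=localhost:8002",
		"--artifact_endpoint=localhost:8003", "--control_endpoint=localhost:8004",
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	// A fetches options and staged files before it connects to control.
	steps := []struct{ what, endpoint string }{
		{"provisioning (pipeline options)", c.Provision},
		{"artifacts (staged files)", c.Artifact},
		{"control", c.Control},
	}
	for _, s := range steps {
		fmt.Printf("%s -> %s\n", s.what, s.endpoint)
	}
}
```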

Second, whether or not docker is used is rather an implementation detail, but if we use Kubernetes (or other such options) then some constraints come into play.

Either way, two scenarios work well:
   (1) B starts A: The ULR and the Flink prototype do this. B will delay starting A until it has decided which endpoints to use. This approach requires B to do process/container management, which we'd rather not have to do at scale. But it's convenient for local runners.
   (2) B has its (local) endpoints configured or fixed: A and B can be started concurrently. Dataflow does this. Kubernetes lends itself well to this approach (and handles container management for us).   

The Flink on Kubernetes scenario described above doesn't:
   (3) B must use randomized (local) endpoints _and_ A and B are started concurrently: A would not know where to connect.

Perhaps I'm not understanding the constraints of the TM well enough, but can we really not open a configured/fixed port from the TM, especially in a network-isolated Kubernetes pod? Adding a third process (C) "proxy" to the pod might be an alternative option and would turn (3) into (2). B would configure C when it's ready. A would connect to C, but be blocked until B has configured it. C could perhaps even serve logging, provisioning, and artifacts without B. And the data plane would not go over C anyway. If control proxying is a concern, then alternatively we could add an indirection to the container contract and provide the control endpoint in the provisioning API, say, or even in a new discovery service.

There are of course other options and tradeoffs, but having Flink work on Kubernetes and not go against the grain seems desirable to me.


On Wed, Jun 6, 2018 at 10:11 AM Thomas Weise <thw@xxxxxxxxxx> wrote:

The current plan for running the SDK harness is to execute docker to launch SDK containers with service endpoints provided by the runner in the docker command line.

In the case of the Flink runner (prototype), the service endpoints are dynamically allocated per executable stage. There is typically one Flink task manager running per machine. Each TM has multiple task slots. A subset of these task slots will run the Beam executable stages. Flink allows multiple jobs in one TM, so we could have executable stages of different pipelines running in a single TM, depending on how users deploy. The prototype also has no cleanup for the SDK containers; they remain running and orphaned once the runner is gone.
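For context on why the SDK container cannot know the address up front: a common way to allocate an endpoint dynamically is to bind to port 0 and let the OS choose. Whether the prototype does exactly this is an assumption; the sketch only shows that the port is known only after the listener is created, and that two stages get different ports.

```go
package main

import (
	"fmt"
	"net"
)

// listenEphemeral binds to port 0 so the OS picks a free port, then reports
// which one it chose. The port is known only after Listen returns.
func listenEphemeral() (net.Listener, int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, 0, err
	}
	return l, l.Addr().(*net.TCPAddr).Port, nil
}

func main() {
	// Two "stages" allocate endpoints independently; while both are open
	// they necessarily get different ports.
	a, portA, err := listenEphemeral()
	if err != nil {
		fmt.Println(err)
		return
	}
	defer a.Close()
	b, portB, err := listenEphemeral()
	if err != nil {
		fmt.Println(err)
		return
	}
	defer b.Close()
	fmt.Println(portA, portB, portA != portB) // the last value is true
}
```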

I'm trying to find out how this approach can be augmented for deployment on Kubernetes. Our deployments won't allow multiple jobs per task manager, so all task slots will belong to the same pipeline context. The intent is to deploy SDK harness containers along with TMs in the same pod. No assumption can be made about the order in which the containers are started, and the SDK container wouldn't know the connect address at startup (it can only be discovered after the pipeline gets deployed into the TMs).

I talked about that a while ago with Henning and one idea was to set a fixed endpoint address so that the boot code in the SDK container knows upfront where to connect to, even when that endpoint isn't available yet. This approach may work with minimal changes to runner and little or no change to SDK container (as long as the SDK is prepared to retry). The downside is that all (parallel) task slots of the TM will use the same SDK worker, which will likely lead to performance issues, at least with the Python SDK that we are planning to use.

An alternative may be to define an SDK worker pool per pod, with a discovery mechanism for workers to find the runner endpoints and a coordination mechanism that distributes the dynamically allocated endpoints that are provided by the executable stage task slots over the available workers.
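A minimal sketch of the coordination part of this alternative, assuming task slots publish their endpoints to a shared queue and each pooled worker claims at most one of them (one worker per slot, which avoids the single-worker bottleneck). `stageEndpoint` and `distribute` are hypothetical, and discovery itself is reduced to an in-process channel.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// stageEndpoint is a hypothetical record a task slot publishes once its
// executable stage has allocated an endpoint.
type stageEndpoint struct {
	Slot int
	Addr string
}

// distribute lets each worker in the pod's pool claim at most one published
// endpoint from a shared queue. It returns which worker claimed each address
// and any addresses left over because the pool was too small.
func distribute(workers int, endpoints []stageEndpoint) (map[string]int, []string) {
	queue := make(chan stageEndpoint, len(endpoints))
	for _, e := range endpoints {
		queue <- e
	}
	close(queue)

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		claimed = make(map[string]int)
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			e, ok := <-queue // ok is false once the queue is drained
			if !ok {
				return
			}
			mu.Lock()
			claimed[e.Addr] = id
			mu.Unlock()
		}(w)
	}
	wg.Wait()

	var unclaimed []string
	for e := range queue {
		unclaimed = append(unclaimed, e.Addr)
	}
	sort.Strings(unclaimed)
	return claimed, unclaimed
}

func main() {
	eps := []stageEndpoint{{0, "127.0.0.1:40001"}, {1, "127.0.0.1:40002"}, {2, "127.0.0.1:40003"}}
	claimed, unclaimed := distribute(2, eps)
	fmt.Println(len(claimed), len(unclaimed)) // 2 1
}
```

Leftover endpoints make an undersized pool visible instead of silently funneling several slots into one worker.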

Any thoughts on this? Is anyone else looking at a docker free deployment?