
Re: SDK Harness Deployment

Yes, it did not occur to me that we have the identifier available for this. I just took a fresh look at https://s.apache.org/beam-fn-api-container-contract

So it should be possible to start a pool of containers with pre-assigned IDs in the pod, communicate the same set of IDs to the runner (via its configuration) and then come up with some mechanism to assign executable stages to worker IDs as part of the Flink operator initialization.

By the time the SDK boot code calls the provisioning service to fetch the pipeline options, the runner wouldn't be ready (either since the TM isn't running or the executable stages were not deployed into it yet). So will that call just retry until the endpoint becomes available? On the runner side, the endpoint can only be activated (under the fixed address) when the task slots are assigned.



On Wed, Jun 6, 2018 at 3:19 PM, Henning Rohde <herohde@xxxxxxxxxx> wrote:
Thanks Thomas. The id provided to the SDK harness must be sent as a gRPC header when it connects to the TM. The TM can use a fixed port and multiplex requests based on that id - to match the SDK harness with the appropriate job/slot/whatnot. The relationship between SDK harness and TM is not limited to 1:1, but rather many:1. We'll likely need that for cross-language as well. Wouldn't multiplexing on a single port for the control plane be the easiest solution for both #1 and #2? The data plane can still use various dynamically-allocated ports.
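To make the multiplexing idea concrete, here is a minimal sketch of routing control connections that arrive on one fixed port by the id in the request headers. It is plain Python with no gRPC involved; the class `ControlMultiplexer`, its methods, and the header key `worker_id` are assumptions chosen for illustration, not Beam or Flink APIs.

```python
# Sketch only: ControlMultiplexer and the "worker_id" header key are
# hypothetical names used for illustration.
class ControlMultiplexer:
    """Routes control connections arriving on one fixed port by worker id."""

    def __init__(self):
        self._slots = {}    # worker id -> job/slot chosen by the TM
        self._pending = []  # worker ids that connected before assignment

    def assign(self, worker_id, slot):
        """Called by the TM once it has bound a worker id to a job/slot."""
        self._slots[worker_id] = slot
        if worker_id in self._pending:
            self._pending.remove(worker_id)

    def route(self, headers):
        """Return the slot for a connection, or None if not assigned yet."""
        worker_id = headers.get("worker_id")
        if worker_id is None:
            raise ValueError("connection is missing the worker_id header")
        slot = self._slots.get(worker_id)
        if slot is None and worker_id not in self._pending:
            self._pending.append(worker_id)  # remember the early arrival
        return slot
```

Several worker ids can be assigned to the same slot, which is the many:1 relationship between SDK harnesses and the TM described above.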

On Kubernetes, we're somewhat constrained by the pod lifetime and multi-job TMs might not be as natural to achieve.


On Wed, Jun 6, 2018 at 2:28 PM Thomas Weise <thw@xxxxxxxxxx> wrote:
Hi Henning,

Here is a page that explains the scheduling and overall functioning of the task manager in Flink:

Here are the 2 issues:

#1 each task manager process gets assigned multiple units of execution into task slots. So when we deploy a Beam pipeline, we can end up with multiple executable stages running in a single TM JVM.

This is where a 1-to-1 relationship between TM and SDK harness can lead to a bottleneck (all task slots of a single TM push their work to a single SDK container).

#2 in a deployment where multiple pipelines share a Flink cluster, the SDK harness per TM approach wouldn't work logically. We would need to have multiple SDK containers, not just for efficiency reasons.

This would not be an issue for the deployment scenario I'm looking at, but it needs to be considered for general Flink runner deployment.

Regarding the assignment of fixed endpoints within the TM, that is possible but it doesn't address #1 and #2.

I hope this clarifies?


On Wed, Jun 6, 2018 at 12:31 PM, Henning Rohde <herohde@xxxxxxxxxx> wrote:
Thanks for writing down and explaining the problem, Thomas. Let me try to tease some of the topics apart.

First, the basic setup is currently as follows: there are 2 worker processes (A) "SDK harness" and (B) "Runner harness" that need to communicate. A connects to B. The fundamental endpoint(s) of B as well as an id -- logging, provisioning, artifacts and control -- are provided to A via command line parameters. A is not expected to be able to connect to the control port without first obtaining pipeline options (from provisioning) and staged files (from artifacts). As an aside, this is where the separate boot.go code comes in handy. A can assume it will be restarted if it exits. A does not assume the given endpoints are up when started and should make blocking calls with timeout (if it does not and exits instead, it is restarted anyway and will retry). Note that the data plane endpoints are part of the control instructions and need not be known or allocated at startup or even be served by the same TM.
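The startup order described above (provisioning, then artifacts, then control, each retried because the endpoints may not be up yet) can be sketched as follows. This is an illustration in Python, not the actual boot.go logic; `call_with_retry`, `boot`, and the three callables passed to `boot` are hypothetical stand-ins for the real service clients.

```python
import time


def call_with_retry(call, attempts=5, delay_s=0.5, sleep=time.sleep):
    """Run call(), retrying on ConnectionError up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except ConnectionError:
            if attempt == attempts:
                raise  # give up; the container is expected to be restarted
            sleep(delay_s)


def boot(fetch_options, fetch_artifacts, connect_control, **retry):
    """Hypothetical boot sequence: options first, then files, then control."""
    options = call_with_retry(fetch_options, **retry)
    files = call_with_retry(lambda: fetch_artifacts(options), **retry)
    return call_with_retry(lambda: connect_control(options, files), **retry)
```

Letting the final failure propagate matches the contract in the text: if A gives up and exits, it is restarted and tries again.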

Second, whether or not docker is used is rather an implementation detail, but if we use Kubernetes (or other such options) then some constraints come into play.

Either way, two scenarios work well:
   (1) B starts A: The ULR and Flink prototype do this. B will delay starting A until it has decided which endpoints to use. This approach requires B to do process/container management, which we'd rather not have to do at scale. But it's convenient for local runners.
   (2) B has its (local) endpoints configured or fixed: A and B can be started concurrently. Dataflow does this. Kubernetes lends itself well to this approach (and handles container management for us).   

The Flink on Kubernetes scenario described above doesn't:
   (3) B must use randomized (local) endpoints _and_ A and B are started concurrently: A would not know where to connect.

Perhaps I'm not understanding the constraints of the TM well enough, but can we really not open a configured/fixed port from the TM -- especially in a network-isolated Kubernetes pod? Adding a third process (C) "proxy" to the pod might be an alternative option and morph (3) into (2). B would configure C when it's ready. A would connect to C, but be blocked until B has configured it. C could perhaps even serve logging, provisioning, and artifacts without B. And the data plane would not go over C anyway. If control proxy'ing is a concern, then alternatively we would add an indirection to the container contract and provide the control endpoint in the provisioning api, say, or even a new discovery service.
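The blocking behavior of the proxy (C) could look roughly like this: A asks C for the control endpoint and waits until B has configured it. This is a single-process sketch using a threading event; `ControlProxy`, `configure`, and `resolve` are hypothetical names, and a real proxy would forward traffic rather than just hand back an address.

```python
import threading


class ControlProxy:
    """Hypothetical proxy (C): holds A's requests until B configures it."""

    def __init__(self):
        self._configured = threading.Event()
        self._target = None

    def configure(self, target):
        """Called by B once it knows its (randomized) control endpoint."""
        self._target = target
        self._configured.set()

    def resolve(self, timeout_s=None):
        """Called on behalf of A; blocks until configured or timed out."""
        if not self._configured.wait(timeout_s):
            raise TimeoutError("runner has not configured the proxy yet")
        return self._target
```

With a timeout, A falls back to the exit-and-restart behavior from the container contract instead of waiting forever.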

There are of course other options and tradeoffs, but having Flink work on Kubernetes and not go against the grain seems desirable to me.


On Wed, Jun 6, 2018 at 10:11 AM Thomas Weise <thw@xxxxxxxxxx> wrote:

The current plan for running the SDK harness is to execute docker to launch SDK containers with service endpoints provided by the runner in the docker command line.

In the case of the Flink runner (prototype), the service endpoints are dynamically allocated per executable stage. There is typically one Flink task manager running per machine. Each TM has multiple task slots. A subset of these task slots will run the Beam executable stages. Flink allows multiple jobs in one TM, so we could have executable stages of different pipelines running in a single TM, depending on how users deploy. The prototype also has no cleanup for the SDK containers; they remain running and orphaned once the runner is gone.

I'm trying to find out how this approach can be augmented for deployment on Kubernetes. Our deployments won't allow multiple jobs per task manager, so all task slots will belong to the same pipeline context. The intent is to deploy SDK harness containers along with TMs in the same pod. No assumption can be made about the order in which the containers are started, and the SDK container wouldn't know the connect address at startup (it can only be discovered after the pipeline gets deployed into the TMs).

I talked about that a while ago with Henning and one idea was to set a fixed endpoint address so that the boot code in the SDK container knows upfront where to connect to, even when that endpoint isn't available yet. This approach may work with minimal changes to runner and little or no change to SDK container (as long as the SDK is prepared to retry). The downside is that all (parallel) task slots of the TM will use the same SDK worker, which will likely lead to performance issues, at least with the Python SDK that we are planning to use.

An alternative may be to define an SDK worker pool per pod, with a discovery mechanism for workers to find the runner endpoints and a coordination mechanism that distributes the dynamically allocated endpoints that are provided by the executable stage task slots over the available workers.
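One simple form of the coordination mechanism is a round-robin spread of the dynamically allocated endpoints over the workers in the pod. The sketch below assumes both lists are already known to the coordinator; `distribute_endpoints` and the example endpoint and worker names are made up for illustration.

```python
from itertools import cycle


def distribute_endpoints(endpoints, worker_ids):
    """Assign each stage endpoint to a worker id, round-robin."""
    if not worker_ids:
        raise ValueError("the worker pool is empty")
    assignment = {worker: [] for worker in worker_ids}
    # cycle() repeats the worker ids so every endpoint gets a worker.
    for endpoint, worker in zip(endpoints, cycle(worker_ids)):
        assignment[worker].append(endpoint)
    return assignment


# Example: three stage endpoints spread over a pool of two workers.
plan = distribute_endpoints(
    ["stage-0:5001", "stage-1:5002", "stage-2:5003"], ["w1", "w2"]
)
```

Round-robin ignores load, so a real coordinator might prefer the least-loaded worker instead.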

Any thoughts on this? Is anyone else looking at a docker-free deployment?