Thanks for looking into this problem. The cause seems to be Flink's
pipelined execution mode. It runs multiple tasks in one task slot and
produces a deadlock when the pipelined operators schedule the SDK
harness DoFns in non-topological order.
The problem would be resolved if we scheduled the tasks in topological
order. Doing that is not easy because they run in separate Flink
operators, and the SDK harness would need insight into the execution
graph (which is not desirable).
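For the example pipeline in the original mail, topological order is simply the chain order. Here is a minimal sketch using Python's standard-library `graphlib` (Python 3.9+), with stage names taken from that example. It only illustrates the ordering. As noted, the SDK harness does not actually have access to this graph.

```python
from graphlib import TopologicalSorter

# Example pipeline, written as stage -> set of upstream stages it depends on.
PIPELINE = {
    "MapA": {"Read"},
    "GroupBy": {"MapA"},
    "MapB": {"GroupBy"},
    "WriteToSink": {"MapB"},
}

# Every stage appears after all of its upstream stages.
order = list(TopologicalSorter(PIPELINE).static_order())
print(order)
```

Dispatching the stages in this order would put MapA ahead of MapB in the harness queue.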
The easiest method, which you proposed in 1), is to ensure that the
number of threads in the SDK harness matches the number of
ExecutableStage DoFn operators.
The approach in 2) is what Flink does as well. It glues together
horizontal parts of the execution graph, also in multiple threads. So I
agree with your proposed solution.
On 17.08.18 03:10, Ankur Goenka wrote:
> tl;dr Deadlock in task execution caused by limited task parallelism on
> SDKHarness.
> *Setup:*
> * Job type: /*Beam Portable Python Batch*/ Job on Flink standalone
> * Only a single job is scheduled on the cluster.
> * Everything runs on a single machine with a single Flink task manager.
> * Flink Task Manager Slots is 1.
> * Flink Parallelism is 1.
> * Python SDKHarness has 1 thread.
> *Example pipeline:*
> Read -> MapA -> GroupBy -> MapB -> WriteToSink
> With a multi-stage job, Flink schedules different dependent subtasks
> concurrently on the Flink worker as long as it can get slots. Each map
> task is then executed on the SDKHarness.
> It's possible that MapB reaches the SDKHarness before MapA and hence
> enters the execution queue before MapA. Because the SDKHarness has only
> one execution thread, MapA never gets a chance to execute: MapB waits
> for input from MapA and never releases the thread. This leads to a
> deadlock even in a simple pipeline.
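The deadlock can be reproduced in a few lines. This is a minimal simulation, not Beam code. It assumes a `ThreadPoolExecutor` can stand in for the SDK harness thread pool and a `queue.Queue` can stand in for the data channel from MapA to MapB. The names `run_stages`, `map_a`, and `map_b` are hypothetical. MapB is submitted first, as in the scenario above:

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_stages(num_threads, timeout=1.0):
    """Simulate MapB reaching a harness with `num_threads` workers before MapA.

    Returns True if MapB finishes, False if it is still blocked after
    `timeout` seconds (the deadlock described above).
    """
    channel = queue.Queue()  # stands in for the MapA -> MapB data channel

    def map_a():
        for x in (1, 2, 3):
            channel.put(x)
        channel.put(None)  # end-of-input marker

    def map_b():
        results = []
        # Blocks on get() until MapA sends data, holding its worker thread.
        while (item := channel.get()) is not None:
            results.append(item * 10)
        return results

    pool = ThreadPoolExecutor(max_workers=num_threads)
    b = pool.submit(map_b)  # MapB is queued first and takes a thread...
    pool.submit(map_a)      # ...so MapA must wait for a free thread
    try:
        b.result(timeout=timeout)
        return True
    except FutureTimeout:
        return False  # MapB still holds the only thread, waiting on MapA
    finally:
        channel.put(None)  # release MapB so the demo can shut down cleanly
        pool.shutdown(wait=True)
```

With `num_threads=1` the call returns `False`. With `num_threads=2`, MapA gets its own thread and the run completes.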
> *Current mitigation:* Set worker_count in the pipeline options higher
> than the expected number of sub tasks in the pipeline.
> *Proposals:*
> 1. We can get the maximum concurrency from the runner and make sure
> that we have more threads than the maximum concurrency. This approach
> assumes that Beam has insight into the runner's execution plan and can
> make decisions based on it.
> 2. We dynamically create threads and cache them in the SDKHarness,
> with a high upper bound. We can warn if we hit the upper bound on
> threads. This approach assumes that the runner does a good job of
> scheduling and distributes tasks more or less evenly.
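Approach 2 might look roughly like the following minimal sketch. `ElasticExecutor` is a hypothetical class, not the Beam SDK harness implementation, and the default cap of 1000 threads is an arbitrary assumption. A new thread is started only when a task arrives and no cached thread is idle. Finished threads are kept and reused, and a warning is logged once the cap is reached:

```python
import logging
import queue
import threading


class ElasticExecutor:
    """Hypothetical sketch of approach 2: grow threads on demand up to a cap."""

    def __init__(self, max_threads=1000):
        self._max_threads = max_threads
        self._tasks = queue.Queue()
        self._lock = threading.Lock()
        self._threads = []
        self._idle = 0  # cached threads that finished a task and are unreserved

    def submit(self, fn, *args):
        with self._lock:
            self._tasks.put((fn, args))
            if self._idle > 0:
                self._idle -= 1  # reserve an idle cached thread for this task
            elif len(self._threads) < self._max_threads:
                worker = threading.Thread(target=self._work, daemon=True)
                self._threads.append(worker)
                worker.start()
            else:
                # At the cap the task waits for a busy thread to free up,
                # which can reintroduce the deadlock, so make it visible.
                logging.warning(
                    "Thread cap (%d) reached; task queued", self._max_threads)

    def thread_count(self):
        with self._lock:
            return len(self._threads)

    def _work(self):
        while True:
            fn, args = self._tasks.get()
            try:
                fn(*args)
            except Exception:
                logging.exception("Task failed")
            finally:
                with self._lock:
                    self._idle += 1  # back in the cache, available for reuse
```

If MapB is submitted before MapA, this executor starts a second thread for MapA instead of queuing it. The example pipeline therefore completes with only two threads.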
> We expect good scheduling from runners, so I prefer approach 2. It is
> simpler to implement, and the implementation is not runner-specific.
> It also uses resources better, as it creates only as many threads as
> are needed instead of provisioning for the peak thread requirement.
> And last but not least, it gives the runner control over managing
> truly active tasks.
> Please let me know if I am missing something and your thoughts on the