osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [DISCUSS] Proposal of external shuffle service


Hi Zhijiang,

Thanks for sharing the document Zhijiang. 
I decided to compile my thoughts to consider here, not to overload document comments any more :)

I think I still have question about job level configuration for the shuffle service. You mentioned that we can keep several shuffle manager objects in one task executor for different jobs. This is valid. My concerns are:
- how do we share shuffle manager resources among different job tasks within one task executor process? It could be some static objects shared by all shuffle manager objects of some type but it might be not scalable approach. Example could be multiplexed netty connections (as I understand, current netty stack can become just custom shuffle service).
- In case of having it per job, we might need to provide compatibility check between shuffle service and cluster mode (e.g. yarn ext shuffle service for standalone mode cluster) if it is an issue.
- Having it per job feels like the same complexity as having it per operator, at the first glance, just changes its granularity and where objects reside.
- what is the problem to use cluster per job mode? Then shuffle manager per cluster and per job is the same but might simplify other issues at the beginning. Streaming and batch jobs with different shuffle requirements could be started in different clusters per job. 

As for ShuffleManager interface, I think I see your point with the ResultPartitionLocation. I agree that partition needs some addressing of underlying connection or resources in general. It can be thinked of as an argument of ShuffleManager factory methods.

My point is that task code might not need to be coupled to shuffle interface. This way we could keep task code more independent of records transfer layer. We can always change later how shuffle/network service is organised internally without any consequences for the general task code. If task code calls just factories provided by JM, it might not even matter for the task in future whether it is configured per cluster, job or operator. Internally, factory can hold location of concrete type if needed.

Code example could be:

Job Manager side:

interface ShuffleManager {
  ResultPartionWriterFactory createResultPartionWriterFactory(job/task/topology descriptors);
  // similar for input gate factory
}

class ShuffleManagerImpl implements ShuffleManager {
  private general config, services etc;
  ResultPartionWriterFactory createResultPartionWriterFactory(job/task/topology descriptors) {
    return new ResultPartionWriterFactoryImpl(location, job, oper id, other specific config etc);
  }
  // similar for input gate factory
}
...
// somewhere in higher level code put ResultPartionWriterFactory into descriptor

Task executor side receives the factory inside the descriptor and calls factory.create(ShuffleServiceRegistry). Example of factory:

class ResultPartionWriterFactoryImpl implements ResultPartionWriterFactory {
  // all fields are lightweight and serialisable, received from JM
  private location, shuffle service id, other specific config etc;
  
  ResultPartionWriter create(ShuffleServiceRegistry registry, maybe more generic args) {
    // get or create task local specific ShuffleServiceImpl by id in registry
    // ShuffleServiceImpl object can be shared between jobs
    // register with the ShuffleServiceImpl by location, id, config etc
  }
}

interface ShuffleService extends AutoClosable {
  getId();
}

ShuffleServiceImpl manages resources and decides internally whether to do it per task executor, task, job or operator. It can contain network stack, e,g, netty connections etc. In case of external service, it can hold partition manager, transport client etc. It is not enforced to have it per job by this contract or even to have it at all. ShuffleServiceImpl also does not need to depend on all TaskManagerServices, only create relevant inside, e.g. network.

class ShuffleServiceRegistry {
  <T extends ShuffleService> T getShuffleService(id);
 registerShuffleService(ShuffleService, id);
  deregisterShuffleService(id); // remove and close ShuffleService
  close(); // close all
}

ShuffleServiceRegistry is just a generic container of all available ShuffleService’s. It could be part of TaskManagerServices instead of NetworkEnvironment which could go into specific ShuffleServiceImpl.

I might still miss some details, I would appreciate any feedback.

Best,
Andrey

> On 28 Nov 2018, at 08:59, zhijiang <wangzhijiang999@xxxxxxxxxx.INVALID> wrote:
> 
> Hi all,
> 
> I adjusted the umbrella jira [1] and corresponding google doc [2] to narrow down the scope of introducing pluggable shuffle manager architecture as the first step. 
> Welcome further feedbacks and suggestions, then I would create specific subtasks for it to forward.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10653
> 
> [2] https://docs.google.com/document/d/1ssTu8QE8RnF31zal4JHM1VaVENow-PweUtXSRr68nGg/edit?usp=sharing
> ------------------------------------------------------------------
> 发件人:zhijiang <wangzhijiang999@xxxxxxxxxx.INVALID>
> 发送时间:2018年11月1日(星期四) 17:19
> 收件人:dev <dev@xxxxxxxxxxxxxxxx>; Jin Sun <isunjin@xxxxxxxxx>
> 抄 送:Nico Kruber <nico@xxxxxxxxxxxxxxxxx>; Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>; Stephan Ewen <sewen@xxxxxxxxxx>
> 主 题:回复:[DISCUSS] Proposal of external shuffle service
> 
> Thanks for the efficient response till!
> 
> Thanks sunjin for the good feedbacks, we will further confirm with the comments then! :)
> ------------------------------------------------------------------
> 发件人:Jin Sun <isunjin@xxxxxxxxx>
> 发送时间:2018年11月1日(星期四) 06:42
> 收件人:dev <dev@xxxxxxxxxxxxxxxx>
> 抄 送:Zhijiang(wangzhijiang999) <wangzhijiang999@xxxxxxxxxx>; Nico Kruber <nico@xxxxxxxxxxxxxxxxx>; Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>; Stephan Ewen <sewen@xxxxxxxxxx>
> 主 题:Re: [DISCUSS] Proposal of external shuffle service
> 
> Thanks Zhijiang for the proposal. I like the idea of external shuffle service, have left some comments on the document. 
> 
>> On Oct 31, 2018, at 2:26 AM, Till Rohrmann <trohrmann@xxxxxxxxxx> wrote:
>> 
>> Thanks for the update Zhijiang! The community is currently quite busy with
>> the next Flink release. I hope that we can finish the release in two weeks.
>> After that people will become more responsive again.
>> 
>> Cheers,
>> Till
>> 
>> On Wed, Oct 31, 2018 at 7:49 AM zhijiang <wangzhijiang999@xxxxxxxxxx> wrote:
>> 
>>> I already created the umbrella jira [1] for this improvement, and attched
>>> the design doc [2] in this jira.
>>> 
>>> Welcome for further discussion about the details.
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-10653
>>> [2]
>>> https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing
>>> 
>>> 
>>> <https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing>
>>> Best,
>>> Zhijiang
>>> 
>>> ------------------------------------------------------------------
>>> 发件人:Zhijiang(wangzhijiang999) <wangzhijiang999@xxxxxxxxxx.INVALID>
>>> 发送时间:2018年9月11日(星期二) 15:21
>>> 收件人:dev <dev@xxxxxxxxxxxxxxxx>
>>> 抄 送:dev <dev@xxxxxxxxxxxxxxxx>
>>> 主 题:回复:[DISCUSS] Proposal of external shuffle service
>>> 
>>> Many thanks Till!
>>> 
>>> 
>>> I would create a JIRA for this feature and design a document attched with it.
>>> I will let you know after ready! :)
>>> 
>>> Best,
>>> Zhijiang
>>> 
>>> 
>>> ------------------------------------------------------------------
>>> 发件人:Till Rohrmann <trohrmann@xxxxxxxxxx>
>>> 发送时间:2018年9月7日(星期五) 22:01
>>> 收件人:Zhijiang(wangzhijiang999) <wangzhijiang999@xxxxxxxxxx>
>>> 抄 送:dev <dev@xxxxxxxxxxxxxxxx>
>>> 主 题:Re: [DISCUSS] Proposal of external shuffle service
>>> 
>>> The rough plan sounds good Zhijiang. I think we should continue with what
>>> you've proposed: Open a JIRA issue and creating a design document which
>>> outlines the required changes a little bit more in detail. Once this is
>>> done, we should link the design document in the JIRA issue and post it here
>>> for further discussion.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) <
>>> wangzhijiang999@xxxxxxxxxx> wrote:
>>> 
>>>> Glad to receive your positive feedbacks Till!
>>>> 
>>>> Actually our motivation is to support batch job well as you mentioned.
>>>> 
>>>> For output level, flink already has the Subpartition abstraction(writer),
>>>> and currently there are PipelinedSubpartition(memory output) and
>>>> SpillableSubpartition(one-sp-one-file output) implementations. We can
>>>> extend this abstraction to realize other persistent outputs (e.g.
>>>> sort-merge-file).
>>>> 
>>>> For transport level(shuffle service), the current SubpartitionView
>>>> abstraction(reader) seems as the brige linked with the output level, then
>>> 
>>>> the view can understand and read the different output formats. The current
>>>> NetworkEnvironment seems take the role of internal shuffle service in
>>>> TaskManager and the transport server is realized by netty inside. This
>>> 
>>>> component can also be started in other external containers like NodeManager
>>>> of yarn to take the role of external shuffle service. Further we can
>>> 
>>>> abstract to extend the shuffle service for transporting outputs by http or
>>> 
>>>> rdma instead of current netty.  This abstraction should provide the way for
>>>> output registration in order to read the results correctly, similar with
>>>> current SubpartitionView.
>>>> 
>>>> The above is still a rough idea. Next I plan to create a feature jira to
>>>> cover the related changes if possible. It would be better if getting help
>>>> from related committers to review the detail designs together.
>>>> 
>>>> Best,
>>>> Zhijiang
>>>> 
>>>> ------------------------------------------------------------------
>>>> 发件人:Till Rohrmann <trohrmann@xxxxxxxxxx>
>>>> 发送时间:2018年8月29日(星期三) 17:36
>>>> 收件人:dev <dev@xxxxxxxxxxxxxxxx>; Zhijiang(wangzhijiang999) <
>>>> wangzhijiang999@xxxxxxxxxx>
>>>> 主 题:Re: [DISCUSS] Proposal of external shuffle service
>>>> 
>>>> Thanks for starting this design discussion Zhijiang!
>>>> 
>>>> I really like the idea to introduce a ShuffleService abstraction which
>>> 
>>>> allows to have different implementations depending on the actual use case.
>>> 
>>>> Especially for batch jobs I can clearly see the benefits of persisting the
>>>> results somewhere else.
>>>> 
>>>> Do you already know which interfaces we need to extend and where to
>>>> introduce new abstractions?
>>>> 
>>>> Cheers,
>>>> Till
>>>> 
>>>> On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999)
>>>> <wangzhijiang999@xxxxxxxxxx.invalid> wrote:
>>>> Hi all!
>>>> 
>>> 
>>>> The shuffle service is responsible for transporting upstream produced data
>>>> to the downstream side. In flink, the NettyServer is used for network
>>> 
>>>> transport service and this component is started in the TaskManager process.
>>>> That means the TaskManager can support internal shuffle service which
>>>> exists some concerns:
>>>> 1. If a task finishes, the ResultPartition of this task still retains
>>>> registered in TaskManager, because the output buffers have to be
>>>> transported by internal shuffle service in TaskManager. That means the
>>>> TaskManager can not be released by ResourceManager until ResultPartition
>>>> released. It may waste container resources and can not support well for
>>>> dynamic resource scenarios.
>>>> 2. If we want to expand another shuffle service implementation, the
>>>> current mechanism is not easy to handle, because the output level (result
>>>> partition) and transport level (shuffle service) are not divided clearly
>>>> and loss of abstraction to be extended.
>>>> 
>>>> For above considerations, we propose the external shuffle service which
>>>> can be deployed on any other external contaienrs, e.g. NodeManager
>>> 
>>>> container in yarn. Then the TaskManager can be released ASAP ifneeded when
>>>> all the internal tasks finished. The persistent output files of these
>>>> finished tasks can be served to transport by external shuffle service in
>>>> the same machine.
>>>> 
>>>> Further we can abstract both of the output level and transport level to
>>> 
>>>> support different implementations. e.g. We realized merging the data of all
>>> 
>>>> the subpartitions into limited persistent local files for disk improvements
>>>> in some scenarios instead of one-subpartiton-one-file.
>>>> 
>>>> I know it may be a big work for doing this, and I just point out some
>>>> ideas, and wish getting any feedbacks from you!
>>>> 
>>>> Best,
>>>> Zhijiang
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
> 
>