from typing import Dict, List, Optional, Union

import torch
from torch._C._distributed_rpc import _TensorPipeRpcBackendOptionsBase

from . import constants as rpc_contants

DeviceType = Union[int, str, torch.device]

__all__ = ["TensorPipeRpcBackendOptions"]


def _to_device(device: DeviceType) -> torch.device:
    device = torch.device(device)
    if device.type != "cuda":
        raise ValueError(
            "`set_devices` expects a list of CUDA devices, but got "
            f"device type {device.type}."
        )
    return device


def _to_device_map(
    device_map: Dict[DeviceType, DeviceType]
) -> Dict[torch.device, torch.device]:
    full_device_map: Dict[torch.device, torch.device] = {}
    reverse_map: Dict[torch.device, torch.device] = {}
    for k, v in device_map.items():
        k, v = torch.device(k), torch.device(v)
        if v in reverse_map:
            raise ValueError(
                "`device_map` only supports 1-to-1 mapping, "
                f"trying to map {k} and {reverse_map[v]} to {v}"
            )
        full_device_map[k] = v
        reverse_map[v] = k
    return full_device_map


def _to_device_list(devices: List[DeviceType]) -> List[torch.device]:
    return list(map(_to_device, devices))


class TensorPipeRpcBackendOptions(_TensorPipeRpcBackendOptionsBase):
    r"""
    The backend options for
    :class:`~torch.distributed.rpc.TensorPipeAgent`, derived from
    :class:`~torch.distributed.rpc.RpcBackendOptions`.

    Args:
        num_worker_threads (int, optional): The number of threads in the
            thread-pool used by
            :class:`~torch.distributed.rpc.TensorPipeAgent` to execute
            requests (default: 16).
        rpc_timeout (float, optional): The default timeout, in seconds,
            for RPC requests (default: 60 seconds). If the RPC has not
            completed in this timeframe, an exception indicating so will
            be raised. Callers can override this timeout for individual
            RPCs in :meth:`~torch.distributed.rpc.rpc_sync` and
            :meth:`~torch.distributed.rpc.rpc_async` if necessary.
        init_method (str, optional): The URL to initialize the distributed
            store used for rendezvous. It takes any value accepted for the
            same argument of :meth:`~torch.distributed.init_process_group`
            (default: ``env://``).
        device_maps (Dict[str, Dict], optional): Device placement mappings from
            this worker to the callee. The key is the callee worker name and the
            value is a dictionary (``Dict`` of ``int``, ``str``, or
            ``torch.device``) that maps this worker's devices to the callee
            worker's devices. (default: ``None``)
        devices (List[int, str, or ``torch.device``], optional): all local
            CUDA devices used by the RPC agent. By default, it will be
            initialized to all local devices from its own ``device_maps`` and
            corresponding devices from its peers' ``device_maps``. When
            processing CUDA RPC requests, the agent will properly synchronize
            CUDA streams for all devices in this ``List``.
    """

    def __init__(
        self,
        *,
        num_worker_threads: int = rpc_contants.DEFAULT_NUM_WORKER_THREADS,
        rpc_timeout: float = rpc_contants.DEFAULT_RPC_TIMEOUT_SEC,
        init_method: str = rpc_contants.DEFAULT_INIT_METHOD,
        device_maps: Optional[Dict[str, Dict[DeviceType, DeviceType]]] = None,
        devices: Optional[List[DeviceType]] = None,
        _transports: Optional[List] = None,
        _channels: Optional[List] = None,
    ):
        full_device_maps = (
            {}
            if device_maps is None
            else {k: _to_device_map(v) for k, v in device_maps.items()}
        )
        full_device_list = [] if devices is None else _to_device_list(devices)
        super().__init__(
            num_worker_threads,
            _transports,
            _channels,
            rpc_timeout,
            init_method,
            full_device_maps,
            full_device_list,
        )

    def set_device_map(self, to: str, device_map: Dict[DeviceType, DeviceType]):
        r"""
        Set device mapping between each RPC caller and callee pair. This
        function can be called multiple times to incrementally add
        device placement configurations.

        Args:
            to (str): Callee name.
            device_map (Dict of int, str, or torch.device): Device placement
                mappings from this worker to the callee. This map must be
                invertible.

        Example:
            >>> # xdoctest: +SKIP("distributed")
            >>> # both workers
            >>> def add(x, y):
            >>>     print(x)  # tensor([1., 1.], device='cuda:1')
            >>>     return x + y, (x + y).to(2)
            >>>
            >>> # on worker 0
            >>> options = TensorPipeRpcBackendOptions(
            >>>     num_worker_threads=8,
            >>>     device_maps={"worker1": {0: 1}}
            >>>     # maps worker0's cuda:0 to worker1's cuda:1
            >>> )
            >>> options.set_device_map("worker1", {1: 2})
            >>> # maps worker0's cuda:1 to worker1's cuda:2
            >>>
            >>> rpc.init_rpc(
            >>>     "worker0",
            >>>     rank=0,
            >>>     world_size=2,
            >>>     backend=rpc.BackendType.TENSORPIPE,
            >>>     rpc_backend_options=options
            >>> )
            >>>
            >>> x = torch.ones(2)
            >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
            >>> # The first argument will be moved to cuda:1 on worker1. When
            >>> # sending the return value back, it will follow the inverse of
            >>> # the device map, and hence will be moved back to cuda:0 and
            >>> # cuda:1 on worker0
            >>> print(rets[0])  # tensor([2., 2.], device='cuda:0')
            >>> print(rets[1])  # tensor([2., 2.], device='cuda:1')
        """
        full_device_map = _to_device_map(device_map)
        curr_device_maps = super().device_maps
        if to in curr_device_maps:
            for k, v in full_device_map.items():
                if k in curr_device_maps[to] and v != curr_device_maps[to][k]:
                    raise ValueError(
                        "`set_device_map` only supports 1-to-1 mapping, trying"
                        f" to map {k} to {v} and {curr_device_maps[to][k]}"
                    )
        super()._set_device_map(to, full_device_map)

    def set_devices(self, devices: List[DeviceType]):
        r"""
        Set local devices used by the TensorPipe RPC agent. When processing
        CUDA RPC requests, the TensorPipe RPC agent will properly synchronize
        CUDA streams for all devices in this ``List``.

        Args:
            devices (List of int, str, or torch.device): local devices used by
                the TensorPipe RPC agent.
        """
        self.devices = _to_device_list(devices)
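

# The guarded block below is an illustrative sketch, not part of the library
# API: it only shows how an options object is assembled before being handed to
# ``rpc.init_rpc`` (omitted here, since that requires a real process group).
# The worker name "worker1" and the specific device indices are assumptions
# for the example; only devices of type "cuda" are accepted by the helpers above.
if __name__ == "__main__":
    # Map this worker's cuda:0 to the callee's cuda:1 at construction time.
    options = TensorPipeRpcBackendOptions(
        num_worker_threads=8,
        device_maps={"worker1": {0: 1}},
    )
    # Incrementally add another 1-to-1 mapping for the same callee; a
    # conflicting or non-invertible mapping would raise a ValueError.
    options.set_device_map("worker1", {1: 2})
    # Restrict the local CUDA devices the agent synchronizes streams on.
    options.set_devices(["cuda:0", "cuda:1"])
    # Inspect the accumulated per-callee device maps and local device list.
    print(options.device_maps)
    print(options.devices)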