import warnings
from typing import List

import torch
from torch.cuda import nccl
from torch._utils import (
    _flatten_dense_tensors,
    _get_device_index,
    _handle_complex,
    _reorder_tensors_as,
    _take_tensors,
    _unflatten_dense_tensors,
)


def broadcast(tensor, devices=None, *, out=None):
    r"""Broadcasts a tensor to specified GPU devices.

    Args:
        tensor (Tensor): tensor to broadcast. Can be on CPU or GPU.
        devices (Iterable[torch.device, str or int], optional): an iterable of
          GPU devices, among which to broadcast.
        out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
          store output results.

    .. note::
        Exactly one of :attr:`devices` and :attr:`out` must be specified.

    Returns:
        - If :attr:`devices` is specified,
            a tuple containing copies of :attr:`tensor`, placed on
            :attr:`devices`.
        - If :attr:`out` is specified,
            a tuple containing :attr:`out` tensors, each containing a copy of
            :attr:`tensor`.

    Raises:
        RuntimeError: if both or neither of :attr:`devices` and :attr:`out`
            are given.
    """
    tensor = _handle_complex(tensor)
    # XOR: exactly one of the two arguments may be left as ``None``.
    if not ((devices is None) ^ (out is None)):
        raise RuntimeError(
            "Exactly one of 'devices' and 'out' must be specified, but got "
            "devices={} and out={}".format(devices, out))
    if devices is not None:
        devices = [_get_device_index(d) for d in devices]
        return torch._C._broadcast(tensor, devices)
    else:
        return torch._C._broadcast_out(tensor, out)


def broadcast_coalesced(tensors, devices, buffer_size=10485760):
    """Broadcasts a sequence of tensors to the specified GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Args:
        tensors (sequence): tensors to broadcast. Must be on the same device,
          either CPU or GPU.
        devices (Iterable[torch.device, str or int]): an iterable of GPU
          devices, among which to broadcast.
        buffer_size (int): maximum size of the buffer used for coalescing

    Returns:
        A tuple containing copies of :attr:`tensor`, placed on :attr:`devices`.
    """
    devices = [_get_device_index(d) for d in devices]
    tensors = [_handle_complex(t) for t in tensors]
    return torch._C._broadcast_coalesced(tensors, devices, buffer_size)
def reduce_add(inputs, destination=None):
    """Sums tensors from multiple GPUs.

    All inputs should have matching shapes, dtype, and layout. The output tensor
    will be of the same shape, dtype, and layout.

    Args:
        inputs (Iterable[Tensor]): an iterable of tensors to add.
        destination (int, optional): a device on which the output will be
            placed (default: current device).

    Returns:
        A tensor containing an elementwise sum of all inputs, placed on the
        :attr:`destination` device.

    Raises:
        AssertionError: if any input lives on the CPU.
        ValueError: if an input's size differs from that of the first input.
        RuntimeError: if none of the inputs is on the :attr:`destination`
            device.
    """
    destination = _get_device_index(destination, optional=True)
    input_size = inputs[0].size()
    root_index = None  # index of input tensor that already is on the correct device
    for i, inp in enumerate(inputs):
        assert inp.device.type != "cpu", "reduce_add expects all inputs to be on GPUs"
        if inp.get_device() == destination:
            root_index = i
        if inp.size() != input_size:
            got = 'x'.join(str(x) for x in inp.size())
            expected = 'x'.join(str(x) for x in input_size)
            raise ValueError("input {} has invalid size: got {}, but expected "
                             "{}".format(i, got, expected))
    if root_index is None:
        raise RuntimeError("reduce_add expects destination to be on the same GPU with one of the tensors")

    # Nothing to add: hand back the single input itself (no copy is made).
    if len(inputs) == 1:
        return inputs[0]

    if nccl.is_available(inputs):
        result = torch.empty_like(inputs[root_index])
        nccl.reduce(inputs, output=result, root=root_index)
    else:
        destination_device = torch.device(inputs[root_index].device.type, destination)
        nonroot = [t for i, t in enumerate(inputs) if i != root_index]
        # make a new tensor w/o clone
        result = inputs[root_index] + nonroot[0].to(device=destination_device, non_blocking=True)
        # Accumulate the remaining inputs in place into the freshly created sum.
        for other in nonroot[1:]:
            result.add_(other.to(device=destination_device, non_blocking=True))
    return result
def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
    """Sums tensors from multiple GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Args:
        inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
            contain tensors from a single device.
        destination (int, optional): a device on which the output will be
            placed (default: current device).
        buffer_size (int): maximum size of the buffer used for coalescing

    Returns:
        A tuple of tensors containing an elementwise sum of each group of
        inputs, placed on the ``destination`` device.
    """
    # TODO: When `len(inputs) == 1` and all inputs are on `destination`, just
    #       return `inputs`.
    dense_tensors: List[List] = [[] for _ in inputs]  # shape (num_gpus, num_tensors)
    output = []
    # One reference tensor per result, in the caller's original order; used at
    # the end to put the (sparse-first, then dense) results back in that order.
    ref_order = []
    # process sparse ones first since they may have different sizes on different gpus
    for tensor_at_gpus in zip(*inputs):
        if all(t.is_sparse for t in tensor_at_gpus):
            result = reduce_add(tensor_at_gpus, destination)  # this will be sparse too
            output.append(result)
            ref_order.append(tensor_at_gpus[0])
        else:
            for coll, t in zip(dense_tensors, tensor_at_gpus):
                coll.append(t.to_dense() if t.is_sparse else t)
            ref_order.append(dense_tensors[0][-1])
    itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
    # now the dense ones, which have consistent sizes
    for chunks in zip(*itrs):
        flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks]  # (num_gpus,)
        flat_result = reduce_add(flat_tensors, destination)
        for t in _unflatten_dense_tensors(flat_result, chunks[0]):
            # The unflattened tensors do not share storage, and we don't expose
            # base flat tensor anyways, so give them different version counters.
            # See NOTE [ Version Counter in comm.*_coalesced ]
            output.append(t.data)
    return tuple(_reorder_tensors_as(output, ref_order))
def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
    """Scatters tensor across multiple GPUs.

    Args:
        tensor (Tensor): tensor to scatter. Can be on CPU or GPU.
        devices (Iterable[torch.device, str or int], optional): an iterable of
          GPU devices, among which to scatter.
        chunk_sizes (Iterable[int], optional): sizes of chunks to be placed on
          each device. It should match :attr:`devices` in length and sums to
          ``tensor.size(dim)``. If not specified, :attr:`tensor` will be divided
          into equal chunks.
        dim (int, optional): A dimension along which to chunk :attr:`tensor`.
          Default: ``0``.
        streams (Iterable[Stream], optional): an iterable of Streams, among
          which to execute the scatter. If not specified, the default stream will
          be utilized.
        out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
          store output results. Sizes of these tensors must match that of
          :attr:`tensor`, except for :attr:`dim`, where the total size must
          sum to ``tensor.size(dim)``.

    .. note::
        Exactly one of :attr:`devices` and :attr:`out` must be specified. When
        :attr:`out` is specified, :attr:`chunk_sizes` must not be specified and
        will be inferred from sizes of :attr:`out`.

    Returns:
        - If :attr:`devices` is specified,
            a tuple containing chunks of :attr:`tensor`, placed on
            :attr:`devices`.
        - If :attr:`out` is specified,
            a tuple containing :attr:`out` tensors, each containing a chunk of
            :attr:`tensor`.

    Raises:
        RuntimeError: if :attr:`out` is given together with :attr:`devices`
            or :attr:`chunk_sizes`.
    """
    tensor = _handle_complex(tensor)
    if out is None:
        devices = [_get_device_index(d) for d in devices]
        return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
    else:
        # With explicit outputs, the targets and chunk sizes come from `out`.
        if devices is not None:
            raise RuntimeError(
                "'devices' must not be specified when 'out' is specified, but "
                "got devices={}".format(devices))
        if chunk_sizes is not None:
            raise RuntimeError(
                "'chunk_sizes' must not be specified when 'out' is specified, "
                "but got chunk_sizes={}".format(chunk_sizes))
        return tuple(torch._C._scatter_out(tensor, out, dim, streams))
def gather(tensors, dim=0, destination=None, *, out=None):
    r"""Gathers tensors from multiple GPU devices.

    Args:
        tensors (Iterable[Tensor]): an iterable of tensors to gather.
          Tensor sizes in all dimensions other than :attr:`dim` have to match.
        dim (int, optional): a dimension along which the tensors will be
          concatenated. Default: ``0``.
        destination (torch.device, str, or int, optional): the output device.
          Can be CPU or CUDA. Default: the current CUDA device.
        out (Tensor, optional, keyword-only): the tensor to store gather result.
          Its sizes must match those of :attr:`tensors`, except for :attr:`dim`,
          where the size must equal ``sum(tensor.size(dim) for tensor in tensors)``.
          Can be on CPU or CUDA.

    .. note::
        :attr:`destination` must not be specified when :attr:`out` is specified.

    Returns:
        - If :attr:`destination` is specified,
            a tensor located on :attr:`destination` device, that is a result of
            concatenating :attr:`tensors` along :attr:`dim`.
        - If :attr:`out` is specified,
            the :attr:`out` tensor, now containing results of concatenating
            :attr:`tensors` along :attr:`dim`.

    Raises:
        RuntimeError: if :attr:`destination` is given together with
            :attr:`out`.
    """
    tensors = [_handle_complex(t) for t in tensors]
    if out is None:
        # -1 is still accepted as a legacy spelling of "cpu", with a warning.
        if destination == -1:
            warnings.warn(
                'Using -1 to represent CPU tensor is deprecated. Please use a '
                'device object or string instead, e.g., "cpu".')
        destination = _get_device_index(destination, allow_cpu=True, optional=True)
        return torch._C._gather(tensors, dim, destination)
    else:
        if destination is not None:
            raise RuntimeError(
                "'destination' must not be specified when 'out' is specified, but "
                "got destination={}".format(destination))
        return torch._C._gather_out(tensors, out, dim)
# Access comprehensive developer documentation for PyTorch
# To analyze traffic and optimize your experience, we serve cookies on this site. By clicking or navigating, you agree to allow our usage of cookies. As the current maintainers of this site, Facebook’s Cookies Policy applies. Learn more, including about available controls: Cookies Policy.