DistributedDataParallel

class torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False)

Implements distributed data parallelism at the module level, based on the torch.distributed package.

This container parallelizes the application of the given module by splitting the input across the specified devices by chunking in the batch dimension. The module is replicated on each machine and each device, and each such replica handles a portion of the input. During the backward pass, gradients from each node are averaged.

The batch size should be larger than the number of GPUs used locally.

See also: "Basics" and "Use nn.parallel.DistributedDataParallel instead of multiprocessing or nn.DataParallel". The same constraints on input as in torch.nn.DataParallel apply.

Creating this class requires that torch.distributed is already initialized, by calling torch.distributed.init_process_group().

DistributedDataParallel is proven to be significantly faster than torch.nn.DataParallel for single-node multi-GPU data parallel training.

To use DistributedDataParallel on a host with N GPUs, you should spawn N processes, ensuring that each process works exclusively on a single GPU from 0 to N-1. This can be done either by setting CUDA_VISIBLE_DEVICES for every process or by calling:

>>> torch.cuda.set_device(i)

where i is from 0 to N-1. In each process, construct this module as follows:

>>> torch.distributed.init_process_group(
>>>     backend='nccl', world_size=N, init_method='...'
>>> )
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)

To spawn multiple processes per node, you can use either torch.distributed.launch or torch.multiprocessing.spawn.

Note

Please refer to PyTorch Distributed Overview for a brief introduction to all features related to distributed training.

Note

DistributedDataParallel can be used in conjunction with torch.distributed.optim.ZeroRedundancyOptimizer to reduce the per-rank memory footprint of optimizer states. Please refer to the ZeroRedundancyOptimizer recipe for more details.

Note

The nccl backend is currently the fastest backend and is highly recommended when using GPUs. This applies to both single-node and multi-node distributed training.

Note

This module also supports mixed-precision distributed training. This means that your model can have parameters of different types, such as a mix of fp16 and fp32, and gradient reduction on these mixed parameter types will work as expected.

Note

If you use torch.save on one process to checkpoint the module, and torch.load on some other processes to recover it, make sure that map_location is configured properly for every process. Without map_location, torch.load would recover the module to the devices it was saved from.
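As a minimal sketch of the checkpointing pattern described in this note, the snippet below saves a state dict and reloads it with an explicit map_location. It runs on CPU so it can be tried on any machine. The helper map_location_for_rank is a hypothetical name introduced here for illustration, and it assumes the checkpoint was written from cuda:0.

```python
import io

import torch
import torch.nn as nn


def map_location_for_rank(rank: int) -> dict:
    """Hypothetical helper: remap tensors saved from cuda:0 to this rank's GPU."""
    return {"cuda:0": f"cuda:{rank}"}


# Save a state dict (an in-memory buffer stands in for a checkpoint file).
model = nn.Linear(4, 2)
buffer = io.BytesIO()
torch.save(model.state_dict(), buffer)

# Reload with an explicit map_location. "cpu" keeps this sketch runnable
# without GPUs; on a GPU host, rank r could pass map_location_for_rank(r).
buffer.seek(0)
state = torch.load(buffer, map_location="cpu")

restored = nn.Linear(4, 2)
restored.load_state_dict(state)
```

Passing a dict as map_location remaps device tags one by one, which is why each rank needs its own mapping.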

Note

When a model is trained on M nodes with batch=N, the gradient will be M times smaller when compared to the same model trained on a single node with batch=M*N if the loss is summed (NOT averaged as usual) across instances in a batch (because the gradients between different nodes are averaged). You should take this into consideration when you want to obtain a mathematically equivalent training process compared to the local training counterpart. But in most cases, you can just treat a DistributedDataParallel wrapped model, a DataParallel wrapped model and an ordinary model on a single GPU as the same (e.g., using the same learning rate for equivalent batch size).
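The factor of M in this note can be checked with plain arithmetic. The sketch below uses a toy one-parameter model and a summed squared-error loss; the model, the data, and the function name summed_loss_grad are all made up for illustration and involve no distributed code.

```python
def summed_loss_grad(w, samples):
    """Gradient w.r.t. w of sum_i 0.5 * (w * x_i - y_i) ** 2 (summed, not averaged)."""
    return sum((w * x - y) * x for x, y in samples)


M, N = 2, 3  # M nodes, per-node batch size N
w = 0.5
samples = [(1.0, 2.0), (2.0, 1.0), (3.0, 0.0), (4.0, 3.0), (5.0, 1.0), (6.0, 2.0)]

# Single node, batch size M * N: one gradient over all samples.
single_node_grad = summed_loss_grad(w, samples)

# M nodes, batch size N each: per-node gradients are averaged across nodes.
shards = [samples[k * N:(k + 1) * N] for k in range(M)]
per_node_grads = [summed_loss_grad(w, shard) for shard in shards]
ddp_grad = sum(per_node_grads) / M

# The averaged gradient is M times smaller than the single-node one.
ratio = single_node_grad / ddp_grad
```

If the loss were averaged over the batch instead, each per-node gradient would already be divided by N, and the two setups would agree without any rescaling.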

Note

Parameters are never broadcast between processes. The module performs an all-reduce step on gradients and assumes that they will be modified by the optimizer in all processes in the same way. Buffers (e.g. BatchNorm stats) are broadcast from the module in process of rank 0, to all other replicas in the system in every iteration.

Note

If you are using DistributedDataParallel in conjunction with the Distributed RPC Framework, you should always use torch.distributed.autograd.backward() to compute gradients and torch.distributed.optim.DistributedOptimizer for optimizing parameters.

Example:

>>> import torch
>>> import torch.distributed.autograd as dist_autograd
>>> import torch.distributed.rpc as rpc
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> from torch.distributed.rpc import RRef
>>>
>>> # my_model, loss_func and target are placeholders defined elsewhere.
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # Setup optimizer
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>>     optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>>     optim.SGD,
>>>     optimizer_params,
>>>     lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>>     pred = ddp_model(rref.to_here())
>>>     loss = loss_func(pred, target)
>>>     dist_autograd.backward(context_id, [loss])
>>>     dist_optim.step(context_id)

Note

To let a non-DDP model load a state dict from a DDP model, consume_prefix_in_state_dict_if_present() needs to be applied to strip the prefix “module.” in the DDP state dict before loading.

Warning

Constructor, forward method, and differentiation of the output (or a function of the output of this module) are distributed synchronization points. Take that into account in case different processes might be executing different code.

Warning

This module assumes all parameters are registered in the model by the time it is created. No parameters should be added or removed later. The same applies to buffers.

Warning

This module assumes that the parameters registered in the model of each distributed process are in the same order. The module itself will conduct gradient allreduce following the reverse order of the registered parameters of the model. In other words, it is the user’s responsibility to ensure that each distributed process has the exact same model and thus the exact same parameter registration order.

Warning

This module allows parameters with non-rowmajor-contiguous strides. For example, your model may contain some parameters whose torch.memory_format is torch.contiguous_format and others whose format is torch.channels_last. However, corresponding parameters in different processes must have the same strides.

Warning

This module doesn’t work with torch.autograd.grad() (i.e. it will only work if gradients are to be accumulated in .grad attributes of parameters).

Warning

If you plan on using this module with a nccl backend or a gloo backend (that uses Infiniband), together with a DataLoader that uses multiple workers, please change the multiprocessing start method to forkserver (Python 3 only) or spawn. Unfortunately Gloo (that uses Infiniband) and NCCL2 are not fork safe, and you will likely experience deadlocks if you don’t change this setting.
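One way to follow this advice is to request the start method per DataLoader, as sketched below. The dataset is a made-up stand-in, and no worker processes are started until the loader is iterated.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Toy dataset standing in for real training data.
dataset = TensorDataset(torch.arange(8, dtype=torch.float32).unsqueeze(1))

# Ask this loader to create its workers with "spawn" instead of the
# platform default. Calling torch.multiprocessing.set_start_method("spawn")
# once at program start is an alternative that applies process-wide.
loader = DataLoader(
    dataset,
    batch_size=4,
    num_workers=2,
    multiprocessing_context="spawn",
)
```

With spawn or forkserver, the dataset and any collate function must be picklable, since workers no longer inherit the parent's memory by forking.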

Warning

Forward and backward hooks defined on module and its submodules won’t be invoked anymore, unless the hooks are initialized in the forward() method.

Warning

You should never try to change your model’s parameters after wrapping your model with DistributedDataParallel. When the model is wrapped, the constructor of DistributedDataParallel registers additional gradient reduction functions on all the parameters of the model at the time of construction. If you change the model’s parameters afterwards, the gradient reduction functions no longer match the correct set of parameters.

Warning

Using DistributedDataParallel in conjunction with the Distributed RPC Framework is experimental and subject to change.

Warning

The gradient_as_bucket_view mode does not yet work with Automatic Mixed Precision (AMP). AMP maintains stashed gradients that are used for unscaling gradients. With gradient_as_bucket_view=True, these stashed gradients will point to communication buckets in the first iteration. In the next iteration, the communication buckets are mutated and thus these stashed gradients will be unexpectedly mutated as well, which might lead to wrong results.

Parameters
  • module (Module) – module to be parallelized

  • device_ids (list of int or torch.device) –

    CUDA devices. 1) For single-device modules, device_ids can contain exactly one device id, which represents the only CUDA device where the input module corresponding to this process resides. Alternatively, device_ids can also be None. 2) For multi-device modules and CPU modules, device_ids must be None.

    When device_ids is None for both cases, both the input data for the forward pass and the actual module must be placed on the correct device. (default: None)

  • output_device (int or torch.device) – Device location of output for single-device CUDA modules. For multi-device modules and CPU modules, it must be None, and the module itself dictates the output location. (default: device_ids[0] for single-device modules)

  • broadcast_buffers (bool) – Flag that enables syncing (broadcasting) buffers of the module at beginning of the forward function. (default: True)

  • process_group – The process group to be used for distributed data all-reduction. If None, the default process group, which is created by torch.distributed.init_process_group(), will be used. (default: None)

  • bucket_cap_mb – DistributedDataParallel will bucket parameters into multiple buckets so that gradient reduction of each bucket can potentially overlap with backward computation. bucket_cap_mb controls the bucket size in megabytes (MB). (default: 25)

  • find_unused_parameters (bool) – Traverse the autograd graph from all tensors contained in the return value of the wrapped module’s forward function. Parameters that don’t receive gradients as part of this graph are preemptively marked as being ready to be reduced. Note that all forward outputs that are derived from module parameters must participate in calculating loss and later the gradient computation. If they don’t, this wrapper will hang waiting for autograd to produce gradients for those parameters. Any outputs derived from module parameters that are otherwise unused can be detached from the autograd graph using torch.Tensor.detach. (default: False)

  • check_reduction – This argument is deprecated.

  • gradient_as_bucket_view (bool) – This is a prototype feature and subject to change. When set to True, gradients will be views pointing to different offsets of allreduce communication buckets. This can reduce peak memory usage, where the saved memory size will be equal to the total gradients size. Moreover, it avoids the overhead of copying between gradients and allreduce communication buckets. When gradients are views, detach_() cannot be called on the gradients. If you hit such errors, refer to the zero_grad() function in torch/optim/optimizer.py for a solution.
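To give a feel for what bucket_cap_mb controls, here is a deliberately simplified sketch of size-capped bucketing in reverse registration order. It is not the actual DDP algorithm: the function assign_buckets and its greedy rule are assumptions made for illustration, and details of the real implementation are not modeled.

```python
def assign_buckets(param_nbytes, bucket_cap_mb=25):
    """Greedily group parameter indices into buckets of at most bucket_cap_mb.

    Parameters are visited in reverse registration order, which roughly
    matches the order in which their gradients become ready in backward.
    """
    cap = bucket_cap_mb * 1024 * 1024
    buckets, current, current_bytes = [], [], 0
    for index in reversed(range(len(param_nbytes))):
        nbytes = param_nbytes[index]
        # Close the current bucket if adding this parameter would exceed the cap.
        if current and current_bytes + nbytes > cap:
            buckets.append(current)
            current, current_bytes = [], 0
        current.append(index)
        current_bytes += nbytes
    if current:
        buckets.append(current)
    return buckets


MB = 1024 * 1024
sizes = [10 * MB, 10 * MB, 10 * MB, 4 * MB, 1 * MB]  # in registration order
buckets = assign_buckets(sizes, bucket_cap_mb=25)
```

In this toy case the last four parameters fill one 25 MB bucket, which could be reduced while the gradient of the first parameter is still being computed. A smaller cap yields more buckets and earlier, finer-grained communication.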

Variables

module (Module) – the module to be parallelized.

Example:

>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> # pg is a process group created elsewhere.
>>> net = torch.nn.parallel.DistributedDataParallel(model, process_group=pg)

get_ddp_logging_data()

This interface can be called after DistributedDataParallel() is constructed. It returns DDPLoggingData for debugging and analysis. A more detailed explanation of the fields in DDPLoggingData is in torch/c10/util/Logging.h.

join(divide_by_initial_world_size=True, enable=True)

A context manager to be used in conjunction with an instance of torch.nn.parallel.DistributedDataParallel to be able to train with uneven inputs across participating processes.

This context manager will keep track of already-joined DDP processes, and “shadow” the forward and backward passes by inserting collective communication operations to match with the ones created by non-joined DDP processes. This will ensure each collective call has a corresponding call by already-joined DDP processes, preventing hangs or errors that would otherwise happen when training with uneven inputs across processes.

Once all DDP processes have joined, the context manager will broadcast the model corresponding to the last joined process to all processes to ensure the model is the same across all processes (which is guaranteed by DDP).

To use this to enable training with uneven inputs across processes, simply wrap this context manager around your training loop. No further modifications to the model or data loading is required.

Warning

This module currently does not support custom distributed collective operations in the model’s forward pass, such as SyncBatchNorm or other user-defined collectives.

Parameters
  • divide_by_initial_world_size (bool) – If True, will divide gradients by the initial world_size DDP training was launched with. If False, will compute the effective world size (number of ranks that have not depleted their inputs yet) and divide gradients by that during allreduce. Set divide_by_initial_world_size=True to ensure that every input sample, including the uneven inputs, has equal weight in terms of how much it contributes to the global gradient. This is achieved by always dividing the gradient by the initial world_size even when we encounter uneven inputs. If you set this to False, we divide the gradient by the remaining number of nodes. This ensures parity with training on a smaller world_size, although it also means the uneven inputs would contribute more towards the global gradient. Typically, you would want to set this to True for cases where the last few inputs of your training job are uneven. In extreme cases, where there is a large discrepancy in the number of inputs, setting this to False might provide better results.

  • enable (bool) – Whether to enable uneven input detection or not. Pass in enable=False to disable in cases where you know that inputs are even across participating processes. Default is True.
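The effect of divide_by_initial_world_size can be illustrated with plain arithmetic. The sketch below is not DDP code: the function reduced_gradient is a hypothetical stand-in, and it assumes that ranks which have already joined contribute zeros to the summed gradient.

```python
def reduced_gradient(active_grads, initial_world_size, divide_by_initial_world_size=True):
    """Sum the gradients of ranks that still have inputs, then divide.

    Ranks that have already joined are assumed to contribute zeros.
    """
    total = sum(active_grads)
    if divide_by_initial_world_size:
        divisor = initial_world_size
    else:
        divisor = len(active_grads)  # effective world size
    return total / divisor


initial_world_size = 4
active_grads = [2.0, 6.0]  # only two of the four ranks still have data

by_initial = reduced_gradient(active_grads, initial_world_size, True)
by_effective = reduced_gradient(active_grads, initial_world_size, False)
```

Dividing by the initial world size gives the leftover samples the same per-sample weight as in a full iteration, while dividing by the effective world size scales them up as if training had been launched on fewer ranks.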

Example:

>>> import torch
>>> import torch.distributed as dist
>>> import os
>>> import torch.multiprocessing as mp
>>> import torch.nn as nn
>>> # On each spawned worker
>>> def worker(rank):
>>>     dist.init_process_group("nccl", rank=rank, world_size=2)
>>>     torch.cuda.set_device(rank)
>>>     model = nn.Linear(1, 1, bias=False).to(rank)
>>>     model = torch.nn.parallel.DistributedDataParallel(
>>>         model, device_ids=[rank], output_device=rank
>>>     )
>>>     # Rank 1 gets one more input than rank 0.
>>>     inputs = [torch.tensor([1]).float() for _ in range(10 + rank)]
>>>     with model.join():
>>>         for _ in range(5):
>>>             for inp in inputs:
>>>                 loss = model(inp).sum()
>>>                 loss.backward()
>>>     # Without the join() API, the below synchronization will hang
>>>     # blocking for rank 1's allreduce to complete.
>>>     torch.cuda.synchronize(device=rank)

no_sync()

A context manager to disable gradient synchronizations across DDP processes. Within this context, gradients will be accumulated on module variables, which will later be synchronized in the first forward-backward pass after exiting the context.

Example:

>>> # pg is a process group created elsewhere.
>>> ddp = torch.nn.parallel.DistributedDataParallel(model, process_group=pg)
>>> with ddp.no_sync():
>>>     for input in inputs:
>>>         ddp(input).backward()  # no synchronization, accumulate grads
>>> ddp(another_input).backward()  # synchronize grads

register_comm_hook(state, hook)

Registers a communication hook, which gives users a flexible way to specify how DDP aggregates gradients across multiple workers.

This hook would be very useful for researchers to try out new ideas. For example, this hook can be used to implement several algorithms like GossipGrad and gradient compression which involve different communication strategies for parameter syncs while running DistributedDataParallel training.

Parameters
  • state (object) –

    Passed to the hook to maintain any state information during the training process. Examples include error feedback in gradient compression, peers to communicate with next in GossipGrad, etc.

    It is locally stored by each worker and shared by all the gradient tensors on the worker.

  • hook (callable) –

    Averages gradient tensors across workers and is defined as: hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future

    This function is called once the bucket is ready. The hook can perform whatever processing is needed and return a Future indicating completion of any async work (e.g., allreduce). If the hook doesn’t perform any communication, it can also just return a completed Future. The Future should hold the new value of the grad bucket’s tensors. Once a bucket is ready, the c10d reducer calls this hook, takes the tensors returned by the Future, and copies grads to individual parameters.

    We also provide an API called get_future to retrieve a Future associated with the completion of c10d.ProcessGroup.work.

Warning

Grad bucket’s tensors will not be predivided by world_size. The user is responsible for dividing by world_size in the case of operations like allreduce.

Warning

DDP communication hook can only be registered once and should be registered before calling backward.

Warning

The Future object that the hook returns should contain a result that has the same shape as the tensors inside the grad bucket.

Warning

DDP communication hook does not support single-process multiple-device mode. GradBucket tensors should consist of only a single tensor.

Warning

The get_future API supports only the NCCL backend and will return a torch._C.Future, which is an internal type and should be used with caution. It can still be used by the register_comm_hook API, but it is subject to some subtle differences compared to torch.futures.Future.

Warning

DDP communication hook is experimental and subject to change.

Example:

Below is an example of a noop hook that returns the same tensors.

>>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future:
>>>     fut = torch.futures.Future()
>>>     fut.set_result(bucket.get_tensors())
>>>     return fut
>>> ddp.register_comm_hook(state=None, hook=noop)

Example:

Below is an example of a Parallel SGD algorithm where gradients are encoded before allreduce, and then decoded after allreduce.

>>> # encode, decode and process_group are user-defined and assumed to exist.
>>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future:
>>>     tensors = [t / process_group.world_size for t in bucket.get_tensors()]
>>>     encoded_tensors = encode(tensors) # encode gradients
>>>     fut = process_group.allreduce(encoded_tensors).get_future()
>>>     # Define the then callback to decode. It needs its own name so that
>>>     # it does not shadow the user-defined decode function.
>>>     def decode_callback(fut):
>>>         decoded_tensors = decode(fut.value()) # decode gradients
>>>         return decoded_tensors
>>>     return fut.then(decode_callback)
>>> ddp.register_comm_hook(state=None, hook=encode_and_decode)

set_ddp_runtime_logging_sample_rate(sample_rate)

This interface allows users to set the sample rate for collecting runtime stats. Runtime stats are recorded for the first 10 iterations; after that, they are recorded once every “sample_rate” training iterations. By default, sample_rate is kDDPRuntimeLoggingSampleRate=100, so after the first 10 iterations runtime stats are recorded once every 100 training iterations.
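The sampling schedule described here can be sketched in a few lines. The function is_sampled is hypothetical, and the choice to sample on 0-based iterations that are exact multiples of sample_rate is an assumption; the text only states the frequency, not the alignment.

```python
def is_sampled(iteration, sample_rate=100, warmup=10):
    """Return True if runtime stats would be recorded at this 0-based iteration.

    Assumed rule: always record during the first `warmup` iterations,
    then record on iterations that are multiples of `sample_rate`.
    """
    return iteration < warmup or iteration % sample_rate == 0


# Iterations that would be recorded during the first 250 steps.
sampled = [i for i in range(250) if is_sampled(i)]
```

A lower sample_rate gives finer-grained runtime stats at the cost of more frequent collection.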
