
ignite.distributed#

Helper module to use distributed settings for multiple backends:

  • backends from native torch distributed configuration: “nccl”, “gloo”, “mpi”

  • XLA on TPUs via pytorch/xla

This module wraps common methods to fetch information about the distributed configuration, initialize/finalize the process group, or spawn multiple processes.

Examples:

  • Example to spawn nprocs processes that run fn with args: spawn()

ignite.distributed.has_xla_support#

True if torch_xla package is found
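
For instance, the flag can be used to choose a backend at runtime; a minimal sketch (the fallback to "gloo" is an illustrative assumption, not a recommendation):

import ignite.distributed as idist

# Prefer the XLA backend when torch_xla is installed; otherwise fall back
# to a native torch distributed backend (illustrative choice).
backend = "xla-tpu" if idist.has_xla_support else "gloo"
print(backend)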

ignite.distributed.all_gather(tensor)[source]#

Helper method to perform all gather operation.

Parameters

tensor (torch.Tensor or number or str) – tensor or number or str to collect across participating processes.

Returns

torch.Tensor of shape (world_size * tensor.shape[0], tensor.shape[1], …) or List of strings

Return type

Union[Tensor, Number, List[str]]
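
A minimal usage sketch (the per-rank tensor below is illustrative):

import torch
import ignite.distributed as idist

# Each process contributes its own tensor; the result concatenates the
# per-rank tensors along the first dimension across all processes.
local_values = torch.tensor([float(idist.get_rank())])
gathered = idist.all_gather(local_values)
# gathered.shape[0] == idist.get_world_size() * local_values.shape[0]
print(gathered)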

ignite.distributed.all_reduce(tensor, op='SUM')[source]#

Helper method to perform all reduce operation.

Parameters
  • tensor (torch.Tensor or number) – tensor or number to collect across participating processes.

  • op (str) – reduction operation, “SUM” by default. Possible values: “SUM”, “PRODUCT”, “MIN”, “MAX”, “AND”, “OR”.

Returns

torch.Tensor or number

Return type

Union[Tensor, Number]
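
A minimal usage sketch (the per-process values are illustrative); the same pattern works for plain numbers:

import torch
import ignite.distributed as idist

# Sum a per-process statistic across all participating processes.
local_count = torch.tensor([10.0])
total_count = idist.all_reduce(local_count, op="SUM")

# Reductions other than "SUM" follow the same call shape.
largest_rank = idist.all_reduce(torch.tensor([float(idist.get_rank())]), op="MAX")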

ignite.distributed.available_backends()[source]#

Returns available backends.

Return type

Tuple[str]

ignite.distributed.backend()[source]#

Returns computation model’s backend.

  • None for no distributed configuration

  • “nccl” or “gloo” or “mpi” for native torch distributed configuration

  • “xla-tpu” for XLA distributed configuration

Returns

str or None

Return type

Optional[str]
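
A small sketch that inspects the two helpers above; the exact tuple returned by available_backends() depends on the installation (NCCL/gloo/MPI support in torch, presence of torch_xla):

import ignite.distributed as idist

print(idist.available_backends())  # e.g. ("nccl", "gloo"), installation dependent
print(idist.backend())             # None until a distributed configuration is set up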

ignite.distributed.barrier()[source]#

Helper method to synchronize all processes.
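
A typical pattern, sketched below, is to let one rank do some preparation work while the other processes wait at the barrier before continuing (the "work" here is just a placeholder print):

import ignite.distributed as idist

if idist.get_rank() == 0:
    print("rank 0: preparing shared resources")  # placeholder for real work
idist.barrier()  # every process blocks here until all processes arrive
print("rank {}: continuing".format(idist.get_rank()))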

ignite.distributed.device()[source]#

Returns current device according to current distributed configuration.

  • torch.device(“cpu”) if no distributed configuration or native gloo distributed configuration

  • torch.device(“cuda:local_rank”) if native nccl distributed configuration

  • torch.device(“xla:index”) if XLA distributed configuration

Returns

torch.device

Return type

device
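
A minimal sketch that places a model and a batch on the device selected by the current configuration:

import torch
import ignite.distributed as idist

device = idist.device()  # cpu, cuda:<local_rank> or an XLA device
model = torch.nn.Linear(4, 2).to(device)
batch = torch.zeros(8, 4, device=device)
output = model(batch)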

ignite.distributed.finalize()[source]#

Finalizes the distributed configuration. For example, in the case of the native pytorch distributed configuration, it calls dist.destroy_process_group().

ignite.distributed.get_local_rank()[source]#

Returns local process rank within current distributed configuration. Returns 0 if no distributed configuration.

Return type

int

ignite.distributed.get_node_rank()[source]#

Returns node rank within current distributed configuration. Returns 0 if no distributed configuration.

Return type

int

ignite.distributed.get_ntasks_per_node()[source]#

Returns number of processes (or tasks) per node within current distributed configuration. Returns 1 if no distributed configuration.

Return type

int

ignite.distributed.get_num_nodes()[source]#

Returns number of nodes within current distributed configuration. Returns 1 if no distributed configuration.

Return type

int

ignite.distributed.get_rank()[source]#

Returns process rank within current distributed configuration. Returns 0 if no distributed configuration.

Return type

int

ignite.distributed.get_world_size()[source]#

Returns world size of current distributed configuration. Returns 1 if no distributed configuration.

Return type

int
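
A small sketch printing the topology information exposed by the helpers above; with no distributed configuration it reports a single process on a single node:

import ignite.distributed as idist

print(
    "rank={} local_rank={} node_rank={} world_size={} "
    "ntasks_per_node={} num_nodes={}".format(
        idist.get_rank(),
        idist.get_local_rank(),
        idist.get_node_rank(),
        idist.get_world_size(),
        idist.get_ntasks_per_node(),
        idist.get_num_nodes(),
    )
)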

ignite.distributed.hostname()[source]#

Returns host name for current process within current distributed configuration.

Return type

str

ignite.distributed.initialize(backend, **kwargs)[source]#

Initializes the distributed configuration according to the provided backend.

Examples

Launch single node multi-GPU training with torch.distributed.launch utility.

# >>> python -m torch.distributed.launch --nproc_per_node=4 main.py

# main.py

import torch
import ignite.distributed as idist

def train_fn(local_rank, a, b, c):
    import torch.distributed as dist
    assert dist.is_available() and dist.is_initialized()
    assert dist.get_world_size() == 4

    device = idist.device()
    assert device == torch.device("cuda:{}".format(local_rank))


a, b, c = 1, 2, 3  # placeholder arguments for the example

idist.initialize("nccl")
local_rank = idist.get_local_rank()
train_fn(local_rank, a, b, c)
idist.finalize()
Parameters
  • backend (str, optional) – backend: nccl, gloo, xla-tpu.

  • **kwargs

    acceptable kwargs according to provided backend:

    • “nccl” or “gloo” : timeout (default: timedelta(minutes=30)); see the sketch below.
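
A minimal sketch of passing the timeout keyword, assuming the process group environment (RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT) has already been set up by a launcher such as torch.distributed.launch:

from datetime import timedelta

import ignite.distributed as idist

idist.initialize("gloo", timeout=timedelta(minutes=60))
# ... training code ...
idist.finalize()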

ignite.distributed.model_name()[source]#

Returns distributed configuration name (given by ignite)

  • serial for no distributed configuration

  • native-dist for native torch distributed configuration

  • xla-dist for XLA distributed configuration

Return type

str

ignite.distributed.one_rank_only(rank=0, with_barrier=False)[source]#

Decorator to filter handlers with respect to a process rank.

Parameters
  • rank (int) – rank number of the handler (default: 0).

  • with_barrier (bool) – whether to synchronize all processes with a barrier (default: False).

from ignite.distributed import one_rank_only

engine = ...

@engine.on(...)
@one_rank_only()  # means @one_rank_only(rank=0)
def some_handler(_):
    ...

@engine.on(...)
@one_rank_only(rank=1)
def another_handler(_):
    ...

ignite.distributed.set_local_rank(index)[source]#

Method to hint the local rank in case the native torch distributed context is created by the user without using initialize() or spawn().

Usage:

User sets up the native torch distributed process group:

import torch.distributed as dist
import ignite.distributed as idist

def run(local_rank, *args, **kwargs):
    idist.set_local_rank(local_rank)
    # ...
    dist.init_process_group(**dist_info)  # dist_info is provided by the user
    # ...

Parameters

index (int) – local rank or current process index

ignite.distributed.show_config()[source]#

Helper method to display distributed configuration via logging.

ignite.distributed.spawn(backend, fn, args, kwargs_dict=None, num_procs_per_node=1, **kwargs)[source]#

Spawns num_procs_per_node processes that run fn with args/kwargs_dict and initialize distributed configuration defined by backend.

Examples

  1. Launch single node multi-GPU training

# >>> python main.py

# main.py

import torch
import ignite.distributed as idist

def train_fn(local_rank, a, b, c, d=12):
    import torch.distributed as dist
    assert dist.is_available() and dist.is_initialized()
    assert dist.get_world_size() == 4

    device = idist.device()
    assert device == torch.device("cuda:{}".format(local_rank))


idist.spawn("nccl", train_fn, args=(a, b, c), kwargs_dict={"d": 23}, num_procs_per_node=4)
  1. Launch multi-node multi-GPU training

# >>> (node 0): python main.py --node_rank=0 --num_nodes=8 --master_addr=master --master_port=2222
# >>> (node 1): python main.py --node_rank=1 --num_nodes=8 --master_addr=master --master_port=2222
# >>> ...
# >>> (node 7): python main.py --node_rank=7 --num_nodes=8 --master_addr=master --master_port=2222

# main.py

import torch
import ignite.distributed as idist

def train_fn(local_rank, num_nodes, num_procs_per_node):
    import torch.distributed as dist
    assert dist.is_available() and dist.is_initialized()
    assert dist.get_world_size() == num_nodes * num_procs_per_node

    device = idist.device()
    assert device == torch.device("cuda:{}".format(local_rank))

# num_nodes, num_procs_per_node, node_rank, master_addr and master_port
# are parsed from the command line, e.g. with argparse
idist.spawn(
    "nccl",
    train_fn,
    args=(num_nodes, num_procs_per_node),
    num_procs_per_node=num_procs_per_node,
    num_nodes=num_nodes,
    node_rank=node_rank,
    master_addr=master_addr,
    master_port=master_port
)
  3. Launch single node multi-TPU training (for example on Google Colab)

# >>> python main.py

# main.py

import ignite.distributed as idist

def train_fn(local_rank, a, b, c, d=12):
    import torch_xla.core.xla_model as xm
    assert xm.xrt_world_size() == 8

    device = idist.device()
    assert "xla" in device.type


idist.spawn("xla-tpu", train_fn, args=(a, b, c), kwargs_dict={"d": 23}, num_procs_per_node=8)
Parameters
  • backend (str) – backend to use: nccl, gloo, xla-tpu

  • fn (function) – function called as the entrypoint of the spawned process. This function must be defined at the top level of a module so it can be pickled and spawned. This is a requirement imposed by multiprocessing. The function is called as fn(i, *args, **kwargs_dict), where i is the process index and args is the passed-through tuple of arguments.

  • args (tuple) – arguments passed to fn.

  • kwargs_dict (Mapping) – kwargs passed to fn.

  • num_procs_per_node (int) – number of processes to spawn on a single node. Default, 1.

  • **kwargs

    acceptable kwargs according to provided backend:

    • “nccl” or “gloo” : num_nodes (default, 1), node_rank (default, 0), master_addr
      (default, “127.0.0.1”), master_port (default, 2222), timeout passed to the
      dist.init_process_group function and kwargs for the mp.spawn function.
    • “xla-tpu” : num_nodes (default, 1), node_rank (default, 0) and kwargs passed to the
      xmp.spawn function.

ignite.distributed.sync()[source]#

Helper method to force this module to synchronize with the current distributed context. This method should be used when the distributed context is manually created or destroyed.
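
A minimal sketch, assuming the process group is created by hand with an env:// initialization (RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT set by a launcher):

import torch.distributed as dist
import ignite.distributed as idist

dist.init_process_group(backend="gloo", init_method="env://")

# Let ignite.distributed pick up the manually created context.
idist.sync()
assert idist.backend() == "gloo"

dist.destroy_process_group()
idist.sync()  # and again after the context is destroyed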