def sync(temporary: bool = False) -> None:
    """Helper method to force this module to synchronize with current distributed context.
    This method should be used when distributed context is manually created or destroyed.

    Args:
        temporary: If True, distributed model synchronization is done every call of ``idist.get_*`` methods.
            This may have a negative performance impact.
    """
    global _model

    for comp_model_cls in registered_computation_models:
        if comp_model_cls == _SerialModel:
            continue
        model = comp_model_cls.create_from_context()
        if model is not None:
            _set_model(model, temporary=temporary)
            return

    _model = _SerialModel()
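# Usage sketch (illustrative, not part of the module): re-synchronizing ``idist`` helpers
# after a torch.distributed process group has been created manually. Assumes the usual
# RANK/WORLD_SIZE/MASTER_ADDR/MASTER_PORT environment variables are set by the launcher.
#
#     import torch.distributed as dist
#     import ignite.distributed as idist
#
#     dist.init_process_group("gloo", init_method="env://")
#     idist.sync()  # idist.* helpers now reflect the manually created context
#     assert idist.backend() == "gloo"
#     dist.destroy_process_group()
#     idist.sync()  # back to the serial (non-distributed) model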
def device() -> torch.device:
    """Returns current device according to current distributed configuration.

    - `torch.device("cpu")` if no distributed configuration or torch native gloo distributed configuration
    - `torch.device("cuda:local_rank")` if torch native nccl or horovod distributed configuration
    - `torch.device("xla:index")` if XLA distributed configuration

    Returns:
        torch.device

    .. versionchanged:: 0.4.2
        Added Horovod distributed framework.
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.device()
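# Usage sketch (illustrative): move a model to the device picked by the current
# configuration; ``MyModel`` is a placeholder name.
#
#     import ignite.distributed as idist
#
#     device = idist.device()          # cpu, cuda:<local_rank> or xla:<index>
#     model = MyModel().to(device)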
def backend() -> Optional[str]:
    """Returns computation model's backend.

    - `None` for no distributed configuration
    - "nccl" or "gloo" or "mpi" for native torch distributed configuration
    - "xla-tpu" for XLA distributed configuration
    - "horovod" for Horovod distributed framework

    Returns:
        str or None

    .. versionchanged:: 0.4.2
        Added Horovod distributed framework.
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.backend()
def available_backends() -> Tuple[str, ...]:
    """Returns available backends."""
    out: Tuple[str, ...] = ()
    for m in registered_computation_models:
        out += m.available_backends
    return out
def model_name() -> str:
    """Returns distributed configuration name (given by ignite)

    - `serial` for no distributed configuration
    - `native-dist` for native torch distributed configuration
    - `xla-dist` for XLA distributed configuration
    - `horovod-dist` for Horovod distributed framework

    .. versionchanged:: 0.4.2
        `horovod-dist` will be returned for Horovod distributed framework.
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.name
def get_world_size() -> int:
    """Returns world size of current distributed configuration. Returns 1 if no distributed configuration."""
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.get_world_size()
def get_rank() -> int:
    """Returns process rank within current distributed configuration. Returns 0 if no distributed configuration."""
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.get_rank()
def get_local_rank() -> int:
    """Returns local process rank within current distributed configuration.
    Returns 0 if no distributed configuration."""
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.get_local_rank()
def get_nproc_per_node() -> int:
    """Returns number of processes (or tasks) per node within current distributed configuration.
    Returns 1 if no distributed configuration.
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.get_nproc_per_node()
def get_nnodes() -> int:
    """Returns number of nodes within current distributed configuration.
    Returns 1 if no distributed configuration.
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.get_nnodes()
def get_node_rank() -> int:
    """Returns node rank within current distributed configuration.
    Returns 0 if no distributed configuration.
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.get_node_rank()
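# Usage sketch (illustrative): how the topology getters above typically relate to each
# other. The rank decomposition below assumes a homogeneous launch, i.e. the same number
# of processes on every node.
#
#     import ignite.distributed as idist
#
#     assert idist.get_world_size() == idist.get_nnodes() * idist.get_nproc_per_node()
#     assert idist.get_rank() == (
#         idist.get_node_rank() * idist.get_nproc_per_node() + idist.get_local_rank()
#     )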
def hostname() -> str:
    """Returns host name for current process within current distributed configuration."""
    return socket.gethostname()
def spawn(
    backend: str,
    fn: Callable,
    args: Tuple,
    kwargs_dict: Optional[Mapping] = None,
    nproc_per_node: int = 1,
    **kwargs: Any,
) -> None:
    """Spawns ``nproc_per_node`` processes that run ``fn`` with ``args``/``kwargs_dict`` and initialize
    distributed configuration defined by ``backend``.

    Args:
        backend: backend to use: `nccl`, `gloo`, `xla-tpu`, `horovod`
        fn: function to be called as the entrypoint of the spawned process.
            This function must be defined at the top level of a module so it can be pickled and spawned.
            This is a requirement imposed by multiprocessing. The function is called as
            ``fn(i, *args, **kwargs_dict)``, where `i` is the process index and ``args`` is the passed-through
            tuple of arguments.
        args: arguments passed to `fn`.
        kwargs_dict: kwargs passed to `fn`.
        nproc_per_node: number of processes to spawn on a single node. Default, 1.
        kwargs: acceptable kwargs according to provided backend:

            - | "nccl" or "gloo" : ``nnodes`` (default, 1), ``node_rank`` (default, 0), ``master_addr``
              | (default, "127.0.0.1"), ``master_port`` (default, 2222), ``init_method`` (default, "env://"),
              | `timeout` to `dist.init_process_group`_ function
              | and kwargs for `mp.start_processes`_ function.

            - | "xla-tpu" : ``nnodes`` (default, 1), ``node_rank`` (default, 0) and kwargs to `xmp.spawn`_ function.

            - | "horovod": ``hosts`` (default, None) and other kwargs to `hvd_run`_ function. Arguments ``nnodes=1``
              | and ``node_rank=0`` are tolerated and ignored, otherwise an exception is raised.

    Examples:
        1) Launch single node multi-GPU training using torch native distributed framework

        .. code-block:: python

            # >>> python main.py

            # main.py

            import ignite.distributed as idist

            def train_fn(local_rank, a, b, c, d=12):
                import torch.distributed as dist
                assert dist.is_available() and dist.is_initialized()
                assert dist.get_world_size() == 4

                device = idist.device()
                assert device == torch.device(f"cuda:{local_rank}")

            idist.spawn("nccl", train_fn, args=(a, b, c), kwargs_dict={"d": 23}, nproc_per_node=4)

        2) Launch multi-node multi-GPU training using torch native distributed framework

        .. code-block:: python

            # >>> (node 0): python main.py --node_rank=0 --nnodes=8 --master_addr=master --master_port=2222
            # >>> (node 1): python main.py --node_rank=1 --nnodes=8 --master_addr=master --master_port=2222
            # >>> ...
            # >>> (node 7): python main.py --node_rank=7 --nnodes=8 --master_addr=master --master_port=2222

            # main.py

            import torch
            import ignite.distributed as idist

            def train_fn(local_rank, nnodes, nproc_per_node):
                import torch.distributed as dist
                assert dist.is_available() and dist.is_initialized()
                assert dist.get_world_size() == nnodes * nproc_per_node

                device = idist.device()
                assert device == torch.device(f"cuda:{local_rank}")

            idist.spawn(
                "nccl",
                train_fn,
                args=(nnodes, nproc_per_node),
                nproc_per_node=nproc_per_node,
                nnodes=nnodes,
                node_rank=node_rank,
                master_addr=master_addr,
                master_port=master_port
            )

        3) Launch single node multi-TPU training (for example on Google Colab) using PyTorch/XLA

        .. code-block:: python

            # >>> python main.py

            # main.py

            import ignite.distributed as idist

            def train_fn(local_rank, a, b, c, d=12):
                import torch_xla.core.xla_model as xm
                assert xm.get_world_size() == 8

                device = idist.device()
                assert "xla" in device.type

            idist.spawn("xla-tpu", train_fn, args=(a, b, c), kwargs_dict={"d": 23}, nproc_per_node=8)

    .. _dist.init_process_group: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
    .. _mp.start_processes: https://pytorch.org/docs/stable/multiprocessing.html#torch.multiprocessing.spawn.spawn
    .. _xmp.spawn: https://pytorch.org/xla/release/1.6/index.html#torch_xla.distributed.xla_multiprocessing.spawn
    .. _hvd_run: https://horovod.readthedocs.io/en/latest/api.html#module-horovod.run

    .. versionchanged:: 0.4.2
        ``backend`` now accepts `horovod` distributed framework.
    """
    _assert_backend(backend)

    if kwargs_dict is None:
        kwargs_dict = {}

    for comp_model_cls in registered_computation_models:
        if backend not in comp_model_cls.available_backends:
            continue
        comp_model_cls.spawn(
            fn, args=args, kwargs_dict=kwargs_dict, nproc_per_node=nproc_per_node, backend=backend, **kwargs
        )
def all_reduce(
    tensor: Union[torch.Tensor, float], op: str = "SUM", group: Optional[Union[Any, List[int]]] = None
) -> Union[torch.Tensor, float]:
    """Helper method to perform all reduce operation.

    Args:
        tensor: tensor or number to collect across participating processes.
        op: reduction operation, "SUM" by default. Possible values: "SUM", "PRODUCT", "MIN", "MAX", "AND", "OR".
            Horovod backend supports only "SUM", "AVERAGE", "ADASUM", "MIN", "MAX", "PRODUCT".
        group: list of integers or the process group for each backend. If None, the default process group
            will be used.

    Returns:
        torch.Tensor or number

    .. versionchanged:: 0.4.11
        added ``group``
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    if isinstance(group, list) and all(isinstance(item, int) for item in group):
        group = _model.new_group(group)

    return _model.all_reduce(tensor, op, group=group)
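# Usage sketch (illustrative): averaging a per-process scalar metric with ``all_reduce``;
# ``compute_loss`` is a placeholder for user code.
#
#     import torch
#     import ignite.distributed as idist
#
#     local_loss = torch.tensor(compute_loss(), device=idist.device())
#     total = idist.all_reduce(local_loss, op="SUM")
#     mean_loss = total / idist.get_world_size()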
def all_gather(
    tensor: Union[torch.Tensor, float, str, Any], group: Optional[Union[Any, List[int]]] = None
) -> Union[torch.Tensor, float, List[float], List[str], List[Any]]:
    """Helper method to perform all gather operation.

    Args:
        tensor: tensor or number or str to collect across participating processes. If tensor, it should have
            the same shape across processes.
        group: list of integers or the process group for each backend. If None, the default process group
            will be used.

    Returns:
        If input is a tensor, returns a torch.Tensor of shape ``(world_size * tensor.shape[0], tensor.shape[1], ...)``.
        If input is a number, a torch.Tensor of shape ``(world_size, )`` is returned; finally, a list of strings
        is returned if input is a string. If the current process does not belong to `group`, the input ``tensor``
        is returned unchanged.

    .. versionchanged:: 0.4.11
        added ``group``
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    if isinstance(group, list) and all(isinstance(item, int) for item in group):
        group = _model.new_group(group)

    return _model.all_gather(tensor, group=group)
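# Usage sketch (illustrative): gathering per-process predictions. With a tensor of shape
# (16, 10) on each process, the gathered result has shape (world_size * 16, 10).
#
#     import torch
#     import ignite.distributed as idist
#
#     preds = torch.rand(16, 10, device=idist.device())
#     all_preds = idist.all_gather(preds)             # shape: (world_size * 16, 10)
#     hostnames = idist.all_gather(idist.hostname())  # list of strings, one per process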
def broadcast(
    tensor: Union[torch.Tensor, float, str, None], src: int = 0, safe_mode: bool = False
) -> Union[torch.Tensor, float, str]:
    """Helper method to perform broadcast operation.

    Args:
        tensor: tensor or number or str to broadcast to participating processes.
            Make sure to respect the data type of the torch tensor input for all processes, otherwise execution
            will crash. Can use None for non-source data with ``safe_mode=True``.
        src: source rank. Default, 0.
        safe_mode: if True, non-source input data can be ``None`` or anything (will be discarded), otherwise the
            data type of the input ``tensor`` should be respected for all processes. Please keep in mind that this
            mode works only for dense tensors as source input if a tensor is provided. It also leads to some
            additional collectives before the broadcast, making it slower than without using this mode.
            Default, False.

    Returns:
        torch.Tensor or string or number

    Examples:
        .. code-block:: python

            y = None
            if idist.get_rank() == 0:
                t1 = torch.rand(4, 5, 6, device=idist.device())
                s1 = "abc"
                x = 12.3456
                y = torch.rand(1, 2, 3, device=idist.device())
            else:
                t1 = torch.empty(4, 5, 6, device=idist.device())
                s1 = ""
                x = 0.0

            # Broadcast tensor t1 from rank 0 to all processes
            t1 = idist.broadcast(t1, src=0)
            assert isinstance(t1, torch.Tensor)

            # Broadcast string s1 from rank 0 to all processes
            s1 = idist.broadcast(s1, src=0)
            # >>> s1 = "abc"

            # Broadcast float number x from rank 0 to all processes
            x = idist.broadcast(x, src=0)
            # >>> x = 12.3456

            # Broadcast any of those types from rank 0,
            # but other ranks do not define the placeholder
            y = idist.broadcast(y, src=0, safe_mode=True)
            assert isinstance(y, torch.Tensor)

    .. versionadded:: 0.4.2

    .. versionchanged:: 0.4.5
        added ``safe_mode``
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.broadcast(tensor, src=src, safe_mode=safe_mode)
def barrier() -> None:
    """Helper method to synchronize all processes."""
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    _model.barrier()
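# Usage sketch (illustrative): let rank 0 write a checkpoint before any other process
# tries to read it; ``save_checkpoint``/``load_checkpoint`` are placeholder helpers.
#
#     import ignite.distributed as idist
#
#     if idist.get_rank() == 0:
#         save_checkpoint("/tmp/ckpt.pt")
#     idist.barrier()  # all processes block here until every process has arrived
#     state = load_checkpoint("/tmp/ckpt.pt")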
def new_group(ranks: List[int], **kwargs: Any) -> Any:
    """Helper method to make a process group for the current backend from the given ranks.

    Args:
        ranks: subset of ranks to be grouped.
        kwargs: acceptable kwargs according to provided backend:

            - | "nccl" or "gloo" : ``backend (=None)``, ``pg_options (=None)``.

    Examples:
        Launch single node multi-GPU training with ``torchrun`` utility.

        .. code-block:: python

            import ignite.distributed as idist

            ranks = [0, 1]
            group = idist.new_group(ranks)

    .. versionadded:: 0.4.11
    """
    if _need_to_sync and isinstance(_model, _SerialModel):
        sync(temporary=True)

    return _model.new_group(ranks, **kwargs)
def set_local_rank(index: int) -> None:
    """Method to hint the local rank in case the torch native distributed context is created by the user
    without using :meth:`~ignite.distributed.utils.initialize` or :meth:`~ignite.distributed.utils.spawn`.

    Args:
        index: local rank or current process index

    Examples:
        User set up torch native distributed process group

        .. code-block:: python

            import ignite.distributed as idist

            def run(local_rank, *args, **kwargs):

                idist.set_local_rank(local_rank)
                # ...
                dist.init_process_group(**dist_info)
                # ...
    """
    from ignite.distributed.comp_models.base import ComputationModel

    ComputationModel._ext_local_rank = index
def _set_model(model: Any, temporary: bool = False) -> None:
    global _model, _need_to_sync
    _model = model
    _need_to_sync = True
    if not isinstance(_model, _SerialModel) and not temporary:
        _need_to_sync = False


def _assert_backend(backend: str) -> None:
    backends = available_backends()
    if backend not in backends:
        raise ValueError(f"Backend should be one of '{backends}'")
def initialize(backend: str, **kwargs: Any) -> None:
    """Initializes distributed configuration according to provided ``backend``

    Args:
        backend: backend to use: `nccl`, `gloo`, `xla-tpu`, `horovod`.
        kwargs: acceptable kwargs according to provided backend:

            - | "nccl" or "gloo" : ``timeout(=timedelta(minutes=30))``, ``init_method(=None)``,
              | ``rank(=None)``, ``world_size(=None)``.
              | By default, ``init_method`` will be "env://". See more info about parameters: `torch_init`_.

            - | "horovod" : comm(=None), more info: `hvd_init`_.

    Examples:
        Launch single node multi-GPU training with ``torchrun`` utility.

        .. code-block:: python

            # >>> torchrun --nproc_per_node=4 main.py
            # main.py

            import ignite.distributed as idist

            def train_fn(local_rank, a, b, c):
                import torch.distributed as dist
                assert dist.is_available() and dist.is_initialized()
                assert dist.get_world_size() == 4

                device = idist.device()
                assert device == torch.device(f"cuda:{local_rank}")

            backend = "nccl"  # or "gloo" or "horovod" or "xla-tpu"
            idist.initialize(backend)
            # or for torch native distributed on Windows:
            # idist.initialize("nccl", init_method="file://tmp/shared")
            local_rank = idist.get_local_rank()
            train_fn(local_rank, a, b, c)
            idist.finalize()

    .. _torch_init: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
    .. _hvd_init: https://horovod.readthedocs.io/en/latest/api.html#module-horovod.torch

    .. versionchanged:: 0.4.2
        ``backend`` now accepts `horovod` distributed framework.

    .. versionchanged:: 0.4.5
        ``kwargs`` now accepts ``init_method``, ``rank``, ``world_size`` for PyTorch native distributed backend.
    """
    if not (has_xla_support or has_native_dist_support or has_hvd_support):
        # nothing to do => serial model
        # maybe warn about this
        return

    _assert_backend(backend)

    for comp_model_cls in registered_computation_models:
        if backend not in comp_model_cls.available_backends:
            continue
        _set_model(comp_model_cls(backend, **kwargs))
def finalize() -> None:
    """Finalizes distributed configuration. For example, in case of native pytorch distributed configuration,
    it calls ``dist.destroy_process_group()``.
    """
    _model.finalize()
    _set_model(_SerialModel())
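# Usage sketch (illustrative): typical initialize/finalize pairing around a training
# entry point; ``run_training`` is a placeholder for user code and the backend is arbitrary.
#
#     import ignite.distributed as idist
#
#     idist.initialize("gloo")
#     try:
#         run_training()
#     finally:
#         idist.finalize()  # destroys the process group and resets to the serial model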
def one_rank_only(rank: int = 0, with_barrier: bool = False) -> Callable:
    """Decorator to filter handlers wrt a rank number

    Args:
        rank: rank number of the handler (default: 0).
        with_barrier: synchronisation with a barrier (default: False).

    Examples:
        .. code-block:: python

            engine = ...

            @engine.on(...)
            @one_rank_only()  # means @one_rank_only(rank=0)
            def some_handler(_):
                ...

            @engine.on(...)
            @one_rank_only(rank=1)
            def some_handler(_):
                ...
    """

    def _one_rank_only(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
            ret = None
            if get_rank() == rank:
                ret = func(*args, **kwargs)
            if with_barrier:
                barrier()
            return ret

        return wrapper

    return _one_rank_only
@contextmanager
def one_rank_first(rank: int = 0, local: bool = False) -> Any:
    """Context manager that ensures a specific rank runs first before others in a distributed environment.

    Args:
        rank: rank of the process that should execute the code block inside the context manager first. Default, 0.
        local: flag to specify local rank or global rank. If True ``rank`` argument will define a local rank
            to run first. Default, False.

    Examples:
        .. code-block:: python

            def download_dataset():
                ...

            with idist.one_rank_first():
                ds = download_dataset()

            dp = ds[0]

    .. versionadded:: 0.4.13
    """
    current_rank = get_local_rank() if local else get_rank()
    size = get_nproc_per_node() if local else get_world_size()

    if rank >= size or rank < 0:
        raise ValueError(f"rank should be between 0 and {size - 1}, but given {rank}")

    if current_rank != rank:
        barrier()

    yield

    if current_rank == rank:
        barrier()