from typing import Any, Callable, Dict, Optional

import ignite.distributed as idist
from ignite.utils import setup_logger


class Parallel:
    """Distributed launcher context manager to simplify distributed configuration setup for multiple backends:

    - backends from native torch distributed configuration: "nccl", "gloo" and "mpi" (if available)

    - XLA on TPUs via `pytorch/xla <https://github.com/pytorch/xla>`_ (if installed)

    - using `Horovod distributed framework <https://horovod.readthedocs.io>`_ (if installed)

    Namely, it can:

    1) Spawn ``nproc_per_node`` child processes and initialize a process group according to the provided
       ``backend`` (useful for standalone scripts).

    2) Only initialize a process group given the ``backend`` (useful with tools like `torchrun`_,
       `horovodrun`_, etc).

    Args:
        backend: backend to use: `nccl`, `gloo`, `xla-tpu`, `horovod`. If None, no distributed configuration.
        nproc_per_node: optional argument, number of processes per node. If not None,
            :meth:`~ignite.distributed.launcher.Parallel.run` will spawn ``nproc_per_node`` processes that run
            the input function with its arguments.
        nnodes: optional argument, number of nodes participating in distributed configuration. If not None,
            :meth:`~ignite.distributed.launcher.Parallel.run` will spawn ``nproc_per_node`` processes that run
            the input function with its arguments. Total world size is `nproc_per_node * nnodes`.
            This option is only supported by the native torch distributed module. For other modules, please
            set up ``spawn_kwargs`` with backend specific arguments.
        node_rank: optional argument, current machine index. Mandatory argument if ``nnodes`` is specified
            and larger than one.
            This option is only supported by the native torch distributed module. For other modules, please
            set up ``spawn_kwargs`` with backend specific arguments.
        master_addr: optional argument, master node TCP/IP address for torch native backends
            (`nccl`, `gloo`). Mandatory argument if ``nnodes`` is specified and larger than one.
        master_port: optional argument, master node port for torch native backends
            (`nccl`, `gloo`). Mandatory argument if ``master_addr`` is specified.
        init_method: optional argument to specify the process group initialization method for torch native
            backends (`nccl`, `gloo`). Default, "env://". See more info: `dist.init_process_group`_.
        spawn_kwargs: kwargs to ``idist.spawn`` function.

    Examples:
        1) Single node or Multi-node, Multi-GPU training launched with `torchrun`_ or `horovodrun`_ tools

        Single node option with 4 GPUs

        .. code-block:: bash

            torchrun --nproc_per_node=4 main.py
            # or if horovod is installed
            horovodrun -np 4 python main.py

        Multi-node option: 2 nodes with 8 GPUs each

        .. code-block:: bash

            ## node 0
            torchrun --nnodes=2 --node_rank=0 --master_addr=master --master_port=3344 \\
                --nproc_per_node=8 main.py
            # or if horovod is installed
            horovodrun -np 16 -H hostname1:8,hostname2:8 python main.py

            ## node 1
            torchrun --nnodes=2 --node_rank=1 --master_addr=master --master_port=3344 \\
                --nproc_per_node=8 main.py

        User code is the same for both options:

        .. code-block:: python

            # main.py

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            backend = "nccl"  # or "horovod" if package is installed

            config = {"key": "value"}

            with idist.Parallel(backend=backend) as parallel:
                parallel.run(training, config, a=1, b=2)
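
        The same user code can also be run without any launcher and without a distributed configuration by
        passing ``backend=None`` (a minimal sketch: the function is then executed once, in the current
        process, with ``local_rank=0``):

        .. code-block:: python

            config = {"key": "value"}

            with idist.Parallel(backend=None) as parallel:
                parallel.run(training, config, a=1, b=2)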

        2) Single node, Multi-GPU training launched with `python`

        .. code-block:: bash

            python main.py

        .. code-block:: python

            # main.py

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            backend = "nccl"  # or "horovod" if package is installed

            config = {"key": "value"}

            # no "init_method" was specified, so "env://" will be used
            with idist.Parallel(backend=backend, nproc_per_node=4) as parallel:
                parallel.run(training, config, a=1, b=2)

        Initializing the process group using ``file://``

        .. code-block:: python

            with idist.Parallel(backend=backend, init_method='file:///d:/tmp/some_file', nproc_per_node=4) as parallel:
                parallel.run(training, config, a=1, b=2)

        Initializing the process group using ``tcp://``

        .. code-block:: python

            with idist.Parallel(backend=backend, init_method='tcp://10.1.1.20:23456', nproc_per_node=4) as parallel:
                parallel.run(training, config, a=1, b=2)

        3) Single node, Multi-TPU training launched with `python`

        .. code-block:: bash

            python main.py

        .. code-block:: python

            # main.py

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            config = {"key": "value"}

            with idist.Parallel(backend="xla-tpu", nproc_per_node=8) as parallel:
                parallel.run(training, config, a=1, b=2)

        4) Multi-node, Multi-GPU training launched with `python`. For example, 2 nodes with 8 GPUs:

        Using torch native distributed framework:

        .. code-block:: bash

            # node 0
            python main.py --node_rank=0

            # node 1
            python main.py --node_rank=1

        .. code-block:: python

            # main.py

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            # args.node_rank is expected to be parsed from the command line, e.g. with argparse
            dist_config = {
                "nproc_per_node": 8,
                "nnodes": 2,
                "node_rank": args.node_rank,
                "master_addr": "master",
                "master_port": 15000
            }

            config = {"key": "value"}

            with idist.Parallel(backend="nccl", **dist_config) as parallel:
                parallel.run(training, config, a=1, b=2)

    .. _torchrun: https://pytorch.org/docs/stable/elastic/run.html#launcher-api
    .. _horovodrun: https://horovod.readthedocs.io/en/latest/api.html#module-horovod.run
    .. _dist.init_process_group: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group

    .. versionchanged:: 0.4.2
        ``backend`` now accepts `horovod` distributed framework.

    .. versionchanged:: 0.4.5
        ``init_method`` added.
    """

    def __init__(
        self,
        backend: Optional[str] = None,
        nproc_per_node: Optional[int] = None,
        nnodes: Optional[int] = None,
        node_rank: Optional[int] = None,
        master_addr: Optional[str] = None,
        master_port: Optional[int] = None,
        init_method: Optional[str] = None,
        **spawn_kwargs: Any,
    ) -> None:
        if backend is not None:
            if backend not in idist.available_backends():
                raise ValueError(f"Unknown backend '{backend}'. Available backends: {idist.available_backends()}")
        else:
            arg_names = ["nproc_per_node", "nnodes", "node_rank", "master_addr", "master_port"]
            arg_values = [nproc_per_node, nnodes, node_rank, master_addr, master_port]
            for name, value in zip(arg_names, arg_values):
                if value is not None:
                    raise ValueError(f"If backend is None, argument '{name}' should also be None, but given {value}")

        self.backend = backend
        self._spawn_params = None
        self.init_method = init_method
        if self.backend is not None:
            if nproc_per_node is not None:
                self._spawn_params = self._setup_spawn_params(
                    nproc_per_node, nnodes, node_rank, master_addr, master_port, init_method, **spawn_kwargs
                )

        # The logger will be setup after the idist.initialize() call
        self._logger = None

    @staticmethod
    def _setup_spawn_params(
        nproc_per_node: int,
        nnodes: Optional[int] = None,
        node_rank: Optional[int] = None,
        master_addr: Optional[str] = None,
        master_port: Optional[int] = None,
        init_method: Optional[str] = None,
        **spawn_kwargs: Any,
    ) -> Dict:
        if nproc_per_node < 1:
            raise ValueError(f"Argument nproc_per_node should be positive, but given {nproc_per_node}")
        if nnodes is None:
            nnodes = 1
        if nnodes < 1:
            raise ValueError(f"Argument nnodes should be positive, but given {nnodes}")
        if node_rank is None:
            if nnodes > 1:
                raise ValueError("If number of nodes is larger than one, argument node_rank should be given")
            node_rank = 0
        if node_rank >= nnodes or node_rank < 0:
            raise ValueError(f"Argument node_rank should be between 0 and {nnodes - 1}, but given {node_rank}")
        if nnodes > 1 and (master_addr is None or master_port is None) and init_method is None:
            raise ValueError(
                "If number of nodes is larger than one, arguments master_addr and master_port or init_method "
                f"should be specified, but given master_addr={master_addr}, master_port={master_port} and "
                f"init_method={init_method}."
            )
        params = {
            "nproc_per_node": nproc_per_node,
            "nnodes": nnodes,
            "node_rank": node_rank,
            "master_addr": master_addr,
            "master_port": master_port,
            "init_method": init_method,
        }
        params.update(spawn_kwargs)
        return {k: v for k, v in params.items() if v is not None}
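
    # Illustrative note (comments only, not executed): for a two-node setup such as
    #   _setup_spawn_params(nproc_per_node=8, nnodes=2, node_rank=0, master_addr="master", master_port=15000)
    # the returned dict is
    #   {"nproc_per_node": 8, "nnodes": 2, "node_rank": 0, "master_addr": "master", "master_port": 15000}
    # i.e. entries left as None (here ``init_method``) are dropped before being forwarded to ``idist.spawn``.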

    def run(self, func: Callable, *args: Any, **kwargs: Any) -> None:
        """Execute ``func`` with provided arguments in distributed context.

        Args:
            func: function to execute. First argument of the function should be `local_rank` - local process
                index.
            args: positional arguments of ``func`` (without `local_rank`).
            kwargs: keyword arguments of ``func``.

        Examples:
            .. code-block:: python

                def training(local_rank, config, **kwargs):
                    # ...
                    print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                    # ...

                config = {"key": "value"}

                with idist.Parallel(backend=backend) as parallel:
                    parallel.run(training, config, a=1, b=2)
        """
        if self._spawn_params is not None and self.backend is not None:
            self._logger.info(  # type: ignore[attr-defined]
                f"Spawn function '{func}' in {self._spawn_params['nproc_per_node']} processes"
            )
            idist.spawn(self.backend, func, args=args, kwargs_dict=kwargs, **self._spawn_params)
        else:
            self._logger.info(f"- Run '{func}' in {idist.get_world_size()} processes")  # type: ignore[attr-defined]
            local_rank = idist.get_local_rank()
            func(local_rank, *args, **kwargs)

        self._logger.info("End of run")  # type: ignore[attr-defined]
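
    # Note on the context-manager protocol implemented below (descriptive only): ``__enter__`` calls
    # ``idist.initialize`` and ``__exit__`` calls ``idist.finalize`` only when no processes are going to be
    # spawned (i.e. the script was started by an external launcher such as ``torchrun``); in spawn mode the
    # distributed configuration is set up per child process by ``idist.spawn`` instead.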
def__enter__(self)->"Parallel":ifself.backendisnotNoneandself._spawn_paramsisNone:idist.initialize(self.backend,init_method=self.init_method)# The logger can be setup from now since idist.initialize() has been called (if needed)self._logger=setup_logger(__name__+"."+self.__class__.__name__)# type: ignore[assignment]ifself.backendisnotNone:ifself._spawn_paramsisNone:self._logger.info(# type: ignore[attr-defined]f"Initialized processing group with backend: '{self.backend}'")else:self._logger.info(# type: ignore[attr-defined]f"Initialized distributed launcher with backend: '{self.backend}'")msg="\n\t".join([f"{k}: {v}"fork,vinself._spawn_params.items()ifvisnotNone])self._logger.info(f"- Parameters to spawn processes: \n\t{msg}")# type: ignore[attr-defined]returnselfdef__exit__(self,*args:Any,**kwargs:Any)->None:if(self.backendisnotNone)andself._spawn_paramsisNone:self._logger.info(# type: ignore[attr-defined]f"Finalized processing group with backend: '{self.backend}'")idist.finalize()
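

# The block below is an illustrative usage sketch and not part of the launcher API above: it spawns two
# "gloo" processes from a plain ``python`` invocation and runs a small, hypothetical worker function in each
# of them. The worker is defined at module level so that it remains importable/picklable in the spawned
# child processes.
def _example_training(local_rank: int, config: dict) -> None:
    # Each process reports its local rank, global rank, world size, active backend and received config.
    print("local rank:", local_rank, "| rank:", idist.get_rank(), "/", idist.get_world_size(),
          "| backend:", idist.backend(), "| config:", config)


if __name__ == "__main__":
    with Parallel(backend="gloo", nproc_per_node=2) as parallel:
        parallel.run(_example_training, {"key": "value"})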