Process Groups¶
This module implements fault-tolerant process groups that can be reconfigured and resized at runtime.
These extend the standard PyTorch ProcessGroup API and can be used in most places that accept a standard process group. Because these groups can change size at runtime, users must take care not to assume a static rank or world size.
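As a rough illustration of the reconfiguration flow, here is a minimal sketch (not taken from the library's examples). The TCPStore server, port, rank, and world size are placeholder assumptions; in a real deployment the torchft Manager supplies the store address:

    import torch
    import torch.distributed as dist
    from torch.distributed import TCPStore
    from torchft.process_group import ProcessGroupGloo

    # Placeholder store server; in practice the torchft Manager provides the
    # store address for each quorum.
    store = TCPStore("localhost", 29500, is_master=True, wait_for_workers=False)

    pg = ProcessGroupGloo()
    # Each (re)configuration needs a unique prefixed store address.
    pg.configure("localhost:29500/quorum_0", rank=0, world_size=1)

    t = torch.ones(3)
    pg.allreduce([t], dist.ReduceOp.SUM).wait()

    # The group may be resized later, so query rank/size rather than caching them.
    print(pg.rank(), pg.size(), t)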
- class torchft.process_group.ErrorSwallowingProcessGroupWrapper(pg: ProcessGroup)[source]¶
Bases: ProcessGroupWrapper
This is a wrapper around any ProcessGroup that will swallow errors and return dummy results on error.
This is intended to allow handling errors outside of the training loop to avoid having to modify modeling code to support error handling.
After an error occurs, all future operations are skipped until the process group is reconfigured via configure.
- allreduce(*args, **kwargs)[source]¶
Overloaded function.
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = AllreduceOptions()) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
- configure(store_addr: str, rank: int, world_size: int) → None [source]¶
This reconfigures the ProcessGroup to use a new store, rank and world size.
Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1
This function blocks until the underlying ProcessGroup is created and raises if an error occurs.
- Parameters:
store_addr – address of the store to use
rank – rank of this process
world_size – world size of this process group
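A hedged sketch of wrapping a reconfigurable group so collective failures don't propagate into the training loop; the store server, port, and prefix below are placeholders:

    import torch
    import torch.distributed as dist
    from torch.distributed import TCPStore
    from torchft.process_group import (
        ErrorSwallowingProcessGroupWrapper,
        ProcessGroupGloo,
    )

    # Placeholder store server for this sketch.
    store = TCPStore("localhost", 29501, is_master=True, wait_for_workers=False)

    pg = ErrorSwallowingProcessGroupWrapper(ProcessGroupGloo())
    pg.configure("localhost:29501/swallow_0", rank=0, world_size=1)

    grads = [torch.ones(4)]
    # If this collective fails, a dummy result is returned and subsequent
    # operations are skipped until configure() is called again.
    pg.allreduce(grads, dist.ReduceOp.SUM).wait()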
- class torchft.process_group.ManagedProcessGroup(manager: Manager)[source]¶
Bases: ProcessGroupWrapper
This is a wrapper around any ProcessGroup that is managed by a torchft Manager.
This uses the ProcessGroup that is configured in the Manager. The world size is dynamic and reports the number of active participants in the quorum to the model.
Any errors will be asynchronously reported to the manager and only successes will be returned to the caller.
- allreduce(*args, **kwargs)[source]¶
Overloaded function.
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = AllreduceOptions()) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
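A hedged sketch, assuming a torchft Manager has already been constructed elsewhere (its constructor arguments are outside the scope of this page and are omitted):

    from torchft.process_group import ManagedProcessGroup

    def wrap_with_manager(manager):
        """`manager` is a torchft Manager instance created elsewhere."""
        pg = ManagedProcessGroup(manager)
        # Collectives run on the PG the Manager configured for the current
        # quorum; errors are reported to the manager asynchronously, and the
        # reported world size tracks the number of active participants.
        return pg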
- class torchft.process_group.ProcessGroup(*args: object, **kwargs: object)[source]¶
Bases: ProcessGroup
- allgather(*args, **kwargs)[source]¶
Overloaded function.
allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[list[torch.Tensor]], input_tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = AllgatherOptions()) -> c10d::Work
allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[torch.Tensor], input_tensor: torch.Tensor) -> c10d::Work
- allreduce(*args, **kwargs)[source]¶
Overloaded function.
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = AllreduceOptions()) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
- broadcast(*args, **kwargs)[source]¶
Overloaded function.
broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.BroadcastOptions = BroadcastOptions()) -> c10d::Work
broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: int) -> c10d::Work
- configure(store_addr: str, rank: int, world_size: int) → None [source]¶
This reconfigures the ProcessGroup to use a new store, rank and world size.
Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1
This function blocks until the underlying ProcessGroup is created and raises if an error occurs.
- Parameters:
store_addr – address of the store to use
rank – rank of this process
world_size – world size of this process group
- property group_name: str¶
Gets this process group's name, which is unique within the cluster.
- register(name: str) → ProcessGroup [source]¶
Registers the process group with the global registry. This enables usage with APIs such as functional_collectives, which are compilable.
This should only be called once.
- Parameters:
name – name must be a unique name for this process group
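A hedged sketch of registering a group for use with functional collectives. The group name, port, and the use of the private torch.distributed._functional_collectives module are assumptions, and how a registered group is resolved may vary across PyTorch versions:

    import torch
    import torch.distributed._functional_collectives as funcol
    from torch.distributed import TCPStore
    from torchft.process_group import ProcessGroupGloo

    # Placeholder store server for this sketch.
    store = TCPStore("localhost", 29502, is_master=True, wait_for_workers=False)

    pg = ProcessGroupGloo()
    pg.configure("localhost:29502/fc_0", rank=0, world_size=1)
    registered = pg.register("torchft_example")  # name must be cluster-unique; call once

    t = torch.ones(2)
    # Functional collectives are traceable/compilable; here the registered
    # group object is passed directly.
    out = funcol.all_reduce(t, "sum", registered)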
- class torchft.process_group.ProcessGroupBaby(timeout: Union[float, timedelta] = 60.0)[source]¶
Bases: ProcessGroup
This is a process group that runs the underlying process group in a subprocess. Since it runs in a subprocess, all tensors need to be in shared memory or they will be moved to shared memory. CUDA tensors are implicitly shareable and don't need any changes.
- WORK_CLASS¶
alias of _BabyWork
- allreduce(*args, **kwargs)[source]¶
Overloaded function.
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = AllreduceOptions()) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
- configure(store_addr: str, rank: int, world_size: int) → None [source]¶
This reconfigures the ProcessGroup to use a new store, rank and world size.
Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1
This function blocks until the underlying ProcessGroup is created and raises if an error occurs.
- Parameters:
store_addr – address of the store to use
rank – rank of this process
world_size – world size of this process group
- class torchft.process_group.ProcessGroupBabyGloo(timeout: Union[float, timedelta] = 60.0)[source]¶
Bases: ProcessGroupBaby
This is a ProcessGroup that runs Gloo in a subprocess.
For most use cases you should prefer ProcessGroupGloo or ProcessGroupBabyNCCL.
- class torchft.process_group.ProcessGroupBabyNCCL(timeout: Union[float, timedelta] = 60.0)[source]¶
Bases: ProcessGroupBaby
This is a ProcessGroup that runs NCCL in a subprocess.
For the NCCL backend, extra memory will be used by the subprocess's CUDA context compared to running NCCL in the main process. This is typically around 1 GB.
The returned Work objects synchronize only on the CUDA stream and not on the CPU side. This works by passing CUDA events between the processes. To synchronize on the CPU, call torch.cuda.synchronize() after wait().
WARNING: If the child process is killed while an operation is running, CUDA tensors may leak in the current PyTorch implementation. TODO fix
- WORK_CLASS¶
alias of _BabyWorkNCCL
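A hedged sketch of the CPU-synchronization pattern described above. It assumes a CUDA device is available; the store server, port, and prefix are placeholders:

    import torch
    import torch.distributed as dist
    from torch.distributed import TCPStore
    from torchft.process_group import ProcessGroupBabyNCCL

    # Placeholder store server for this sketch.
    store = TCPStore("localhost", 29503, is_master=True, wait_for_workers=False)

    pg = ProcessGroupBabyNCCL()
    pg.configure("localhost:29503/nccl_0", rank=0, world_size=1)

    t = torch.ones(8, device="cuda")
    work = pg.allreduce([t], dist.ReduceOp.SUM)
    work.wait()               # synchronizes the CUDA stream only
    torch.cuda.synchronize()  # required before reading `t` on the CPU side
    print(t.cpu())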
- class torchft.process_group.ProcessGroupDummy(rank: int, world: int)[source]¶
Bases: ProcessGroup
This process group discards all data passed to it and returns success. This is intended for rare cases where we want to discard certain operations without modifying the underlying library.
This PG only supports world_size of 1.
- allgather(*args, **kwargs)[source]¶
Overloaded function.
allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[list[torch.Tensor]], input_tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = AllgatherOptions()) -> c10d::Work
allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[torch.Tensor], input_tensor: torch.Tensor) -> c10d::Work
- allreduce(*args, **kwargs)[source]¶
Overloaded function.
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = AllreduceOptions()) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
- broadcast(*args, **kwargs)[source]¶
Overloaded function.
broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.BroadcastOptions = BroadcastOptions()) -> c10d::Work
broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: int) -> c10d::Work
- configure(store_addr: str, rank: int, world_size: int) → None [source]¶
This reconfigures the ProcessGroup to use a new store, rank and world size.
Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1
This function blocks until the underlying ProcessGroup is created and raises if an error occurs.
- Parameters:
store_addr – address of the store to use
rank – rank of this process
world_size – world size of this process group
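A hedged sketch of using the dummy group as a stand-in when collectives should be no-ops, for example in a single-replica smoke test of collective-calling code:

    import torch
    import torch.distributed as dist
    from torchft.process_group import ProcessGroupDummy

    pg = ProcessGroupDummy(0, 1)  # rank 0, world size 1 (the only supported size)

    t = torch.randn(4)
    # The collective is a no-op that reports success; the data is discarded.
    pg.allreduce([t], dist.ReduceOp.SUM).wait()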
- class torchft.process_group.ProcessGroupGloo(timeout: timedelta = datetime.timedelta(seconds=60))[source]¶
Bases: ProcessGroupWrapper
This is a reconfigurable version of ProcessGroupGloo.
- class torchft.process_group.ProcessGroupNCCL(pg: Optional[ProcessGroup] = None)[source]¶
Bases: ProcessGroupWrapper
This is a reconfigurable version of ProcessGroupNCCL.
WARNING: this may result in deadlocks due to NCCL error handling. This is provided for completeness but your mileage may vary.
TODO: verify shutdown correctness with the latest NCCL. This currently calls abort when reconfiguring; we need to ensure this is safe.
- class torchft.process_group.ProcessGroupWrapper(pg: Optional[ProcessGroup] = None)[source]¶
Bases: ProcessGroup
This is a wrapper around any ProcessGroup with a reconfiguration method.
- allgather(*args, **kwargs)[source]¶
Overloaded function.
allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[list[torch.Tensor]], input_tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = AllgatherOptions()) -> c10d::Work
allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[torch.Tensor], input_tensor: torch.Tensor) -> c10d::Work
- allreduce(*args, **kwargs)[source]¶
Overloaded function.
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = AllreduceOptions()) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = ReduceOp.SUM) -> c10d::Work
- broadcast(*args, **kwargs)[source]¶
Overloaded function.
broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.BroadcastOptions = BroadcastOptions()) -> c10d::Work
broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: int) -> c10d::Work
- configure(store_addr: str, rank: int, world_size: int) → None [source]¶
This reconfigures the ProcessGroup to use a new store, rank and world size.
Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1
This function blocks until the underlying ProcessGroup is created and raises if an error occurs.
- Parameters:
store_addr – address of the store to use
rank – rank of this process
world_size – world size of this process group
- property parent: ProcessGroup¶
- torchft.process_group.create_store_client(store_addr: str) → Store [source]¶
Creates a PrefixStore(TCPStore(…)) client from an address in the format host:port/prefix.
Ex: localhost:1234/my/prefix
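A hedged sketch; the TCPStore server started here is a placeholder for whatever is already listening at host:port in a real deployment, since create_store_client only connects to an existing store:

    from torch.distributed import TCPStore
    from torchft.process_group import create_store_client

    # Placeholder server for this sketch.
    server = TCPStore("localhost", 29504, is_master=True, wait_for_workers=False)

    store = create_store_client("localhost:29504/my/prefix")
    store.set("status", "ready")   # keys are namespaced under the prefix
    print(store.get("status"))     # b'ready'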
- torchft.process_group.extend_device_mesh(mesh: DeviceMesh, pg: ProcessGroup, name: str = 'dp', dim: int = 0) → DeviceMesh [source]¶
This is a helper method to extend a traditional DeviceMesh with a torchft ProcessGroup for use with DeviceMesh-based APIs such as FSDPv2 with hybrid sharding.
Resizable PGs aren't natively supported by DeviceMesh, so we lie to DeviceMesh and report the PG as having a world size of 1. This is fine as long as any numeric scaling is handled at the PG level.
- Parameters:
mesh – The DeviceMesh to extend
pg – The ProcessGroup to add to the mesh
name – The name of the new dimension
dim – The dimension to add the ProcessGroup to
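A hedged sketch, assuming the default process group for the intra-replica dimension can be initialized (e.g. via torchrun with a single process) and that the store server, port, and prefix below are placeholders:

    from torch.distributed import TCPStore
    from torch.distributed.device_mesh import init_device_mesh
    from torchft.process_group import ProcessGroupGloo, extend_device_mesh

    # Placeholder store server for this sketch.
    store = TCPStore("localhost", 29505, is_master=True, wait_for_workers=False)

    pg = ProcessGroupGloo()
    pg.configure("localhost:29505/mesh_0", rank=0, world_size=1)

    # 1-D mesh for intra-replica sharding; requires the default process group
    # to be initialized (e.g. by torchrun).
    mesh = init_device_mesh("cpu", (1,), mesh_dim_names=("shard",))

    # Prepend a fault-tolerant "dp" dimension backed by the torchft PG.
    mesh = extend_device_mesh(mesh, pg, name="dp", dim=0)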