Process Groups

This module implements fault tolerant process groups that can be reconfigured and resized at runtime.

These extend the standard PyTorch ProcessGroup API and can be used in most places that accept a standard process group. Because they can change size at runtime, users must take care not to assume a static rank or world size.
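The following is a minimal sketch of what "not assuming a static rank or world size" can mean in practice. ResizableGroup and local_shard are hypothetical stand-ins written in plain Python for illustration; they are not part of torchft. The point is only that rank and size are queried at the moment they are needed rather than cached at startup.

```python
from typing import List, Sequence


class ResizableGroup:
    """Hypothetical stand-in for a process group whose membership can change."""

    def __init__(self, rank: int, world_size: int) -> None:
        self._rank = rank
        self._world_size = world_size

    def configure(self, rank: int, world_size: int) -> None:
        # Simulates a reconfiguration after a replica joins or leaves.
        self._rank = rank
        self._world_size = world_size

    def rank(self) -> int:
        return self._rank

    def size(self) -> int:
        return self._world_size


def local_shard(pg: ResizableGroup, items: Sequence[int]) -> List[int]:
    # Query rank and size at call time; never reuse values from an earlier step.
    return list(items[pg.rank()::pg.size()])


pg = ResizableGroup(rank=1, world_size=4)
batch = list(range(8))
before = local_shard(pg, batch)

# The group shrinks from 4 to 2 members; the same call now yields a larger shard.
pg.configure(rank=1, world_size=2)
after = local_shard(pg, batch)
```

Code that stored world_size=4 in a variable at startup would keep producing the smaller shard after the resize and silently drop data.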

class torchft.process_group.ErrorSwallowingProcessGroupWrapper(pg: ProcessGroup)[source]

Bases: ProcessGroupWrapper

This is a wrapper around any ProcessGroup that will swallow errors and return dummy results on error.

This is intended to allow handling errors outside of the training loop to avoid having to modify modeling code to support error handling.

After an error occurs all future operations will be skipped until the process group is reconfigured via configure.
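The behavior described above can be modeled in a few lines of plain Python. This is only a sketch of the pattern (swallow the error, return a dummy result, skip later operations until reconfigured), not the torchft implementation; ErrorSwallowingSketch, run, and flaky are hypothetical names, and the real wrapper operates on collectives and Work objects rather than floats.

```python
from typing import Callable, Optional


class ErrorSwallowingSketch:
    """Hypothetical pure-Python model of an error-swallowing wrapper."""

    def __init__(self, op: Callable[[float], float], dummy: float = 0.0) -> None:
        self._op = op
        self._dummy = dummy
        self._error: Optional[Exception] = None

    def configure(self) -> None:
        # Reconfiguring clears the recorded error so operations run again.
        self._error = None

    def error(self) -> Optional[Exception]:
        return self._error

    def report_error(self, e: Exception) -> None:
        self._error = e

    def run(self, value: float) -> float:
        if self._error is not None:
            # An earlier error is still pending: skip the operation entirely.
            return self._dummy
        try:
            return self._op(value)
        except Exception as e:
            # Swallow the error, remember it, and hand back a dummy result.
            self.report_error(e)
            return self._dummy


def flaky(x: float) -> float:
    if x < 0:
        raise RuntimeError("peer disconnected")
    return x * 2


wrapper = ErrorSwallowingSketch(flaky)
results = [wrapper.run(1.0), wrapper.run(-1.0), wrapper.run(3.0)]
had_error = wrapper.error() is not None

wrapper.configure()
recovered = wrapper.run(3.0)
```

The caller's loop never sees an exception; it checks error() at a convenient point outside the modeling code and decides whether to discard the step.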

allreduce(*args, **kwargs)[source]

Overloaded function.

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x7f06a7195470>) -> c10d::Work

  2. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

  3. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

configure(store_addr: str, rank: int, world_size: int) -> None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every call must be given a unique prefixed store address, e.g. localhost:1234/my/prefix/1

This function blocks until the underlying ProcessGroup is created and raises an exception if an error occurs.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group
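One simple way to satisfy the "unique prefixed store address" requirement is to append a counter to a fixed prefix. The helper below is a hypothetical sketch (store_addrs is not a torchft function), and the host, port, and prefix values are taken from the example address above.

```python
import itertools
from typing import Iterator


def store_addrs(host: str, port: int, prefix: str) -> Iterator[str]:
    """Yield a fresh store address for every reconfiguration (hypothetical helper)."""
    for attempt in itertools.count():
        yield f"{host}:{port}/{prefix}/{attempt}"


addrs = store_addrs("localhost", 1234, "my/prefix")
first = next(addrs)
second = next(addrs)

# Each address would be passed to a separate configure() call, for example:
# pg.configure(first, rank=0, world_size=2)
# pg.configure(second, rank=0, world_size=3)
```

All participants in a given reconfiguration need to agree on the same address, so in a real job the counter would typically be derived from shared state rather than a local generator.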

error() -> Optional[Exception][source]

Returns the error that was reported to this process group.

Returns:

exception that was reported

report_error(e: Exception) -> None[source]

Report an error to this process group. This will cause all future operations to be skipped until the process group is reconfigured via configure.

Parameters:

e – exception to report

class torchft.process_group.ManagedProcessGroup(manager: Manager)[source]

Bases: ProcessGroupWrapper

This is a wrapper around any ProcessGroup that is managed by a torchft Manager.

This uses the ProcessGroup that is configured in the Manager. The world size is dynamic and reports the number of active participants in the quorum to the model.

Any errors will be asynchronously reported to the manager and only successes will be returned to the caller.

allreduce(*args, **kwargs)[source]

Overloaded function.

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x7f06a7195470>) -> c10d::Work

  2. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

  3. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

size(self: torch._C._distributed_c10d.ProcessGroup) -> int[source]

class torchft.process_group.ProcessGroup(*args: object, **kwargs: object)[source]

Bases: ProcessGroup

allgather(*args, **kwargs)[source]

Overloaded function.

  1. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[list[torch.Tensor]], input_tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f06a7194e70>) -> c10d::Work

  2. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[torch.Tensor], input_tensor: torch.Tensor) -> c10d::Work

allreduce(*args, **kwargs)[source]

Overloaded function.

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x7f06a7195470>) -> c10d::Work

  2. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

  3. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

broadcast(*args, **kwargs)[source]

Overloaded function.

  1. broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.BroadcastOptions = <torch._C._distributed_c10d.BroadcastOptions object at 0x7f06caf11230>) -> c10d::Work

  2. broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: int) -> c10d::Work

broadcast_one(tensor: Tensor, root: int) -> Work[source]

configure(store_addr: str, rank: int, world_size: int) -> None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every call must be given a unique prefixed store address, e.g. localhost:1234/my/prefix/1

This function blocks until the underlying ProcessGroup is created and raises an exception if an error occurs.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group

getBackendName() -> str[source]

property group_name: str

Gets this process group's name, which is unique across the cluster.

register(name: str) -> ProcessGroup[source]

Registers the process group with the global registry. This enables usage with APIs such as functional_collectives, which are compilable.

This should only be called once.

Parameters:

name – name must be a unique name for this process group

size(self: torch._C._distributed_c10d.ProcessGroup) -> int[source]

unregister() -> None[source]

Unregisters the process group from the global registry.

The process group must have been registered first.

class torchft.process_group.ProcessGroupBaby(timeout: Union[float, timedelta] = 60.0)[source]

Bases: ProcessGroup

This is a process group that runs the underlying process group in a subprocess. Since it runs in a subprocess, all tensors need to be in shared memory or will be moved to shared memory. CUDA tensors are implicitly shareable and don’t need any changes.

WORK_CLASS

alias of _BabyWork

allreduce(*args, **kwargs)[source]

Overloaded function.

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x7f06a7195470>) -> c10d::Work

  2. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

  3. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

configure(store_addr: str, rank: int, world_size: int) -> None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every call must be given a unique prefixed store address, e.g. localhost:1234/my/prefix/1

This function blocks until the underlying ProcessGroup is created and raises an exception if an error occurs.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group

size(self: torch._C._distributed_c10d.ProcessGroup) -> int[source]

class torchft.process_group.ProcessGroupBabyGloo(timeout: Union[float, timedelta] = 60.0)[source]

Bases: ProcessGroupBaby

This is a ProcessGroup that runs Gloo in a subprocess.

For most use cases you should prefer ProcessGroupGloo or ProcessGroupBabyNCCL.

getBackendName() -> str[source]

class torchft.process_group.ProcessGroupBabyNCCL(timeout: Union[float, timedelta] = 60.0)[source]

Bases: ProcessGroupBaby

This is a ProcessGroup that runs NCCL in a subprocess.

For the NCCL backend, the subprocess's CUDA context uses extra memory compared to running NCCL in the main process. This is typically around 1 GB.

The returned Work objects only synchronize on the CUDA stream and not on the CPU side. This works by passing CUDA Events between the processes. To synchronize on the CPU, call torch.cuda.synchronize() after wait().

WARNING: If the child process is killed while an operation is running, CUDA tensors may leak in the current PyTorch implementation. TODO fix

WORK_CLASS

alias of _BabyWorkNCCL

getBackendName() -> str[source]

class torchft.process_group.ProcessGroupDummy(rank: int, world: int)[source]

Bases: ProcessGroup

This process group discards all data passed to it and returns success. This is intended for rare cases where we want to discard certain operations without modifying the underlying library.

This PG only supports a world_size of 1.

allgather(*args, **kwargs)[source]

Overloaded function.

  1. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[list[torch.Tensor]], input_tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f06a7194e70>) -> c10d::Work

  2. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[torch.Tensor], input_tensor: torch.Tensor) -> c10d::Work

allreduce(*args, **kwargs)[source]

Overloaded function.

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x7f06a7195470>) -> c10d::Work

  2. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

  3. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

broadcast(*args, **kwargs)[source]

Overloaded function.

  1. broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.BroadcastOptions = <torch._C._distributed_c10d.BroadcastOptions object at 0x7f06caf11230>) -> c10d::Work

  2. broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: int) -> c10d::Work

configure(store_addr: str, rank: int, world_size: int) -> None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every call must be given a unique prefixed store address, e.g. localhost:1234/my/prefix/1

This function blocks until the underlying ProcessGroup is created and raises an exception if an error occurs.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group

getBackendName() -> str[source]

size(self: torch._C._distributed_c10d.ProcessGroup) -> int[source]

class torchft.process_group.ProcessGroupGloo(timeout: timedelta = datetime.timedelta(seconds=60))[source]

Bases: ProcessGroupWrapper

This is a reconfigurable version of ProcessGroupGloo.

getBackendName() -> str[source]

class torchft.process_group.ProcessGroupNCCL(pg: Optional[ProcessGroup] = None)[source]

Bases: ProcessGroupWrapper

This is a reconfigurable version of ProcessGroupNCCL.

WARNING: this may result in deadlocks due to NCCL error handling. This is provided for completeness but your mileage may vary.

TODO: verify shutdown correctness with latest NCCL. This currently calls abort when reconfiguring; we need to ensure this is safe.

getBackendName() -> str[source]

class torchft.process_group.ProcessGroupWrapper(pg: Optional[ProcessGroup] = None)[source]

Bases: ProcessGroup

This is a wrapper around any ProcessGroup with a reconfiguration method.

allgather(*args, **kwargs)[source]

Overloaded function.

  1. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[list[torch.Tensor]], input_tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f06a7194e70>) -> c10d::Work

  2. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: list[torch.Tensor], input_tensor: torch.Tensor) -> c10d::Work

allreduce(*args, **kwargs)[source]

Overloaded function.

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x7f06a7195470>) -> c10d::Work

  2. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

  3. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>) -> c10d::Work

broadcast(*args, **kwargs)[source]

Overloaded function.

  1. broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensors: list[torch.Tensor], opts: torch._C._distributed_c10d.BroadcastOptions = <torch._C._distributed_c10d.BroadcastOptions object at 0x7f06caf11230>) -> c10d::Work

  2. broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: int) -> c10d::Work

configure(store_addr: str, rank: int, world_size: int) -> None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every call must be given a unique prefixed store address, e.g. localhost:1234/my/prefix/1

This function blocks until the underlying ProcessGroup is created and raises an exception if an error occurs.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group

property parent: ProcessGroup

size(self: torch._C._distributed_c10d.ProcessGroup) -> int[source]

torchft.process_group.create_store_client(store_addr: str) -> Store[source]

Creates a PrefixStore(TCPStore(…)) client from an address in the format:

host:port/prefix

Ex: localhost:1234/my/prefix
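To make the host:port/prefix format concrete, here is a small sketch that splits such an address into its three parts. StoreAddr and parse_store_addr are hypothetical names for illustration and do not reflect how create_store_client parses its argument internally; the sketch also does not handle bracketed IPv6 hosts.

```python
from typing import NamedTuple


class StoreAddr(NamedTuple):
    host: str
    port: int
    prefix: str


def parse_store_addr(store_addr: str) -> StoreAddr:
    """Split 'host:port/prefix' into its components (hypothetical helper)."""
    # Everything after the first '/' is the prefix, which may itself contain '/'.
    host_port, slash, prefix = store_addr.partition("/")
    host, colon, port = host_port.rpartition(":")
    if not slash or not colon or not host:
        raise ValueError(f"expected host:port/prefix, got {store_addr!r}")
    # int() raises ValueError for a non-numeric port.
    return StoreAddr(host=host, port=int(port), prefix=prefix)


addr = parse_store_addr("localhost:1234/my/prefix")
```

The host and port would identify the TCPStore server, and the prefix would namespace the keys via PrefixStore.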

torchft.process_group.extend_device_mesh(mesh: DeviceMesh, pg: ProcessGroup, name: str = 'dp', dim: int = 0) -> DeviceMesh[source]

This is a helper method to extend a traditional DeviceMesh with a torchft ProcessGroup for usage with DeviceMesh based APIs such as FSDPv2 with hybrid sharding.

Resizable PGs aren’t natively supported by DeviceMesh, so we lie to DeviceMesh and report the PG as having a world size of 1. This is fine as long as any numeric scaling is handled at the PG level.

Parameters:
  • mesh – The DeviceMesh to extend

  • pg – The ProcessGroup to add to the mesh

  • name – The name of the new dimension

  • dim – The dimension to add the ProcessGroup to
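One possible reading of "numeric scaling is handled at the PG level" is sketched below in plain Python. MeshFacingGroup and allreduce_avg are hypothetical and exist only for illustration; they are not torchft APIs. The group tells mesh-based code that its size is 1, while the averaging step divides by the real number of participants.

```python
from typing import List


class MeshFacingGroup:
    """Hypothetical model: reports size 1 upward, scales by the real participant count."""

    def __init__(self, participants: int) -> None:
        self.participants = participants

    def size(self) -> int:
        # What DeviceMesh-based code is told, regardless of the real membership.
        return 1

    def allreduce_avg(self, contributions: List[float]) -> float:
        # Scaling uses the live participant count, not size().
        if len(contributions) != self.participants:
            raise ValueError("one contribution per participant is required")
        return sum(contributions) / self.participants


pg = MeshFacingGroup(participants=4)
reported = pg.size()
averaged = pg.allreduce_avg([1.0, 2.0, 3.0, 6.0])
```

If the caller instead divided by size(), the result would be the unscaled sum, which is why the scaling has to live where the real participant count is known.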
