Process Groups

This module implements fault-tolerant process groups that can be reconfigured and resized at runtime.

These extend the standard PyTorch ProcessGroup API and can be used in most places that accept a standard process group. Because these groups can change size at runtime, users must take care not to assume a static rank or world size.
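
A minimal usage sketch (single host, with an illustrative port, store prefix, and ranks; each participant runs the same code with its own rank) dropping a reconfigurable group in where a standard process group would otherwise be used:

import torch
from torch.distributed import ReduceOp
from torchft.process_group import ProcessGroupGloo

pg = ProcessGroupGloo()
# Blocks until all world_size participants have joined via the store.
pg.configure("localhost:1234/run/0", rank=0, world_size=2)

t = torch.ones(3)
pg.allreduce([t], ReduceOp.SUM).wait()

# Query size() each time it is needed rather than caching it,
# since the group may be resized by a later configure() call.
print("world size:", pg.size())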

class torchft.process_group.ErrorSwallowingProcessGroupWrapper(pg: ProcessGroup)[source]

Bases: ProcessGroupWrapper

This is a wrapper around any ProcessGroup that will swallow errors and return dummy results on error.

This is intended to allow handling errors outside of the training loop to avoid having to modify modeling code to support error handling.

After an error occurs, all future operations will be skipped until the process group is reconfigured via configure.
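
A hedged sketch of the intended flow, wrapping a reconfigurable Gloo group (the address, prefixes, and ranks are illustrative):

from torchft.process_group import (
    ErrorSwallowingProcessGroupWrapper,
    ProcessGroupGloo,
)

pg = ErrorSwallowingProcessGroupWrapper(ProcessGroupGloo())
pg.configure("localhost:1234/attempt/0", rank=0, world_size=2)

# ... run collectives as usual; on failure they return dummy results ...

if pg.error() is not None:
    # Outside the inner training loop, reconfigure with a fresh store
    # prefix to resume normal operation.
    pg.configure("localhost:1234/attempt/1", rank=0, world_size=2)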

allreduce(tensors: List[Tensor], opts: object) Work[source]

Reduces the tensor data across all machines in such a way that all get the final result.

See torch.distributed.all_reduce for more details.

configure(store_addr: str, rank: int, world_size: int) None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1.

This function will block until the underlying ProcessGroup is created. If an error occurs this will throw.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group

error() Optional[Exception][source]

Returns the error that was reported to this process group.

Returns:

The exception that was reported, or None if no error has occurred.

report_error(e: Exception) None[source]

Report an error to this process group. This will cause all future operations to be skipped until the process group is reconfigured via configure.

Parameters:

e – exception to report

class torchft.process_group.ManagedProcessGroup(manager: Manager)[source]

Bases: ProcessGroupWrapper

This is a wrapper around any ProcessGroup that is managed by a torchft Manager.

This uses the ProcessGroup that is configured in the Manager. The world size is dynamic and will report the number of active participants in the quorum to the model.

Any errors will be asynchronously reported to the manager and only successes will be returned to the caller.
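
A hedged sketch of wiring this up; it assumes the usual torchft Manager environment (lighthouse address, replica group membership) is configured elsewhere, and the callbacks shown are illustrative no-ops:

from torchft.manager import Manager
from torchft.process_group import ManagedProcessGroup, ProcessGroupGloo

manager = Manager(
    pg=ProcessGroupGloo(),
    load_state_dict=lambda state_dict: None,  # illustrative no-op callback
    state_dict=lambda: {},                    # illustrative no-op callback
)
pg = ManagedProcessGroup(manager)

# size() reflects the number of active participants in the current quorum,
# so it may change between steps.
print("quorum size:", pg.size())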

allreduce(tensors: List[Tensor], opts: object) Work[source]

Reduces the tensor data across all machines in such a way that all get the final result.

See torch.distributed.all_reduce for more details.

getBackendName() str[source]
size(self: torch._C._distributed_c10d.ProcessGroup) int[source]

Get the size of this process group.

class torchft.process_group.ProcessGroup(*args: object, **kwargs: object)[source]

Bases: ProcessGroup

abort() None[source]

Aborts the process group.

allgather(output_tensors: List[List[Tensor]], input_tensor: List[Tensor], opts: AllgatherOptions) Work[source]

Gathers tensors from the whole group in a list.

See torch.distributed.all_gather for more details.

allgather_into_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: AllgatherOptions) Work[source]

Performs an allgather operation on coalesced tensors.

See torch.distributed.allgather_coalesced for more details.

allreduce(tensors: List[Tensor], opts: Union[AllreduceOptions, ReduceOp]) Work[source]

Reduces the tensor data across all machines in such a way that all get the final result.

See torch.distributed.all_reduce for more details.

allreduce_coalesced(tensors: List[Tensor], opts: AllreduceCoalescedOptions) Work[source]

Performs an all_reduce operation in a coalesced manner.

See torch.distributed.all_reduce_coalesced for more details.

alltoall_base(output_buffer: Tensor, input_buffer: Tensor, output_split_sizes: List[int], input_split_sizes: List[int], opts: AllToAllOptions) Work[source]

Performs an all_to_all operation.

See torch.distributed.all_to_all_single for more details.

barrier(opts: BarrierOptions) Work[source]

Synchronizes all processes.

See torch.distributed.barrier for more details.

broadcast(tensor_list: List[Tensor], opts: BroadcastOptions) Work[source]

Broadcasts the tensor to the whole group.

See torch.distributed.broadcast for more details.

broadcast_one(tensor: Tensor, root: int) Work[source]
configure(store_addr: str, rank: int, world_size: int) None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1.

This function will block until the underlying ProcessGroup is created. If an error occurs this will throw.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group
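
A minimal sketch of the store-address convention, using the concrete ProcessGroupGloo subclass (host, port, and prefixes are illustrative):

from torchft.process_group import ProcessGroupGloo

pg = ProcessGroupGloo()
pg.configure("localhost:1234/my/prefix/1", rank=0, world_size=2)

# A later reconfiguration (e.g. after a failure or resize) must use a new
# prefix; reusing "my/prefix/1" could pick up stale keys left in the store.
pg.configure("localhost:1234/my/prefix/2", rank=0, world_size=2)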

errored() Optional[Exception][source]

Whether an async error occurred that requires reconfiguration.

getBackendName() str[source]
property group_name: str

Gets this process group's name. The name is unique across the cluster.

recv(tensors: List[Tensor], src_rank: int, tag: int) Work[source]

Receives a list of tensors from the process with rank src_rank.

See torch.distributed.recv for more details.

reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: ReduceScatterOptions) Work[source]

Reduces, then scatters a list of tensors to all processes in a group.

See torch.distributed.reduce_scatter for more details.

reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) Work[source]

Performs a reduce-scatter operation on coalesced tensors.

See torch.distributed.reduce_scatter_tensor for more details.

register(name: str) ProcessGroup[source]

Registers the process group with the global registry. This enables usage with things like functional_collectives which are compilable.

This should only be called once.

Parameters:

name – name must be a unique name for this process group
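
A hedged sketch of the register/unregister lifecycle; the name "torchft_example_pg" and the store address are illustrative:

from torchft.process_group import ProcessGroupGloo

pg = ProcessGroupGloo()
pg.configure("localhost:1234/my/prefix/1", rank=0, world_size=2)

# Register once under a cluster-unique name; the returned handle can then be
# used where a registered process group is expected (e.g. functional
# collectives in compiled code).
registered = pg.register("torchft_example_pg")
# ... use the group ...
pg.unregister()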

send(tensors: List[Tensor], dst_rank: int, tag: int) Work[source]

Sends a list of tensors to the process with rank dst_rank.

See torch.distributed.send for more details.

shutdown() None[source]

Shuts down the process group.

size(self: torch._C._distributed_c10d.ProcessGroup) int[source]

Get the size of this process group.

unregister() None[source]

Unregisters the process group with the global registry.

Must be registered first.

class torchft.process_group.ProcessGroupBaby(timeout: Union[float, timedelta] = 60.0)[source]

Bases: ProcessGroup

This is a process group that runs the underlying process group in a subprocess. Since it runs in a subprocess, all tensors need to be in shared memory or they will be moved to shared memory. CUDA tensors are implicitly shareable and don't need any changes.
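
A hedged sketch using the concrete ProcessGroupBabyGloo subclass; the explicit share_memory_() call is optional and only avoids an implicit copy:

import torch
from torch.distributed import ReduceOp
from torchft.process_group import ProcessGroupBabyGloo

pg = ProcessGroupBabyGloo(timeout=60.0)
pg.configure("localhost:1234/my/prefix/1", rank=0, world_size=2)

t = torch.ones(3)
t.share_memory_()  # CPU tensors are otherwise moved to shared memory for us
pg.allreduce([t], ReduceOp.SUM).wait()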

allgather(output_tensors: List[List[Tensor]], input_tensor: List[Tensor], opts: AllgatherOptions) Work[source]

Gathers tensors from the whole group in a list.

See torch.distributed.all_gather for more details.

allgather_into_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: AllgatherOptions) Work[source]

Performs an allgather operation on coalesced tensors.

See torch.distributed.allgather_coalesced for more details.

allreduce(tensors: List[Tensor], opts: Union[AllreduceOptions, ReduceOp]) Work[source]

Reduces the tensor data across all machines in such a way that all get the final result.

See torch.distributed.all_reduce for more details.

allreduce_coalesced(tensors: List[Tensor], opts: Union[AllreduceCoalescedOptions, ReduceOp]) Work[source]

Performs an all_reduce operation in a coalesced manner.

See torch.distributed.all_reduce_coalesced for more details.

alltoall_base(output_buffer: Tensor, input_buffer: Tensor, output_split_sizes: List[int], input_split_sizes: List[int], opts: AllToAllOptions) Work[source]

Performs an all_to_all operation.

See torch.distributed.all_to_all_single for more details.

barrier(opts: BarrierOptions) Work[source]

Synchronizes all processes.

See torch.distributed.barrier for more details.

broadcast(tensor_list: List[Tensor], opts: BroadcastOptions) Work[source]

Broadcasts the tensor to the whole group.

See torch.distributed.broadcast for more details.

configure(store_addr: str, rank: int, world_size: int) None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1.

This function will block until the underlying ProcessGroup is created. If an error occurs this will throw.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group

num_active_work() int[source]
recv(tensors: List[Tensor], src_rank: int, tag: int) Work[source]

Receives a list of tensors from the process with rank src_rank.

See torch.distributed.recv for more details.

reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: ReduceScatterOptions) Work[source]

Reduces, then scatters a list of tensors to all processes in a group.

See torch.distributed.reduce_scatter for more details.

reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) Work[source]

Performs a reduce-scatter operation on coalesced tensors.

See torch.distributed.reduce_scatter_tensor for more details.

send(tensors: List[Tensor], dst_rank: int, tag: int) Work[source]

Sends a list of tensors to the process with rank dst_rank.

See torch.distributed.send for more details.

shutdown() None[source]

Shuts down the process group. This will kill the underlying process and close all queues.

This is a no-op if the process group is already shut down.

The ProcessGroup can be reconfigured after shutdown.

size(self: torch._C._distributed_c10d.ProcessGroup) int[source]

Get the size of this process group.

class torchft.process_group.ProcessGroupBabyGloo(timeout: Union[float, timedelta] = 60.0)[source]

Bases: ProcessGroupBaby

This is a ProcessGroup that runs Gloo in a subprocess.

For most use cases you should prefer ProcessGroupGloo or ProcessGroupBabyNCCL.

getBackendName() str[source]
reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: ReduceScatterOptions) None[source]

This function is a placeholder for the reduce_scatter operation in the ProcessGroupBabyGloo class. However, this operation is not supported by the Gloo backend, and thus calling this function will raise a RuntimeError.

Raises:
  • RuntimeError – Always raised since reduce_scatter is not supported by ProcessGroupBabyGloo.

reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) None[source]

This function is a placeholder for the reduce_scatter_tensor_coalesced operation in the ProcessGroupBabyGloo class. However, this operation is not supported by the Gloo backend, and thus, calling this function will raise a RuntimeError.

Raises:
  • RuntimeError – Always raised since reduce_scatter is not supported by ProcessGroupBabyGloo.

class torchft.process_group.ProcessGroupBabyNCCL(timeout: Union[float, timedelta] = 60.0)[source]

Bases: ProcessGroupBaby

This is a ProcessGroup that runs NCCL in a subprocess.

For the NCCL backend, extra memory will be used by the subprocess's CUDA context compared to running NCCL in the main process. This is typically around ~1GB.

The returned Work objects only synchronize on the CUDA stream and not on the CPU side. This works by passing CUDA events between the processes. To synchronize on the CPU side, call torch.cuda.synchronize() after wait().

WARNING: If the child process is killed while an operation is running, CUDA tensors may leak in the current PyTorch implementation. TODO fix

WARNING: As this uses a separate CUDA context for the subprocess, performance may be slower than using NCCL directly. Separate CUDA contexts cannot run at the same time, so network and compute kernels will not overlap execution and will instead time-share, which may reduce GPU utilization.
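
A hedged sketch of the synchronization pattern described above (illustrative address and ranks); torch.cuda.synchronize() is only needed when the CPU must observe the result:

import torch
from torch.distributed import ReduceOp
from torchft.process_group import ProcessGroupBabyNCCL

pg = ProcessGroupBabyNCCL()
pg.configure("localhost:1234/my/prefix/1", rank=0, world_size=2)

t = torch.ones(3, device="cuda")
work = pg.allreduce([t], ReduceOp.SUM)
work.wait()               # orders the result on the current CUDA stream only
torch.cuda.synchronize()  # CPU-side synchronization, if required
print(t)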

getBackendName() str[source]
class torchft.process_group.ProcessGroupDummy(rank: int, world: int)[source]

Bases: ProcessGroup

This process group discards all data passed to it and returns success. This is intended for rare cases where we want to discard certain operations without modifying the underlying library.

This PG only supports world_size of 1.
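
A minimal sketch; the group completes collectives immediately and leaves the data untouched:

import torch
from torch.distributed import ReduceOp
from torchft.process_group import ProcessGroupDummy

pg = ProcessGroupDummy(rank=0, world=1)  # only world size 1 is supported
t = torch.ones(3)
pg.allreduce([t], ReduceOp.SUM).wait()   # reports success without touching t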

allgather(output_tensors: List[List[Tensor]], input_tensor: List[Tensor], opts: object) Work[source]

Gathers tensors from the whole group in a list.

See torch.distributed.all_gather for more details.

allgather_into_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: AllgatherOptions) Work[source]

Performs an allgather operation on coalesced tensors.

See torch.distributed.allgather_coalesced for more details.

allreduce(tensors: List[Tensor], opts: object) Work[source]

Reduces the tensor data across all machines in such a way that all get the final result.

See torch.distributed.all_reduce for more details.

allreduce_coalesced(tensors: List[Tensor], opts: Union[AllreduceOptions, ReduceOp]) Work[source]

Performs an all_reduce operation in a coalesced manner.

See torch.distributed.all_reduce_coalesced for more details.

alltoall_base(output_buffer: Tensor, input_buffer: Tensor, output_split_sizes: List[int], input_split_sizes: List[int], opts: AllToAllOptions) Work[source]

Performs an all_to_all operation.

See torch.distributed.all_to_all_single for more details.

barrier(opts: BarrierOptions) Work[source]

Synchronizes all processes.

See torch.distributed.barrier for more details.

broadcast(tensor_list: List[Tensor], opts: object) Work[source]

Broadcasts the tensor to the whole group.

See torch.distributed.broadcast for more details.

configure(store_addr: str, rank: int, world_size: int) None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1.

This function will block until the underlying ProcessGroup is created. If an error occurs this will throw.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group

getBackendName() str[source]
recv(tensors: List[Tensor], src_rank: int, tag: int) Work[source]

Receives a list of tensors from the process with rank src_rank.

See torch.distributed.recv for more details.

reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: object) Work[source]

Reduces, then scatters a list of tensors to all processes in a group.

See torch.distributed.reduce_scatter for more details.

reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) Work[source]

Performs a reduce-scatter operation on coalesced tensors.

See torch.distributed.reduce_scatter_tensor for more details.

send(tensors: List[Tensor], dst_rank: int, tag: int) Work[source]

Sends a list of tensors to the process with rank dst_rank.

See torch.distributed.send for more details.

size(self: torch._C._distributed_c10d.ProcessGroup) int[source]

Get the size of this process group.

class torchft.process_group.ProcessGroupGloo(timeout: timedelta = datetime.timedelta(seconds=60), pg: Optional[ProcessGroup] = None)[source]

Bases: ProcessGroupWrapper

This is a reconfigurable version of ProcessGroupGloo.

getBackendName() str[source]
reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: ReduceScatterOptions) None[source]

This function is a placeholder for the reduce_scatter operation in the ProcessGroupGloo class. However, this operation is not supported by the Gloo backend, and thus, calling this function will raise a RuntimeError.

Raises:
  • RuntimeError – Always raised since reduce_scatter is not supported by ProcessGroupGloo.

reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) None[source]

This function is a placeholder for the reduce_scatter_tensor_coalesced operation in the ProcessGroupGloo class. However, this operation is not supported by the Gloo backend, and thus, calling this function will raise a RuntimeError.

Raises:
  • RuntimeError – Always raised since reduce_scatter is not supported by ProcessGroupGloo.

class torchft.process_group.ProcessGroupNCCL(timeout: timedelta = datetime.timedelta(seconds=60))[source]

Bases: ProcessGroupWrapper

This is a reconfigurable version of ProcessGroupNCCL.

If you are using a supported version of NCCL (NCCL >= 2.26, torch >= 2.7) this will attempt to use ncclCommAbort to recover from any timeouts.

This uses a Python user-space event loop to asynchronously wait for the NCCL operations to complete. This should not be used with very long timeouts, as the timeout entries are not cleaned up until the elapsed duration completes, which may result in slowness or excess memory usage.

WARNING: Due to NCCL error handling this may result in deadlocks, and on old versions of torch/NCCL it will result in deadlocks.

Parameters:

timeout – the timeout to use for NCCL operations.
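
A hedged sketch with an explicit timeout plus the errored() check used to decide when reconfiguration is needed (address, prefixes, and ranks are illustrative; requires a CUDA device):

from datetime import timedelta

import torch
from torch.distributed import ReduceOp
from torchft.process_group import ProcessGroupNCCL

pg = ProcessGroupNCCL(timeout=timedelta(seconds=30))
pg.configure("localhost:1234/my/prefix/1", rank=0, world_size=2)

t = torch.ones(3, device="cuda")
pg.allreduce([t], ReduceOp.SUM).wait()

if pg.errored() is not None:
    # An async error was detected; reconfigure with a fresh prefix.
    pg.configure("localhost:1234/my/prefix/2", rank=0, world_size=2)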

abort() None[source]

Aborts the process group.

errored() Optional[Exception][source]

Whether an async error occurred that requires reconfiguration.

getBackendName() str[source]
class torchft.process_group.ProcessGroupWrapper(timeout: timedelta = datetime.timedelta(seconds=60), pg: Optional[ProcessGroup] = None)[source]

Bases: ProcessGroup

This is a wrapper around any ProcessGroup with a reconfiguration method.

Parameters:
  • timeout – timeout for the TCPStore used during reconfiguration

  • pg – optional ProcessGroup to use, if None a new one will be created

abort() None[source]

Aborts the process group.

allgather(output_tensors: List[List[Tensor]], input_tensor: List[Tensor], opts: AllgatherOptions) Work[source]

Gathers tensors from the whole group in a list.

See torch.distributed.all_gather for more details.

allgather_into_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: AllgatherOptions) Work[source]

Performs an allgather operation on coalesced tensors.

See torch.distributed.allgather_coalesced for more details.

allreduce(tensors: List[Tensor], opts: object) Work[source]

Reduces the tensor data across all machines in such a way that all get the final result.

See torch.distributed.all_reduce for more details.

allreduce_coalesced(tensors: List[Tensor], opts: Union[AllreduceOptions, ReduceOp]) Work[source]

Performs an all_reduce operation in a coalesced manner.

See torch.distributed.all_reduce_coalesced for more details.

alltoall_base(output_buffer: Tensor, input_buffer: Tensor, output_split_sizes: List[int], input_split_sizes: List[int], opts: AllToAllOptions) Work[source]

Performs an all_to_all operation.

See torch.distributed.all_to_all_single for more details.

barrier(opts: BarrierOptions) Work[source]

Synchronizes all processes.

See torch.distributed.barrier for more details.

broadcast(tensor_list: List[Tensor], opts: object) Work[source]

Broadcasts the tensor to the whole group.

See torch.distributed.broadcast for more details.

configure(store_addr: str, rank: int, world_size: int) None[source]

This reconfigures the ProcessGroup to use a new store, rank and world size.

Every time this is called it must be provided with a unique prefixed store address, e.g. localhost:1234/my/prefix/1.

This function will block until the underlying ProcessGroup is created. If an error occurs this will throw.

Parameters:
  • store_addr – address of the store to use

  • rank – rank of this process

  • world_size – world size of this process group

property parent: ProcessGroup
recv(tensors: List[Tensor], src_rank: int, tag: int) Work[source]

Receives a list of tensors from the process with rank src_rank.

See torch.distributed.recv for more details.

reduce_scatter(output_tensors: List[Tensor], input_tensors: List[List[Tensor]], opts: object) Work[source]

Reduces, then scatters a list of tensors to all processes in a group.

See torch.distributed.reduce_scatter for more details.

reduce_scatter_tensor_coalesced(output_tensors: List[Tensor], input_tensors: List[Tensor], opts: ReduceScatterOptions) Work[source]

Performs a reduce-scatter operation on coalesced tensors.

See torch.distributed.reduce_scatter_tensor for more details.

send(tensors: List[Tensor], dst_rank: int, tag: int) Work[source]

Sends a list of tensors to the process with rank dst_rank.

See torch.distributed.send for more details.

shutdown() None[source]

Shuts down the process group.

size(self: torch._C._distributed_c10d.ProcessGroup) int[source]

Get the size of this process group.

torchft.process_group.create_store_client(store_addr: str, timeout: timedelta) Store[source]

Creates a PrefixStore(TCPStore(…)) client from an address in the format:

host:port/prefix

Ex: localhost:1234/my/prefix
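
A hedged sketch, assuming a TCPStore server is already listening at the given host and port:

from datetime import timedelta

from torchft.process_group import create_store_client

store = create_store_client("localhost:1234/my/prefix", timeout=timedelta(seconds=30))
store.set("key", "value")  # behaves like any other torch.distributed Store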
