In the context of torchelastic, we use the term rendezvous to refer to a particular functionality that combines a distributed synchronization primitive with peer discovery.
It is used by torchelastic to gather the participants of a training job (i.e. workers) so that they all agree on the same list of participants and on everyone’s roles, and make a consistent collective decision on when training can begin or resume.
Torchelastic Rendezvous provides the following critical functionalities:
Barrier:
Workers performing rendezvous will all block until the rendezvous is considered complete. This happens when at least the minimum number (min) of workers have joined the rendezvous barrier (for the same job). This also implies that the barrier is not necessarily of fixed size.
There’s an additional small waiting time after reaching the min number of workers. This is used to ensure the rendezvous is not completed “too quickly” (which could potentially exclude additional workers attempting to join at approximately the same time).
If the max number of workers is gathered at the barrier, the rendezvous is completed immediately.
There’s also an overall timeout which causes the rendezvous to fail if the min number of workers is never reached. This is meant to be a simple fail-safe to help release partially allocated job resources in case there’s a problem with the resource manager, and is meant to be interpreted as non-retryable.
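The completion rules above can be condensed into one decision function. This is an illustrative sketch, not torchelastic's implementation: the name barrier_status, its parameters, and the convention that the caller supplies elapsed times in seconds are all assumptions.

```python
from typing import Optional


def barrier_status(
    num_joined: int,
    min_workers: int,
    max_workers: int,
    secs_since_min_reached: Optional[float],
    secs_since_start: float,
    last_call_timeout: float,
    overall_timeout: float,
) -> str:
    """Hypothetical helper (not part of torchelastic).

    Returns "complete", "wait", or "timeout" for the current barrier state.
    secs_since_min_reached is None while fewer than min_workers have joined.
    """
    # Gathering max workers completes the rendezvous immediately.
    if num_joined >= max_workers:
        return "complete"
    if num_joined >= min_workers:
        # min reached: keep the join window open for the "last call" period
        # so that workers arriving at about the same time are not excluded.
        if secs_since_min_reached is not None and secs_since_min_reached >= last_call_timeout:
            return "complete"
        return "wait"
    # min never reached within the overall timeout: fail (non-retryable).
    if secs_since_start >= overall_timeout:
        return "timeout"
    return "wait"
```

For example, with min=2, max=4 and a 30s last call, three joined workers 10s after min was reached yield "wait", while a fourth worker completes the barrier at once.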
Exclusivity:
A simple distributed barrier would not be sufficient, as we also need to ensure that only one group of workers exists at any given time (for a given job). In other words, new workers (i.e. joining late) should not be able to form a parallel independent group of workers for the same job.
Torchelastic rendezvous ensures that if a group of workers has already completed a rendezvous (and hence might already be training), then additional “late” workers attempting to rendezvous will only announce themselves as waiting, and will have to wait until the (previously completed) existing rendezvous is destroyed first.
Consistency:
When a rendezvous is completed, all its members will agree on the job membership and everyone’s role in it. This role is represented using an integer, called rank, that is between 0 (inclusive) and the world size (exclusive).
Note that ranks are not stable, in the sense that the same worker process can be assigned a different rank in the next (re-)rendezvous.
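To illustrate why ranks are not stable, the sketch below assumes, purely for illustration, that ranks follow the order in which workers joined a given round. The function assign_ranks and the host names are hypothetical; this is not how torchelastic is guaranteed to assign ranks.

```python
from typing import Dict, List


def assign_ranks(join_order: List[str]) -> Dict[str, int]:
    """Hypothetical: give each participant a rank in [0, world_size) by join order."""
    if len(set(join_order)) != len(join_order):
        raise ValueError("duplicate participant in join order")
    return {worker: rank for rank, worker in enumerate(join_order)}


# First rendezvous: three workers join in this order.
round1 = assign_ranks(["host-a", "host-b", "host-c"])
# Re-rendezvous after host-a failed and rejoined last: the same process gets a new rank.
round2 = assign_ranks(["host-b", "host-c", "host-a"])
```

Here host-a holds rank 0 in the first round and rank 2 in the second, while every round still uses exactly the ranks 0 to world_size - 1.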
Fault-tolerance:
Torchelastic rendezvous is designed to tolerate worker failures during the rendezvous process. Should a process crash (or lose network connectivity, etc.) between joining the rendezvous and it being completed, a re-rendezvous with the remaining healthy workers will happen automatically.
A worker can also fail after it has completed (or has been observed by other workers to have completed) the rendezvous - this
scenario will be handled by the torchelastic
(where it will also trigger a re-rendezvous).
Shared key-value store:
When the rendezvous is completed, a shared key-value store is created and returned. This store implements a torch.distributed.Store API (see the distributed communication docs).
This store is only shared by the members of the completed rendezvous. It is intended to be used by torchelastic to exchange information necessary to initialize job control and data-planes.
Waiting workers and rendezvous closing:
The torchelastic rendezvous handler object provides additional functionality, which is technically not part of the rendezvous process:
Querying how many workers arrived late at the barrier and can participate in the next rendezvous.
Marking the rendezvous as closed, to signal all workers not to participate in the next rendezvous.
Below is a state diagram describing how rendezvous works.
RendezvousHandler: the main rendezvous interface.
torchelastic users normally do not need to implement their own RendezvousHandler. An implementation based on etcd is already provided and is recommended for most users, provided they can deploy etcd in their environment.
torchelastic is currently considered experimental, so the APIs may change!
is_closed() → bool
Checks whether the rendezvous for the current job has been closed, which means all future attempts to re-rendezvous (within the same job) will fail.
Note that is_closed() and set_closed() have semantics of eventual propagation and should not be used for synchronization. The intention is that if at least one worker decides the job is finished, it will close the rendezvous, and other workers will soon observe this and stop training/rendezvous-ing as well.
next_rendezvous() → Tuple[torch.distributed.Store, int, int]
Main entry point into the rendezvous barrier. Blocks until the rendezvous is complete (and the current process is included in the formed worker group), a timeout occurs, or the rendezvous is marked closed.
Returns: a tuple of (torch.distributed.Store, rank, world size).
Raises:
RendezvousClosedException: if the rendezvous for the current job is closed.
RendezvousTimeoutException: on timeout.
num_nodes_waiting() → int
Returns the number of workers who arrived late at the rendezvous barrier and hence were not included in the current worker group.
Callers should periodically call this method to check whether new members are waiting to join the job and, if so, admit them by calling next_rendezvous() (re-rendezvous).
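A caller's periodic check can be sketched as a single step of a monitoring loop. HandlerLike, monitor_step and FakeHandler are hypothetical names for illustration; only is_closed() and num_nodes_waiting() come from the interface described here.

```python
from typing import Protocol


class HandlerLike(Protocol):
    # Only the two query methods of the rendezvous handler are assumed here.
    def is_closed(self) -> bool: ...
    def num_nodes_waiting(self) -> int: ...


def monitor_step(handler: HandlerLike) -> str:
    """Hypothetical single iteration of a caller's monitoring loop."""
    if handler.is_closed():
        # Some worker decided the job is finished: stop instead of re-rendezvousing.
        return "stop"
    if handler.num_nodes_waiting() > 0:
        # Late workers are waiting: admit them by calling next_rendezvous() again.
        return "re-rendezvous"
    return "continue"


class FakeHandler:
    """Stand-in used only for illustration; not a torchelastic class."""

    def __init__(self, closed: bool, waiting: int) -> None:
        self.closed = closed
        self.waiting = waiting

    def is_closed(self) -> bool:
        return self.closed

    def num_nodes_waiting(self) -> int:
        return self.waiting
```

Checking is_closed() first is a design choice of this sketch: there is no point admitting late workers into a job that is finishing. Because closing propagates eventually, a real loop would keep polling rather than rely on a single check.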
set_closed()
Used to mark the rendezvous (for the current job) as closed.
RendezvousClosedException
Raised when a rendezvous for the specified run_id is closed. This is used to signal completion to nodes that arrive late.
RendezvousTimeoutException
Raised from RendezvousHandler.next_rendezvous() to signal that the rendezvous did not succeed within the allocated time. This is meant to be interpreted as a non-retryable type of failure.
RendezvousNonRetryableError
Raised from any of the RendezvousHandler methods when a failure occurred that should not be retried with the same worker process.
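For readers considering their own handler, the shape of the interface can be sketched with abc. This is not the torchelastic source: the Exception base classes are an assumption, RendezvousHandlerSketch and SingleWorkerHandler are hypothetical names, and the store is typed as Any (and backed by a dict in the toy) so the sketch runs without torch, whereas the documented signature returns a torch.distributed.Store.

```python
import abc
from typing import Any, Tuple


class RendezvousClosedException(Exception):
    """Raised when the rendezvous for the current job is closed."""


class RendezvousTimeoutException(Exception):
    """Raised when next_rendezvous() does not succeed within the allotted time."""


class RendezvousNonRetryableError(Exception):
    """Raised when a failure should not be retried with the same worker process."""


class RendezvousHandlerSketch(abc.ABC):
    """Shape of the documented interface (hypothetical stand-in)."""

    @abc.abstractmethod
    def next_rendezvous(self) -> Tuple[Any, int, int]:
        """Block until complete; return (store, rank, world_size)."""

    @abc.abstractmethod
    def is_closed(self) -> bool: ...

    @abc.abstractmethod
    def set_closed(self) -> None: ...

    @abc.abstractmethod
    def num_nodes_waiting(self) -> int: ...


class SingleWorkerHandler(RendezvousHandlerSketch):
    """Toy implementation: a job with exactly one worker, which always gets rank 0."""

    def __init__(self) -> None:
        self._closed = False
        self._store: dict = {}  # a plain dict stands in for a real Store

    def next_rendezvous(self) -> Tuple[Any, int, int]:
        if self._closed:
            raise RendezvousClosedException("rendezvous is closed")
        return self._store, 0, 1

    def is_closed(self) -> bool:
        return self._closed

    def set_closed(self) -> None:
        self._closed = True

    def num_nodes_waiting(self) -> int:
        return 0  # a single-worker job never has late arrivals
```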
Implements the torchelastic.rendezvous.RendezvousHandler interface backed by etcd.
Torchelastic uses a URL to configure the type of rendezvous to use and to pass implementation-specific configuration to the rendezvous module. The basic etcd rendezvous configuration URL looks like the following:
    etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>
For example:
    etcd://localhost:2379/1234?min_workers=1&max_workers=3
The URL above is interpreted as follows:
Use the rendezvous handler that is registered with the etcd scheme.
The etcd endpoint to use is localhost:2379.
job_id == 1234 is used as the prefix in etcd (this allows one to share a common etcd server for multiple jobs so long as the job_ids are guaranteed to be unique). Note that the job id can be any string (e.g. it does not need to be a number) as long as it is unique.
min_workers=1 and max_workers=3 specify a range for membership size: torchelastic starts running the job as long as the cluster size is greater than or equal to min_workers, and admits up to max_workers into the cluster.
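The URL interpretation above can be sketched with the standard library's urllib.parse. This is not torchelastic's own parser: parse_etcd_rdzv_url and EtcdRendezvousConfig are hypothetical, only the parameters of the basic URL are handled, and the validation rules (1 <= min_workers <= max_workers) are assumptions.

```python
from dataclasses import dataclass
from urllib.parse import parse_qs, urlparse


@dataclass
class EtcdRendezvousConfig:
    """Hypothetical container for the fields encoded in a basic etcd rendezvous URL."""
    host: str
    port: int
    job_id: str
    min_workers: int
    max_workers: int


def parse_etcd_rdzv_url(url: str) -> EtcdRendezvousConfig:
    """Hypothetical parser for etcd://<host>:<port>/<job_id>?min_workers=..&max_workers=.."""
    parsed = urlparse(url)
    if parsed.scheme != "etcd":
        raise ValueError(f"expected the etcd scheme, got {parsed.scheme!r}")
    if parsed.hostname is None or parsed.port is None:
        raise ValueError("URL must include <etcd_address>:<port>")
    job_id = parsed.path.lstrip("/")  # any unique string is allowed, not only numbers
    if not job_id:
        raise ValueError("URL must include a job id path component")
    query = parse_qs(parsed.query)
    try:
        min_workers = int(query["min_workers"][0])
        max_workers = int(query["max_workers"][0])
    except KeyError as missing:
        raise ValueError(f"missing query parameter {missing}") from None
    if not 1 <= min_workers <= max_workers:  # assumed validation rule
        raise ValueError("require 1 <= min_workers <= max_workers")
    return EtcdRendezvousConfig(parsed.hostname, parsed.port, job_id, min_workers, max_workers)
```

Parsing the example URL etcd://localhost:2379/1234?min_workers=1&max_workers=3 yields host localhost, port 2379, job id "1234", and a membership range of 1 to 3.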
Below is the full list of parameters that can be passed to the etcd rendezvous:
minimum number of workers for the rendezvous to be valid
maximum number of workers to admit
total timeout within which next_rendezvous is expected to succeed (defaults to 600s)
additional wait time (“last call”) after the min number of workers has been reached (defaults to 30s)
path prefix (from etcd root), inside which all etcd nodes will be created (defaults to
EtcdRendezvous(endpoints, prefix, run_id, num_min_workers, num_max_workers, timeout, last_call_timeout, **kwargs)
A rendezvous implementation that uses etcd as the backend store.
Announce that this worker is waiting (via the num_workers_waiting counter) to join the next rendezvous, but only if the state and version match.
Helper method for the confirm phase.
Once the rendezvous state transitions from ‘joinable’ to ‘frozen’, we have every participant confirm their membership and set up per-member keep-alive TTL keys, and then wait for all other participants to confirm, which then successfully concludes this rendezvous.
Handle the case when there’s an existing (state ‘final’) rendezvous already in place, and we have to announce ourselves as waiting and wait until the next rendezvous opportunity.
After we reach min number of workers, one particular worker takes on the responsibility of waiting an additional timeout before closing the join window. If the worker responsible for this fails, the rendezvous will be destroyed due to expiring TTL, and the other participants will re-rendezvous.
Here we expect to see state <joinable, expected_version>. Exit gracefully if either:
state becomes <frozen, expected_version>
timeout happens (reaching the deadline), in which case we try the transition to <frozen, expected_version>
Exit with an exception otherwise.
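The “last call” wait described above can be sketched as a polling loop. All names here (handle_last_call, read_state, try_freeze, UnexpectedStateError) are hypothetical, and try_freeze is assumed to behave like an atomic compare-and-swap that only succeeds if the state is still <joinable, expected_version>. Injecting clock and sleep keeps the sketch testable without a real etcd server.

```python
import time
from typing import Callable, Tuple

State = Tuple[str, int]  # (state name, version), e.g. ("joinable", 7)


class UnexpectedStateError(Exception):
    """Hypothetical: the observed state is neither the expected joinable nor frozen one."""


def handle_last_call(
    read_state: Callable[[], State],
    try_freeze: Callable[[int], bool],
    expected_version: int,
    deadline: float,
    poll_interval: float = 0.1,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Hypothetical sketch of the last-call wait.

    read_state() returns the current (state, version); try_freeze(version)
    attempts joinable -> frozen atomically and returns True on success.
    """
    while True:
        state, version = read_state()
        if (state, version) == ("frozen", expected_version):
            return  # another participant already closed the join window
        if (state, version) != ("joinable", expected_version):
            raise UnexpectedStateError(f"unexpected state {state!r} at version {version}")
        if clock() >= deadline:
            # Deadline reached: try to close the join window ourselves.
            if try_freeze(expected_version):
                return
            continue  # the swap failed, so the state changed: re-read and re-check
        sleep(poll_interval)
```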
Initially, the rendezvous state is expected to be one of:
empty (non-existent) - in this case we try to create a new one.
joinable - we try to join it.
final - we announce ourselves as waiting, and go into monitoring mode.
Any other state is considered transitional, and will be retried after a short delay.
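The initial dispatch above maps each observed state to an action, with transitional states retried after a short delay. In this sketch, initial_action, decide_with_retries and RetryLater are hypothetical names; RetryLater stands in for a retryable failure such as EtcdRendezvousRetryableFailure, and the attempt count and delay are arbitrary choices.

```python
import time
from typing import Callable, Optional


class RetryLater(Exception):
    """Hypothetical stand-in for a retryable 'transitional state' failure."""


def initial_action(state: Optional[str]) -> str:
    """Map the observed rendezvous state to an action; None means no rendezvous exists."""
    if state is None:
        return "create"            # empty: try to create a new rendezvous
    if state == "joinable":
        return "join"              # joinable: try to join it
    if state == "final":
        return "announce_waiting"  # final: announce waiting, then monitor
    # Anything else is transitional and should be retried after a short delay.
    raise RetryLater(f"transitional state {state!r}")


def decide_with_retries(
    read_state: Callable[[], Optional[str]],
    attempts: int = 5,
    delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Re-read the state until it is no longer transitional, or give up."""
    for _ in range(attempts):
        try:
            return initial_action(read_state())
        except RetryLater:
            sleep(delay)
    raise TimeoutError(f"no stable rendezvous state after {attempts} attempts")
```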
Returns: (rdzv_version, rank, world_size)
Raises:
RendezvousClosedException: if the current rendezvous was or is closed.
EtcdRendezvousRetryableFailure: if some intermediate state was observed, which is best handled by retrying later.
We observed a rendezvous in the ‘joinable’ state; attempt to join this particular version, then wait for all other peers to join.
Helper method for the join phase.
Main entry point for the next rendezvous. This method blocks until the rendezvous succeeds or a timeout occurs.
Returns: (rdzv_version, rank, world_size)
Raises:
RendezvousTimeoutException: on timeout waiting for the rendezvous.
RendezvousNonRetryableError: on other persistent errors that render the rendezvous non-retryable.
RendezvousClosedException: if the rendezvous is or was closed while waiting.
Mark the rendezvous ‘closed’ for the current run_id, which is used to signal other participants not to attempt to perform (re-)rendezvous. This is useful when one of the workers decides the job is complete.
Create a new rendezvous state or raise an exception that indicates an unexpected state (e.g. it already exists).
Raises:
RendezvousNonRetryableError: on unexpected state.
Helper method for the confirm phase.
Helper method for the join phase.
When there’s an existing valid rendezvous in state ‘final’, we have to wait until the next opportunity to join.
Such opportunity may come from:
rendezvous state changed by someone else, in which case we unblock and retry.
rendezvous becomes invalid because at least one member failed to renew their leased keep_alive node. We detect this, and destroy the rendezvous.
EtcdStore(etcd_client, etcd_store_prefix, timeout: Optional[datetime.timedelta] = None)
Implements a c10 Store interface by piggybacking on the rendezvous etcd instance. This is the store object returned by the etcd rendezvous handler when a rendezvous completes.
add(key, num: int) → int
Atomically increment a value by an integer amount. The integer is represented as a string using base 10. If key is not present, a default value of 0 will be assumed.
Returns: the new (incremented) value.
check(keys) → bool
Check if all of the keys are immediately present (without waiting).
get(key) → bytes
Get a value by key, possibly doing a blocking wait.
If key is not immediately present, will do a blocking wait for at most the timeout duration or until the key is published.
Raises:
LookupError: if the key is still not published after the timeout.
Write a key/value pair into EtcdStore. Both key and value may be either Python str or bytes.
Change the timeout used for all future operations.
wait(keys, override_timeout: Optional[datetime.timedelta] = None)
Waits until all of the keys are published, or until timeout.
Raises:
LookupError: if a timeout occurs.
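The store semantics above (base-10 counters with a default of 0, blocking get with a timeout, LookupError on timeout) can be mirrored by a single-process stand-in, which is handy for unit tests that should not need etcd. InMemoryStore is hypothetical and is not EtcdStore; the method names set() and set_timeout() are assumptions, since the text above does not name those two operations.

```python
import threading
from datetime import timedelta
from typing import Dict, List, Optional, Union

KeyOrValue = Union[str, bytes]


def _to_bytes(x: KeyOrValue) -> bytes:
    # Keys and values may be str or bytes; store everything as bytes.
    return x.encode("utf-8") if isinstance(x, str) else x


class InMemoryStore:
    """Hypothetical single-process stand-in mirroring the EtcdStore semantics above."""

    def __init__(self, timeout: timedelta = timedelta(seconds=30)) -> None:
        self._data: Dict[bytes, bytes] = {}
        self._cond = threading.Condition()
        self._timeout = timeout

    def set(self, key: KeyOrValue, value: KeyOrValue) -> None:
        with self._cond:
            self._data[_to_bytes(key)] = _to_bytes(value)
            self._cond.notify_all()  # wake up callers blocked in get()/wait()

    def add(self, key: KeyOrValue, num: int) -> int:
        # The counter is stored as a base-10 string; a missing key counts as 0.
        with self._cond:
            k = _to_bytes(key)
            new = int(self._data.get(k, b"0")) + num
            self._data[k] = str(new).encode("ascii")
            self._cond.notify_all()
            return new

    def check(self, keys: List[KeyOrValue]) -> bool:
        # Non-blocking: are all keys present right now?
        with self._cond:
            return all(_to_bytes(k) in self._data for k in keys)

    def wait(self, keys: List[KeyOrValue], override_timeout: Optional[timedelta] = None) -> None:
        timeout = override_timeout if override_timeout is not None else self._timeout
        wanted = [_to_bytes(k) for k in keys]
        with self._cond:
            ok = self._cond.wait_for(
                lambda: all(k in self._data for k in wanted), timeout=timeout.total_seconds()
            )
        if not ok:
            raise LookupError(f"timed out waiting for keys {keys!r}")

    def get(self, key: KeyOrValue) -> bytes:
        self.wait([key])  # blocks for at most the configured timeout
        with self._cond:
            return self._data[_to_bytes(key)]

    def set_timeout(self, timeout: timedelta) -> None:
        self._timeout = timeout  # applies to all future operations
```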
EtcdServer is a convenience class that makes it easy for you to start and stop an etcd server in a subprocess. This is useful for testing or single-node (multi-worker) deployments where manually setting up an etcd server on the side is cumbersome.
For production and multi-node deployments, please consider properly deploying a highly available etcd server, as this is the single point of failure for your distributed jobs.
Tested on etcd server v3.4.3.
Starts and stops a local standalone etcd server on a random free port. Useful for single-node, multi-worker launches or testing, where a sidecar etcd server is more convenient than having to separately set up an etcd server.
This class registers a termination handler to shut down the etcd subprocess on exit. This termination handler is NOT a substitute for calling the stop() method.
The following fallback mechanism is used to find the etcd binary:
Uses the environment variable TORCHELASTIC_ETCD_BINARY_PATH
Uses <this file root>/bin/etcd if one exists
Example:
    server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
    server.start()
    client = server.get_client()
    # use client
    server.stop()
Parameters:
etcd_binary_path: path of the etcd server binary (see above for the fallback path)
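The first two steps of the binary lookup order listed above can be sketched as follows. find_etcd_binary is a hypothetical helper, file_root stands in for <this file root>, and any fallback beyond these two steps is not covered: the sketch simply returns None.

```python
import os
from typing import Optional


def find_etcd_binary(file_root: str) -> Optional[str]:
    """Hypothetical sketch of the first two lookup steps for the etcd binary."""
    # 1. An explicit path from the environment wins.
    env_path = os.environ.get("TORCHELASTIC_ETCD_BINARY_PATH")
    if env_path:
        return env_path
    # 2. Otherwise, use <file_root>/bin/etcd if such a file exists.
    bundled = os.path.join(file_root, "bin", "etcd")
    if os.path.isfile(bundled):
        return bundled
    # Further fallbacks are outside the scope of this sketch.
    return None
```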