Rendezvous
In the context of torchelastic we use the term rendezvous to refer to a particular functionality that combines a distributed synchronization primitive with peer discovery.
It is used by torchelastic to gather participants of a training job (i.e. workers) such that they all agree on the same list of participants and everyone's role, as well as to make a consistent collective decision on when training can begin/resume.
Torchelastic Rendezvous provides the following critical functionalities:
Barrier:
Workers performing rendezvous will all block until the rendezvous is considered complete - this happens when at least min total number of workers have joined the rendezvous barrier (for the same job). This also implies the barrier is not necessarily of fixed size.
There's an additional small waiting time after reaching min number of workers - this is used to ensure the rendezvous is not completed "too quickly" (which could potentially exclude additional workers attempting to join at approximately the same time).
If max number of workers is gathered at the barrier, the rendezvous is completed immediately.
There's also an overall timeout which causes the rendezvous to fail if min number of workers is never reached - this is meant to be a simple fail-safe to help release partially allocated job resources in case there's a problem with the resource manager, and is meant to be interpreted as non-retryable.
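To make these rules concrete, here is a minimal sketch of the completion logic described above. It is illustrative only; the function, its arguments, and the timing model are assumptions, not torchelastic API:

# Illustrative sketch (not torchelastic code) of the barrier-completion
# rules described above; all names here are hypothetical.
import time

def barrier_state(num_joined, min_workers, max_workers,
                  start_time, min_reached_time,
                  last_call_timeout=30.0, timeout=600.0):
    # min_reached_time: when the worker count first reached min_workers
    now = time.monotonic()
    if num_joined >= max_workers:
        return "complete"    # max reached: rendezvous completes immediately
    if num_joined >= min_workers:
        # min reached: hold the barrier open briefly ("last call") so that
        # workers arriving at roughly the same time are not excluded
        if now - min_reached_time >= last_call_timeout:
            return "complete"
        return "last_call"
    if now - start_time >= timeout:
        return "failed"      # min never reached: non-retryable failure
    return "waiting"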
Exclusivity:
A simple distributed barrier would not be sufficient, as we also need to ensure that only one group of workers exists at any given time (for a given job). In other words, new workers (i.e. joining late) should not be able to form a parallel independent group of workers for the same job.
Torchelastic rendezvous ensures that if a group of workers has already completed a rendezvous (and hence might already be training), then additional “late” workers attempting to rendezvous will only announce themselves as waiting, and will have to wait until the (previously completed) existing rendezvous is destroyed first.
Consistency:
When a rendezvous is completed, all its members will agree on the job membership and everyone's role in it. This role is represented using an integer, called rank, that is between 0 and world size - 1.
Note that ranks are not stable, in the sense that the same worker process can be assigned a different rank in the next (re-)rendezvous.
Fault-tolerance:
Torchelastic rendezvous is designed to tolerate worker failures during the rendezvous process. Should a process crash (or lose network connectivity, etc.) between joining the rendezvous and its completion, a re-rendezvous with the remaining healthy workers will happen automatically.
A worker can also fail after it has completed (or has been observed by other workers to have completed) the rendezvous - this scenario will be handled by the torchelastic train_loop instead (where it will also trigger a re-rendezvous).
Shared key-value store:
When the rendezvous is completed, a shared key-value store is created and returned. This store implements a torch.distributed.Store API (see distributed communication docs).
This store is only shared by the members of the completed rendezvous. It is intended to be used by torchelastic to exchange information necessary to initialize job control and data-planes.
Waiting workers and rendezvous closing:
The torchelastic rendezvous handler object provides additional functionalities which are technically not part of the rendezvous process:
Querying how many workers arrived late at the barrier and can participate in the next rendezvous.
Marking the rendezvous closed to signal all workers not to participate in the next rendezvous.
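As an illustration, the sketch below shows how a training loop might drive a rendezvous handler using these functionalities. It is a hypothetical example: the handler construction, the backend choice, and the train_step callback are assumptions, not part of this API.

# Hypothetical driver loop around a RendezvousHandler (illustrative only).
import torch.distributed as dist

def elastic_train(rdzv_handler, train_step):
    while not rdzv_handler.is_closed():
        # Block at the barrier; returns the shared store plus this
        # worker's rank and the agreed-upon world size.
        store, rank, world_size = rdzv_handler.next_rendezvous()
        dist.init_process_group(
            backend="gloo", store=store, rank=rank, world_size=world_size)
        try:
            while True:
                train_step(rank, world_size)
                # If late workers are waiting, re-rendezvous to admit them.
                if rdzv_handler.num_nodes_waiting() > 0:
                    break
        finally:
            dist.destroy_process_group()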
Below is a state diagram describing how rendezvous works.
[State diagram of the rendezvous process]
Handler
class torchelastic.rendezvous.RendezvousHandler
Main rendezvous interface.
Note: torchelastic users normally do not need to implement their own RendezvousHandler. An implementation based on etcd is already provided, and is recommended for most users, provided they can deploy it in their environment.
Warning: torchelastic is currently considered experimental, so the APIs may change!
abstract get_run_id() → str
Returns the run_id of this rendezvous handler. The run_id is a user-defined id that uniquely identifies an instance of a distributed application. It typically maps to a job id and is used to allow workers to join the correct distributed application.
abstract is_closed() → bool
Checks whether the rendezvous for the current job has been closed, which means all future attempts to re-rendezvous (within the same job) will fail.
Note: is_closed and set_closed have semantics of eventual propagation and should not be used for synchronization. The intention here is that if at least one worker decides the job is finished, it will close the rendezvous, and other workers will soon observe this and stop training/rendezvous-ing as well.
abstract next_rendezvous() → Tuple[torch.distributed.Store, int, int]
Main entry-point into the rendezvous barrier. Blocks until the rendezvous is complete (and the current process is included in the formed worker group), or a timeout occurs, or the rendezvous was marked closed.
Returns: a tuple of (c10d Store, rank, world size)
Raises:
RendezvousClosedException - if rendezvous for the current job is closed
RendezvousTimeoutException - on timeout
abstract num_nodes_waiting() → int
Returns the number of workers who arrived late at the rendezvous barrier, and hence were not included in the current worker group.
Callers should periodically call this method to check whether new members are waiting to join the job and, if so, admit them by calling next_rendezvous() (re-rendezvous).
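For example, a caller could poll between training steps; a minimal sketch (the handler and the polling interval are assumptions):

# Illustrative polling loop (not part of torchelastic): check for
# late-arriving workers periodically and trigger a re-rendezvous.
import time

def admit_stragglers(rdzv_handler, poll_interval=30.0):
    while True:
        if rdzv_handler.num_nodes_waiting() > 0:
            # New workers are waiting; re-enter the barrier to admit them.
            return rdzv_handler.next_rendezvous()
        time.sleep(poll_interval)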
abstract set_closed()
Used to mark the rendezvous (for the current job) as closed.
Exceptions
class torchelastic.rendezvous.RendezvousClosedException
Raised when a rendezvous is closed. This is used to signal completion to nodes that arrive late.
class torchelastic.rendezvous.RendezvousTimeoutException
Raised to signal that a rendezvous did not succeed within the allocated time. This is a non-retryable type of failure.
class torchelastic.rendezvous.RendezvousNonRetryableError
Raised when a failure occurred that should not be retried within the same worker process.
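A caller would typically distinguish these exceptions around next_rendezvous(); a minimal sketch (the handler object is assumed to exist):

# Illustrative handling of rendezvous exceptions.
from torchelastic.rendezvous import (
    RendezvousClosedException,
    RendezvousTimeoutException,
)

def rendezvous_or_exit(rdzv_handler):
    try:
        return rdzv_handler.next_rendezvous()
    except RendezvousClosedException:
        # The job was marked finished by another worker; exit cleanly.
        return None
    except RendezvousTimeoutException:
        # min workers never showed up; treat as a non-retryable failure.
        raise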
Implementations
Etcd Rendezvous
class torchelastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl)
Implements the torchelastic.rendezvous.RendezvousHandler interface, backed by torchelastic.rendezvous.etcd_rendezvous.EtcdRendezvous.
Torchelastic uses a URL to configure the type of rendezvous to use and to pass implementation-specific configurations to the rendezvous module. The basic etcd rendezvous configuration URL looks like the following:
etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>
-- example --
etcd://localhost:2379/1234?min_workers=1&max_workers=3
The URL above is interpreted as follows:
Use the rendezvous handler that is registered with the etcd scheme.
The etcd endpoint to use is localhost:2379.
job_id == 1234 is used as the prefix in etcd (this allows one to share a common etcd server for multiple jobs so long as the job_ids are guaranteed to be unique). Note that the job id can be any string (e.g. it does not need to be a number) as long as it is unique.
min_workers=1 and max_workers=3 specify a range for membership size - torchelastic starts running the job as long as the cluster size is greater than or equal to min_workers, and admits up to max_workers into the cluster.
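Such a URL can also be assembled programmatically; a small sketch (the variable names are illustrative):

# Illustrative construction of an etcd rendezvous URL (names are arbitrary).
etcd_endpoint = "localhost:2379"
job_id = "1234"
min_workers, max_workers = 1, 3

rdzv_url = (
    f"etcd://{etcd_endpoint}/{job_id}"
    f"?min_workers={min_workers}&max_workers={max_workers}"
)
assert rdzv_url == "etcd://localhost:2379/1234?min_workers=1&max_workers=3"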
Below is a full list of the parameters that can be passed to etcd rendezvous:
Parameter           Description
min_workers         minimum number of workers for the rendezvous to be valid
max_workers         maximum number of workers to admit
timeout             total timeout within which next_rendezvous is expected to succeed (default 600s)
last_call_timeout   additional wait amount ("last call") after the min number of workers has been reached (default 30s)
etcd_prefix         path prefix (from etcd root), inside which all etcd nodes will be created (default /torchelastic/p2p)
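Putting these together, a fully parameterized URL might look like the following (the values here are illustrative, not defaults to copy):
etcd://localhost:2379/1234?min_workers=1&max_workers=3&timeout=900&last_call_timeout=60&etcd_prefix=/torchelastic/p2p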
class torchelastic.rendezvous.etcd_rendezvous.EtcdRendezvous(client, prefix, run_id, num_min_workers, num_max_workers, timeout, last_call_timeout)
A rendezvous implementation that uses etcd as the backend store.
announce_self_waiting(expected_version)
Announce this worker is waiting (via the num_workers_waiting counter) to join the next rendezvous, but only if the state and version match.
confirm_membership(expected_version, this_rank)
Helper method for the confirm phase.
confirm_phase(expected_version, this_rank)
Once the rendezvous state transitions from 'joinable' to 'frozen', we have every participant confirm their membership and set up per-member keep-alive TTL keys, and then wait for all other participants to confirm, which successfully concludes this rendezvous.
handle_existing_rendezvous(expected_version)
Handle the case when there's an existing (state 'final') rendezvous already in place, and we have to announce ourselves waiting, and wait until the next rendezvous opportunity.
handle_join_last_call(expected_version, deadline)
After we reach the min number of workers, one particular worker takes on the responsibility of waiting an additional timeout before closing the join window. If the worker responsible for this fails, the rendezvous will be destroyed due to an expiring TTL, and the other participants will re-rendezvous.
Here we expect to see the state <joinable, expected_version>. Exit gracefully if either:
state becomes <frozen, expected_version>
timeout happens (reaching deadline), in which case we try the transition to <frozen, expected_version>
Exit with exception otherwise.
init_phase()
Initially, the rendezvous state is expected to be one of:
empty (non-existent) - in this case we try to create a new one
joinable - we try to join it
final - we announce ourselves as waiting, and go into monitoring mode
Any other state is considered transitional, and will be retried after a short delay.
Returns: (rdzv_version, rank, world_size)
Raises:
RendezvousClosedException - current rendezvous was/is closed
EtcdRendezvousRetryableFailure - observed some intermediate state, which is best handled by retrying later
join_phase(expected_version)
We observed a rendezvous in the 'joinable' state; attempt to join this particular version, and then wait for all other peers to join.
join_rendezvous(expected_version)
Helper method for the join phase.
rendezvous_barrier()
Main entry point for the next rendezvous. This method blocks until the rendezvous succeeds or a timeout occurs.
Returns: (rdzv_version, rank, world_size)
Raises:
RendezvousTimeoutException - timeout waiting for rendezvous
RendezvousNonRetryableError - other persistent errors that render the rendezvous non-retryable
RendezvousClosedException - rendezvous is or was closed while waiting
set_closed()
Mark the rendezvous 'closed' for the current run_id, which is used to signal other participants not to attempt a (re-)rendezvous. This is useful when one of the workers decides the job is complete.
try_create_rendezvous()
Create a new rendezvous state or raise an exception that indicates an unexpected state (e.g. already exists).
Raises:
RendezvousNonRetryableError - on unexpected state
wait_for_final(expected_version)
Helper method for the confirm phase.
wait_for_peers(expected_version)
Helper method for the join phase.
wait_for_rendezvous_to_free(expected_version)
When there's an existing valid rendezvous in the 'final' state, we have to wait until the next opportunity to join.
Such an opportunity may come from:
the rendezvous state being changed by someone else, in which case we unblock and retry
the rendezvous becoming invalid because at least one member failed to renew their leased keep_alive node - we detect this and destroy the rendezvous
class torchelastic.rendezvous.etcd_rendezvous.EtcdStore(etcd_client, etcd_store_prefix, timeout: Optional[datetime.timedelta] = None)
Implements a c10d Store interface by piggybacking on the rendezvous etcd instance. This is the store object returned by EtcdRendezvous.
add(key, num: int) → int
Atomically increment a value by an integer amount. The integer is represented as a string using base 10. If the key is not present, a default value of 0 is assumed.
Returns: the new (incremented) value
get(key) → bytes
Get a value by key, possibly doing a blocking wait.
If the key is not immediately present, performs a blocking wait for at most timeout duration, or until the key is published.
Returns: value (bytes)
Raises:
LookupError - if the key is still not published after the timeout
set(key, value)
Write a key/value pair into the EtcdStore. Both key and value may be either Python str or bytes.
set_timeout(timeout: datetime.timedelta)
Change the timeout used for all future operations.
wait(keys, override_timeout: Optional[datetime.timedelta] = None)
Waits until all of the keys are published, or until timeout.
Raises:
LookupError - if a timeout occurs
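As an illustration, once a completed rendezvous has returned an EtcdStore, workers can use it to exchange small pieces of information. The sketch below assumes store, rank, and world_size came from next_rendezvous(); the key layout and the address-exchange scenario are hypothetical:

# Illustrative use of the store returned by a completed rendezvous;
# nothing here goes beyond the Store API documented above.
def exchange_addresses(store, rank, world_size, my_address: str):
    store.set(f"addr/{rank}", my_address)                  # publish ours
    store.wait([f"addr/{r}" for r in range(world_size)])   # block for peers
    return [store.get(f"addr/{r}").decode() for r in range(world_size)]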
Etcd Server
The EtcdServer is a convenience class that makes it easy for you to start and stop an etcd server in a subprocess. This is useful for testing or single-node (multi-worker) deployments where manually setting up an etcd server on the side is cumbersome.
Warning: For production and multi-node deployments, please consider properly deploying a highly available etcd server, as this is the single point of failure for your distributed jobs.
class torchelastic.rendezvous.etcd_server.EtcdServer(data_dir: Optional[str] = None)
Note: tested on etcd server v3.4.3.
Starts and stops a local standalone etcd server on a random free port. Useful for single-node, multi-worker launches or testing, where a sidecar etcd server is more convenient than having to separately set up an etcd server.
This class registers a termination handler to shut down the etcd subprocess on exit. This termination handler is NOT a substitute for calling the stop() method.
The following fallback mechanism is used to find the etcd binary:
Uses the env var TORCHELASTIC_ETCD_BINARY_PATH
Uses <this file root>/bin/etcd if one exists
Uses etcd from PATH
Usage
server = EtcdServer("/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
Parameters: data_dir - optional path to a directory where the etcd server keeps its data (the etcd binary itself is located via the fallback mechanism above)
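A single-node test could then point the etcd rendezvous at this server. The sketch below is hypothetical: it assumes a get_endpoint() helper returning "host:port", and the job id and worker counts are arbitrary:

# Hypothetical single-node test wiring (get_endpoint() is an assumption).
from torchelastic.rendezvous.etcd_server import EtcdServer

server = EtcdServer()
server.start()
try:
    endpoint = server.get_endpoint()  # assumed to return "host:port"
    rdzv_url = f"etcd://{endpoint}/test_job?min_workers=1&max_workers=1"
    # hand rdzv_url to the torchelastic launcher / rendezvous factory
finally:
    server.stop()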