# Rendezvous

In the context of torchelastic we use the term rendezvous to refer to a particular functionality that combines a distributed synchronization primitive with peer discovery.

It is used by torchelastic to gather participants of a training job (i.e. workers) such that they all agree on the same list of participants and everyone’s roles, as well as make a consistent collective decision on when training can begin/resume.

Torchelastic Rendezvous provides the following critical functionalities:

Barrier:

Workers performing rendezvous will all block until the rendezvous is considered complete - this happens when at least min workers have joined the rendezvous barrier (for the same job). This also implies the barrier is not necessarily of fixed size.

There’s an additional small waiting time after reaching min number of workers - this is used to ensure the rendezvous is not completed “too quickly” (which could potentially exclude additional workers attempting to join at approximately the same time).

If max number of workers is gathered at the barrier, the rendezvous is completed immediately.

There’s also an overall timeout which causes the rendezvous to fail if min number of workers is never reached - this is meant to be a simple fail-safe to help release partially allocated job resources, in case there’s a problem with the resource manager, and is meant to be interpreted as non-retryable.
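The barrier rules above can be summarized as a small decision function. This is only an illustrative sketch, not the torchelastic implementation: `BarrierConfig`, `barrier_status`, and the returned status strings are hypothetical names introduced here.

```python
from dataclasses import dataclass


@dataclass
class BarrierConfig:
    # Hypothetical container for the four quantities described above.
    min_workers: int
    max_workers: int
    timeout: float            # overall deadline, in seconds
    last_call_timeout: float  # extra wait once min_workers is reached


def barrier_status(cfg, joined, elapsed, since_min_reached=None):
    """Return 'complete', 'waiting', or 'timed_out'.

    joined: number of workers currently at the barrier
    elapsed: seconds since this rendezvous attempt started
    since_min_reached: seconds since min_workers was first reached, or None
    """
    if joined >= cfg.max_workers:
        return "complete"  # max reached: complete immediately
    if joined >= cfg.min_workers:
        # min reached: keep the barrier open for the additional waiting time
        if since_min_reached is not None and since_min_reached >= cfg.last_call_timeout:
            return "complete"
        return "waiting"
    if elapsed >= cfg.timeout:
        return "timed_out"  # min never reached: treated as non-retryable
    return "waiting"
```

For instance, with min 2, max 4 and a 30 second waiting window, three workers that have been waiting for 30 seconds since the minimum was reached would complete the barrier, while a single worker would keep waiting until the overall timeout.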

Exclusivity:

A simple distributed barrier would not be sufficient, as we also need to ensure that only one group of workers exists at any given time (for a given job). In other words, new workers (i.e. joining late) should not be able to form a parallel independent group of workers for the same job.

Torchelastic rendezvous ensures that if a group of workers has already completed a rendezvous (and hence might already be training), then additional “late” workers attempting to rendezvous will only announce themselves as waiting, and will have to wait until the (previously completed) existing rendezvous is destroyed first.

Consistency:

When a rendezvous is completed, all its members will agree on the job membership and everyone’s role in it. This role is represented using an integer, called rank, that is at least 0 and less than the world size.

Note that ranks are not stable, in the sense that the same worker process can be assigned a different rank in the next (re-)rendezvous.

Fault-tolerance:

Torchelastic rendezvous is designed to tolerate worker failures during the rendezvous process. Should a process crash (or lose network connectivity, etc.) between joining the rendezvous and it being completed, a re-rendezvous with the remaining healthy workers will happen automatically.

A worker can also fail after it has completed (or has been observed by other workers to have completed) the rendezvous - this scenario will be handled by the torchelastic train_loop instead (where it will also trigger a re-rendezvous).

Shared key-value store:

When the rendezvous is completed, a shared key-value store is created and returned. This store implements a torch.distributed.Store API (see distributed communication docs).

This store is only shared by the members of the completed rendezvous. It is intended to be used by torchelastic to exchange information necessary to initialize job control and data-planes.

Waiting workers and rendezvous closing:

The torchelastic rendezvous handler object provides additional functionalities, which are technically not part of the rendezvous process:

1. Querying how many workers arrived late at the barrier and can participate in the next rendezvous.

2. Setting the rendezvous closed to signal all workers not to participate in the next rendezvous.

Below is a state diagram describing how rendezvous works.

## Handler

class torchelastic.rendezvous.RendezvousHandler

Main rendezvous interface.

Note

torchelastic users normally do not need to implement their own RendezvousHandler. An implementation based on etcd is already provided, and is recommended for most users, provided they can deploy it in their environment.

Warning

torchelastic is currently considered experimental, so the APIs may change!

abstract is_closed() → bool

Checks whether rendezvous for current job has been closed, which means all future attempts to re-rendezvous (within same job) will fail.

Note

is_closed and set_closed have semantics of eventual propagation, and should not be used for synchronization. The intention here is that if at least one worker decides the job is finished, it will close the rendezvous, and other workers will soon observe this and stop training/rendezvous-ing as well.

abstract next_rendezvous() → Tuple[torch.distributed.Store, int, int]

Main entry-point into the rendezvous barrier. Blocks until the rendezvous is complete (and the current process is included in the formed worker group), or a timeout occurs, or rendezvous was marked closed.

Returns: a tuple of (c10d Store, rank, world size)

Raises
• RendezvousClosedException - if rendezvous for the current job is closed.

• RendezvousTimeoutException - on timeout

abstract num_nodes_waiting() → int

Returns number of workers who arrived late at the rendezvous barrier, hence weren’t included in the current worker group.

Callers should periodically call this method to check whether new members are waiting to join the job and if so admit them by calling next_rendezvous() (re-rendezvous).

abstract set_closed()

Used to mark the rendezvous (for current job) as closed.
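To illustrate how the four handler methods are meant to interact, here is a minimal sketch of a caller loop. It does not use torchelastic itself: `FakeHandler` is a hypothetical in-memory stand-in that only mirrors the method names documented above, and `run` is an illustrative helper, not the actual train_loop.

```python
class FakeHandler:
    """Hypothetical in-memory stand-in exposing the four handler methods."""

    def __init__(self, waiting_schedule):
        # Values that successive num_nodes_waiting() calls will return.
        self._waiting = list(waiting_schedule)
        self._closed = False
        self.rounds = 0  # how many rendezvous rounds were performed

    def next_rendezvous(self):
        self.rounds += 1
        store = {}  # placeholder for the shared key-value store
        return store, 0, 1  # (store, rank, world_size)

    def num_nodes_waiting(self):
        return self._waiting.pop(0) if self._waiting else 0

    def is_closed(self):
        return self._closed

    def set_closed(self):
        self._closed = True


def run(handler, total_steps):
    """Do total_steps units of work, re-rendezvousing when workers are waiting."""
    step = 0
    store, rank, world_size = handler.next_rendezvous()
    while step < total_steps and not handler.is_closed():
        step += 1  # one unit of training work would go here
        if handler.num_nodes_waiting() > 0:
            # Admit late arrivals by forming a new worker group.
            store, rank, world_size = handler.next_rendezvous()
    handler.set_closed()  # signal other workers that the job is finished
    return step
```

Because is_closed and set_closed only propagate eventually, a loop like this treats the closed flag as a hint to stop, not as a synchronization point.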

## Exceptions

class torchelastic.rendezvous.RendezvousClosedException

Raised when a rendezvous for the specified run_id is closed. This is used to signal completion to nodes that arrive late.

class torchelastic.rendezvous.RendezvousTimeoutException

Raised from RendezvousHandler.next_rendezvous() to signal that the rendezvous did not succeed within the allocated time. This is meant to be interpreted as a non-retryable type of failure.

class torchelastic.rendezvous.RendezvousNonRetryableError

Raised from any of the RendezvousHandler methods when a failure occurred that should not be retried with the same worker process.
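A caller might react to these three exceptions roughly as sketched below. The exception classes here are local stand-ins defined only so the example runs without torchelastic installed (their real base classes are not stated in this document), and `attempt_rendezvous` and its result labels are hypothetical.

```python
class RendezvousClosedException(Exception):
    """Local stand-in for torchelastic.rendezvous.RendezvousClosedException."""


class RendezvousTimeoutException(Exception):
    """Local stand-in for torchelastic.rendezvous.RendezvousTimeoutException."""


class RendezvousNonRetryableError(Exception):
    """Local stand-in for torchelastic.rendezvous.RendezvousNonRetryableError."""


def attempt_rendezvous(next_rendezvous):
    """Call next_rendezvous() and translate the outcome into a decision."""
    try:
        return "joined", next_rendezvous()
    except RendezvousClosedException:
        # The job was marked closed: a late arrival should exit cleanly.
        return "job_finished", None
    except (RendezvousTimeoutException, RendezvousNonRetryableError):
        # Both are documented as non-retryable for this worker process.
        return "give_up", None
```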

## Implementations

### Etcd Rendezvous

class torchelastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl)

Implements a torchelastic.rendezvous.RendezvousHandler interface backed by torchelastic.rendezvous.etcd_rendezvous.EtcdRendezvous.

Torchelastic uses a URL to configure the type of rendezvous to use and to pass implementation-specific configurations to the rendezvous module. The basic etcd rendezvous configuration URL looks like the following:

    etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>

For example:

    etcd://localhost:2379/1234?min_workers=1&max_workers=3


The URL above is interpreted as follows:

1. Use the rendezvous handler that is registered with the etcd scheme

2. The etcd endpoint to use is localhost:2379

3. job_id == 1234 is used as the prefix in etcd (this allows one to share a common etcd server for multiple jobs so long as the job_ids are guaranteed to be unique). Note that the job id can be any string (e.g. does not need to be a number) as long as it is unique.

4. min_workers=1 and max_workers=3 specifies a range for membership size - torchelastic starts running the job as long as the cluster size is greater than or equal to min_workers and admits up to max_workers into the cluster.
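The interpretation above can be reproduced with the standard library's URL parser. This is a sketch for illustration only; `parse_rdzv_url` is a hypothetical helper, not how torchelastic itself parses the URL.

```python
from urllib.parse import parse_qs, urlparse


def parse_rdzv_url(url):
    """Split an etcd rendezvous URL into the pieces described above."""
    parts = urlparse(url)
    # parse_qs returns a list per key; keep the first value of each.
    params = {key: values[0] for key, values in parse_qs(parts.query).items()}
    return {
        "scheme": parts.scheme,               # selects the rendezvous handler
        "endpoint": parts.netloc,             # etcd address and port
        "job_id": parts.path.lstrip("/"),     # any unique string
        "min_workers": int(params["min_workers"]),
        "max_workers": int(params["max_workers"]),
    }


config = parse_rdzv_url("etcd://localhost:2379/1234?min_workers=1&max_workers=3")
```

Note that the job id stays a string here, in line with the remark that it does not need to be a number.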

Below is a full list of the parameters that can be passed to etcd rendezvous:

| Parameter | Description |
| --- | --- |
| min_workers | minimum number of workers for the rendezvous to be valid |
| max_workers | maximum number of workers to admit |
| timeout | total timeout within which next_rendezvous is expected to succeed (defaults to 600s) |
| last_call_timeout | additional wait amount (“last call”) after min number of workers has been reached (defaults to 30s) |
| etcd_prefix | path prefix (from etcd root), inside which all etcd nodes will be created (defaults to /torchelastic/p2p) |
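As a rough sketch of how the documented defaults could be applied to user-supplied parameters, consider the helper below. `DEFAULTS` and `resolve_params` are hypothetical names, and treating min_workers and max_workers as required, as well as rejecting min_workers greater than max_workers, are assumptions made for this example.

```python
# Defaults taken from the parameter list above.
DEFAULTS = {
    "timeout": 600,                      # seconds
    "last_call_timeout": 30,             # seconds
    "etcd_prefix": "/torchelastic/p2p",
}


def resolve_params(params):
    """Merge user parameters over the defaults and sanity-check the result."""
    merged = {**DEFAULTS, **params}  # user-supplied values win
    for required in ("min_workers", "max_workers"):
        if required not in merged:
            # Assumption: these two have no default and must be given.
            raise ValueError(f"missing required parameter: {required}")
    if merged["min_workers"] > merged["max_workers"]:
        # Assumption: an inverted range is rejected.
        raise ValueError("min_workers must not exceed max_workers")
    return merged
```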

class torchelastic.rendezvous.etcd_rendezvous.EtcdRendezvous(endpoints, prefix, run_id, num_min_workers, num_max_workers, timeout, last_call_timeout, **kwargs)

A rendezvous implementation that uses etcd as the backend store.

announce_self_waiting(expected_version)

Announce this worker is waiting (via num_workers_waiting counter) to join next rendezvous, but only if state and version match.

confirm_membership(expected_version, this_rank)

Helper method for the confirm phase

confirm_phase(expected_version, this_rank)

Once the rendezvous state transitions from ‘joinable’ to ‘frozen’, we have every participant confirm their membership and set up per-member keep-alive TTL keys, and then wait for all other participants to confirm, which would then successfully conclude this rendezvous.

handle_existing_rendezvous(expected_version)

Handle the case when there’s an existing (state ‘final’) rendezvous already in place, and we have to announce ourselves waiting, and wait until the next rendezvous opportunity.

handle_join_last_call(expected_version, deadline)

After we reach min number of workers, one particular worker takes on the responsibility of waiting an additional timeout before closing the join window. If the worker responsible for this fails, the rendezvous will be destroyed due to expiring TTL, and the other participants will re-rendezvous.

Here we expect to see state <joinable, expected_version>. Exit gracefully if either:

1. state becomes <frozen, expected_version>

2. timeout happens (reaching deadline), in which case we try the transition to <frozen, expected_version>

Exit with exception otherwise.

init_phase()

Initially, the rendezvous state is expected to be one of:

1. empty (non-existent) - in this case we try to create a new one.

2. joinable - we try to join it.

3. final - we announce ourselves as waiting, and go into monitoring mode

Any other state is considered transitional, and will be retried after a short delay.

Returns

(rdzv_version, rank, world_size)

Raises
• RendezvousClosedException - current rendezvous was/is closed

• EtcdRendezvousRetryableFailure - observed some intermediate state, which is best handled by retrying later
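The state handling described for init_phase amounts to a simple dispatch on the observed state. The sketch below is illustrative only: `next_action`, `first_actionable`, and the action labels are hypothetical, `None` is used here to represent the empty (non-existent) state, and the closed case is left out.

```python
def next_action(state):
    """Map an observed rendezvous state to the action init_phase describes."""
    if state is None:
        return "create"            # empty (non-existent): try to create one
    if state == "joinable":
        return "join"              # try to join it
    if state == "final":
        return "announce_waiting"  # announce ourselves and start monitoring
    return "retry_later"           # transitional state, e.g. 'frozen'


def first_actionable(observations):
    """Return (attempt_index, action) for the first non-transitional state."""
    for attempt, state in enumerate(observations):
        action = next_action(state)
        if action != "retry_later":
            return attempt, action
    return None  # only transitional states were observed
```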

join_phase(expected_version)

We observed a rendezvous in the ‘joinable’ state, and attempt to join this particular version, and then wait for all other peers to join.

join_rendezvous(expected_version)

Helper method for the join phase.

rendezvous_barrier()

Main entry point for next rendezvous. This method is blocking until rendezvous succeeds or a timeout occurs.

Returns

(rdzv_version, rank, world_size)

Raises
• RendezvousTimeoutException - timeout waiting for rendezvous

• RendezvousNonRetryableError - other persistent errors that render the rendezvous non-retryable

• RendezvousClosedException - rendezvous is or was closed while waiting

set_closed()

Mark rendezvous ‘closed’ for current run_id, which is used to signal other participants to not attempt to perform (re-)rendezvous. This is useful when one of the workers decides the job is complete.

try_create_rendezvous()

Create new rendezvous state or raise an exception that indicates an unexpected state (e.g. already exists)

Raises

RendezvousNonRetryableError - on unexpected state

wait_for_final(expected_version)

Helper method for the confirm phase

wait_for_peers(expected_version)

Helper method for the join phase.

wait_for_rendezvous_to_free(expected_version)

When there’s an existing valid rendezvous in state ‘final’, we have to wait until the next opportunity to join.

Such opportunity may come from:

1. rendezvous state changed by someone else, in which case we unblock and retry.

2. rendezvous becomes invalid because at least one member failed to renew their leased keep_alive node. We detect this, and destroy the rendezvous.

class torchelastic.rendezvous.etcd_rendezvous.EtcdStore(etcd_client, etcd_store_prefix, timeout: Optional[datetime.timedelta] = None)

Implements a c10d Store interface by piggybacking on the rendezvous etcd instance. This is the store object returned by EtcdRendezvous.

add(key, num: int) → int

Atomically increment a value by an integer amount. The integer is represented as a string using base 10. If key is not present, a default value of 0 will be assumed.

Returns

the new (incremented) value
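The value encoding described for add can be illustrated with a plain dictionary. This sketch is not the etcd-backed implementation: the free function `add` and the dict-based store are assumptions for illustration, and the lock only gives atomicity within one process, whereas the real store must be atomic across workers.

```python
import threading

_lock = threading.Lock()  # in-process stand-in for etcd's atomic update


def add(store, key, num):
    """Increment store[key] by num, keeping the value as a base-10 string."""
    with _lock:
        # A missing key is treated as 0.
        current = int(store.get(key, b"0").decode())
        new_value = current + num
        store[key] = str(new_value).encode()
        return new_value
```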

check(keys) → bool

Check if all of the keys are immediately present (without waiting).

get(key) → bytes

Get a value by key, possibly doing a blocking wait.

If key is not immediately present, will do a blocking wait for at most timeout duration or until the key is published.

Returns

value (bytes)

Raises

LookupError - If key still not published after timeout

set(key, value)

Write a key/value pair into EtcdStore. Both key and value may be either Python str or bytes.

set_timeout(timeout: datetime.timedelta)

Change the timeout used for all future operations.

wait(keys, override_timeout: Optional[datetime.timedelta] = None)

Waits until all of the keys are published, or until timeout.

Raises

LookupError - if timeout occurs
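The blocking semantics of get and wait (block until the keys are published, raise LookupError on timeout) can be mimicked in a single process with a condition variable. `InMemoryStore` is a hypothetical stand-in written for this example; it does not talk to etcd and its 2 second default timeout is an arbitrary choice.

```python
import datetime
import threading


class InMemoryStore:
    """Hypothetical single-process stand-in for the store semantics above."""

    def __init__(self, timeout=datetime.timedelta(seconds=2)):
        self._data = {}
        self._cond = threading.Condition()
        self._timeout = timeout

    def set(self, key, value):
        if isinstance(value, str):
            value = value.encode()  # values are stored as bytes
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()  # wake up any blocked readers

    def check(self, keys):
        # Non-blocking: are all keys present right now?
        with self._cond:
            return all(k in self._data for k in keys)

    def wait(self, keys, override_timeout=None):
        timeout = self._timeout if override_timeout is None else override_timeout
        with self._cond:
            published = self._cond.wait_for(
                lambda: all(k in self._data for k in keys),
                timeout.total_seconds(),
            )
        if not published:
            raise LookupError(f"keys not published within {timeout}: {keys}")

    def get(self, key):
        self.wait([key])  # blocks until the key is published or times out
        with self._cond:
            return self._data[key]
```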

### Etcd Server

The EtcdServer is a convenience class that makes it easy for you to start and stop an etcd server in a subprocess. This is useful for testing or single-node (multi-worker) deployments where manually setting up an etcd server on the side is cumbersome.

Warning

For production and multi-node deployments please consider properly deploying a highly available etcd server as this is the single point of failure for your distributed jobs.

class torchelastic.rendezvous.etcd_server.EtcdServer

Note

tested on etcd server v3.4.3

Starts and stops a local standalone etcd server on a random free port. Useful for single node, multi-worker launches or testing, where a sidecar etcd server is more convenient than having to separately set up an etcd server.

This class registers a termination handler to shut down the etcd subprocess on exit. This termination handler is NOT a substitute for calling the stop() method.

The following fallback mechanism is used to find the etcd binary:

1. Uses env var TORCHELASTIC_ETCD_BINARY_PATH

2. Uses <this file root>/bin/etcd if one exists

3. Uses etcd from PATH
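The fallback order above could be sketched as follows. `find_etcd_binary` and its `module_dir` and `environ` parameters are hypothetical names used for illustration (`module_dir` stands for the directory referred to as "this file root"); this is not the lookup code EtcdServer actually runs.

```python
import os
import shutil


def find_etcd_binary(module_dir, environ=os.environ):
    """Resolve the etcd binary using the three-step fallback described above."""
    # 1. Explicit override through the environment variable.
    path = environ.get("TORCHELASTIC_ETCD_BINARY_PATH")
    if path:
        return path
    # 2. Binary bundled next to the module, if it exists.
    bundled = os.path.join(module_dir, "bin", "etcd")
    if os.path.isfile(bundled):
        return bundled
    # 3. Whatever `etcd` resolves to on PATH (None if not installed).
    return shutil.which("etcd")
```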

Usage

    from torchelastic.rendezvous.etcd_server import EtcdServer

    server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
    server.start()
    client = server.get_client()
    # use client
    server.stop()

Parameters

etcd_binary_path – path of etcd server binary (see above for fallback path)