Source code for torch.distributed.elastic.rendezvous.etcd_rendezvous_backend
# mypy: allow-untyped-defs# Copyright (c) Facebook, Inc. and its affiliates.# All rights reserved.## This source code is licensed under the BSD-style license found in the# LICENSE file in the root directory of this source tree.importbinasciifrombase64importb64decode,b64encodefromtypingimportcast,Optional,Tupleimporturllib3.exceptions# type: ignore[import]frometcdimport(# type: ignore[import]ClientasEtcdClient,EtcdAlreadyExist,EtcdCompareFailed,EtcdException,EtcdKeyNotFound,EtcdResult,)fromtorch.distributedimportStorefrom.apiimportRendezvousConnectionError,RendezvousParameters,RendezvousStateErrorfrom.dynamic_rendezvousimportRendezvousBackend,Tokenfrom.etcd_storeimportEtcdStorefrom.utilsimportparse_rendezvous_endpoint
class EtcdRendezvousBackend(RendezvousBackend):
    """Represents an etcd-based rendezvous backend.

    The rendezvous state is stored base64-encoded under a single etcd key.
    The ``modifiedIndex`` of the etcd result serves as the fencing token
    handed back to callers.

    Args:
        client:
            The ``etcd.Client`` instance to use to communicate with etcd.
        run_id:
            The run id of the rendezvous.
        key_prefix:
            The path under which to store the rendezvous state in etcd.
        ttl:
            The TTL of the rendezvous state. If not specified, defaults to two
            hours.

    Raises:
        ValueError:
            If ``run_id`` is empty.
    """

    _DEFAULT_TTL = 7200  # 2 hours

    _client: EtcdClient
    _key: str
    _ttl: int

    def __init__(
        self,
        client: EtcdClient,
        run_id: str,
        key_prefix: Optional[str] = None,
        ttl: Optional[int] = None,
    ) -> None:
        if not run_id:
            raise ValueError("The run id must be a non-empty string.")

        self._client = client

        # The state lives under "<key_prefix>/<run_id>", or under the bare run
        # id when no prefix is given.
        if key_prefix:
            self._key = key_prefix + "/" + run_id
        else:
            self._key = run_id

        # A missing, zero, or negative TTL falls back to the default.
        if ttl and ttl > 0:
            self._ttl = ttl
        else:
            self._ttl = self._DEFAULT_TTL

    @property
    def name(self) -> str:
        """See base class."""
        return "etcd-v2"

    def get_state(self) -> Optional[Tuple[bytes, Token]]:
        """See base class.

        Returns:
            A ``(state, token)`` tuple, or ``None`` if the key does not exist
            in etcd.

        Raises:
            RendezvousConnectionError:
                If the read fails with an etcd error or a timeout.
            RendezvousStateError:
                If the stored value is not valid base64.
        """
        try:
            result = self._client.read(self._key)
        except EtcdKeyNotFound:
            return None
        except (EtcdException, urllib3.exceptions.TimeoutError) as exc:
            raise RendezvousConnectionError(
                "The connection to etcd has failed. See inner exception for details."
            ) from exc

        return self._decode_state(result)

    def set_state(
        self, state: bytes, token: Optional[Token] = None
    ) -> Optional[Tuple[bytes, Token, bool]]:
        """See base class.

        Without a (truthy) token the write only succeeds if the key does not
        exist yet; with a token it only succeeds if the key's current index
        equals the token.

        Returns:
            ``(state, token, True)`` with the freshly written state if the
            write went through. Otherwise ``(state, token, False)`` with the
            state currently stored in etcd, or ``None`` if there is no stored
            state.

        Raises:
            RendezvousConnectionError:
                If etcd cannot be reached or the request times out.
            RendezvousStateError:
                If the state read back from etcd is not valid base64.
        """
        base64_state = b64encode(state).decode()

        def current_state_unwritten() -> Optional[Tuple[bytes, Token, bool]]:
            # Report what is stored right now, flagged as "not written by us".
            current = self.get_state()
            if current is None:
                return None
            return (*current, False)

        if token:
            try:
                token = int(token)
            except (ValueError, TypeError):
                # A token that cannot be converted to an integer cannot be
                # used as ``prevIndex``. Handle it like any other invalid
                # token instead of letting the conversion error escape.
                return current_state_unwritten()

        write_conditions = {}
        if token:
            # Compare-and-swap against the index the caller last observed.
            write_conditions["prevIndex"] = token
        else:
            # No token: only create the key, never overwrite an existing one.
            write_conditions["prevExist"] = False

        try:
            result = self._client.write(
                self._key, base64_state, self._ttl, **write_conditions
            )
        except (EtcdAlreadyExist, EtcdCompareFailed):
            # The write condition did not hold, so nothing was written.
            return current_state_unwritten()
        except (EtcdException, urllib3.exceptions.TimeoutError) as exc:
            raise RendezvousConnectionError(
                "The connection to etcd has failed. See inner exception for details."
            ) from exc

        if result is None:
            return current_state_unwritten()

        return (*self._decode_state(result), True)

    def _decode_state(self, result: EtcdResult) -> Tuple[bytes, Token]:
        """Turn an etcd result into a ``(state, token)`` tuple.

        The state is the base64-decoded ``value`` of the result and the token
        is its ``modifiedIndex``.

        Raises:
            RendezvousStateError:
                If the value is not valid base64.
        """
        base64_state = result.value.encode()

        try:
            state = b64decode(base64_state)
        except binascii.Error as exc:
            raise RendezvousStateError(
                "The state object is corrupt. See inner exception for details."
            ) from exc

        return state, result.modifiedIndex
def _create_etcd_client(params: RendezvousParameters) -> EtcdClient:
    """Build an ``etcd.Client`` from the specified rendezvous parameters.

    The host and port come from ``params.endpoint`` (port 2379 if none is
    given). The remaining settings are read from the ``read_timeout``,
    ``protocol``, ``ssl_cert``, ``ssl_cert_key``, and ``ca_cert`` parameters.

    Raises:
        ValueError:
            If the read timeout is not positive or the protocol is neither
            HTTP nor HTTPS.
        RendezvousConnectionError:
            If creating the client fails with an etcd error or a timeout.
    """
    host, port = parse_rendezvous_endpoint(params.endpoint, default_port=2379)

    # Read timeout for etcd operations; defaults to 60.
    read_timeout = cast(int, params.get_as_int("read_timeout", 60))
    if read_timeout <= 0:
        raise ValueError("The read timeout must be a positive integer.")

    # Communication protocol, compared case-insensitively.
    protocol = params.get("protocol", "http").strip().lower()
    if protocol not in ("http", "https"):
        raise ValueError("The protocol must be HTTP or HTTPS.")

    # SSL client certificate. The key is only looked up when a certificate
    # was supplied.
    client_cert = params.get("ssl_cert")
    client_cert_key = params.get("ssl_cert_key") if client_cert else None
    if client_cert and client_cert_key:
        # The etcd client expects the certificate key as the second element
        # of the `cert` tuple.
        client_cert = (client_cert, client_cert_key)

    client_options = {
        "read_timeout": read_timeout,
        "protocol": protocol,
        "cert": client_cert,
        # The root certificate
        "ca_cert": params.get("ca_cert"),
        "allow_reconnect": True,
    }

    try:
        return EtcdClient(host, port, **client_options)
    except (EtcdException, urllib3.exceptions.TimeoutError) as exc:
        raise RendezvousConnectionError(
            "The connection to etcd has failed. See inner exception for details."
        ) from exc
def create_backend(params: RendezvousParameters) -> Tuple[EtcdRendezvousBackend, Store]:
    """Create a new :py:class:`EtcdRendezvousBackend` from the specified parameters.

    A single etcd client is created from ``params`` and shared by the returned
    backend and store.

    +--------------+-----------------------------------------------------------+
    | Parameter    | Description                                               |
    +==============+===========================================================+
    | read_timeout | The read timeout, in seconds, for etcd operations.        |
    |              | Defaults to 60 seconds.                                   |
    +--------------+-----------------------------------------------------------+
    | protocol     | The protocol to use to communicate with etcd. Valid       |
    |              | values are "http" and "https". Defaults to "http".        |
    +--------------+-----------------------------------------------------------+
    | ssl_cert     | The path to the SSL client certificate to use along with  |
    |              | HTTPS. Defaults to ``None``.                              |
    +--------------+-----------------------------------------------------------+
    | ssl_cert_key | The path to the private key of the SSL client certificate |
    |              | to use along with HTTPS. Defaults to ``None``.            |
    +--------------+-----------------------------------------------------------+
    | ca_cert      | The path to the root SSL authority certificate. Defaults  |
    |              | to ``None``.                                              |
    +--------------+-----------------------------------------------------------+

    Returns:
        A ``(backend, store)`` tuple.
    """
    etcd_client = _create_etcd_client(params)

    # The rendezvous state and the store are kept under separate key prefixes.
    rendezvous_backend = EtcdRendezvousBackend(
        client=etcd_client,
        run_id=params.run_id,
        key_prefix="/torch/elastic/rendezvous",
    )
    etcd_store = EtcdStore(etcd_client, "/torch/elastic/store")

    return rendezvous_backend, etcd_store
Docs
Access comprehensive developer documentation for PyTorch
To analyze traffic and optimize your experience, we serve cookies on this site. By clicking or navigating, you agree to allow our usage of cookies. As the current maintainers of this site, Facebook’s Cookies Policy applies. Learn more, including about available controls: Cookies Policy.