Source code for torch.distributed.elastic.rendezvous.c10d_rendezvous_backend
# Copyright (c) Facebook, Inc. and its affiliates.# All rights reserved.## This source code is licensed under the BSD-style license found in the# LICENSE file in the root directory of this source tree.importbinasciiimportloggingimportosimporttempfilefrombase64importb64decode,b64encodefromdatetimeimporttimedeltafromtypingimportAny,Optional,Tuple,castfromtorch.distributedimportFileStore,Store,TCPStorefromtorch.distributed.elastic.eventsimport(NodeState,construct_and_record_rdzv_event,)from.apiimport(RendezvousConnectionError,RendezvousError,RendezvousParameters,RendezvousStateError,)from.dynamic_rendezvousimportRendezvousBackend,Tokenfrom.utilsimport_matches_machine_hostname,parse_rendezvous_endpointlog=logging.getLogger(__name__)
class C10dRendezvousBackend(RendezvousBackend):
    """A rendezvous backend whose state lives in a C10d store.

    The rendezvous state is written base64-encoded under the key
    ``"torch.rendezvous." + run_id``. The encoded bytes read back from the
    store double as the token returned alongside the decoded state.

    Args:
        store:
            The :py:class:`torch.distributed.Store` instance used to
            communicate with the C10d store.
        run_id:
            The run id of the rendezvous; must be a non-empty string.

    Raises:
        ValueError: If ``run_id`` is empty.
        RendezvousConnectionError: If a store operation fails with a
            ``ValueError`` or ``RuntimeError``.
    """

    # Placeholder stored in place of "no state yet"; see ``__init__``.
    _NULL_SENTINEL = "Y2FuaW1hZGFt"

    _store: Store
    _key: str

    def __init__(self, store: Store, run_id: str) -> None:
        if not run_id:
            raise ValueError("The run id must be a non-empty string.")

        self._store = store
        self._key = "torch.rendezvous." + run_id

        # Reading a key from a store blocks until that key exists, which makes
        # a store awkward to use as a plain key-value mapping. To sidestep
        # this, a sentinel is written as the initial state, and any read that
        # returns the sentinel is reported as ``None``.
        self._call_store("compare_set", self._key, "", self._NULL_SENTINEL)

    @property
    def name(self) -> str:
        """See base class."""
        return "c10d"

    def get_state(self) -> Optional[Tuple[bytes, Token]]:
        """See base class."""
        return self._decode_state(self._call_store("get", self._key))

    def set_state(
        self, state: bytes, token: Optional[Token] = None
    ) -> Optional[Tuple[bytes, Token, bool]]:
        """See base class."""
        encoded_state = b64encode(state).decode()

        if not token:
            # No token: only overwrite the "empty" placeholder state.
            expected = self._NULL_SENTINEL
        elif isinstance(token, bytes):
            expected = token.decode()
        else:
            # Tokens issued by this backend are the raw bytes read from the
            # store (see ``_decode_state``). Any other type cannot match, so
            # skip the write and report the current state as a failed attempt.
            current = self.get_state()
            if current is None:
                return None
            return (*current, False)

        response: bytes = self._call_store(
            "compare_set", self._key, expected, encoded_state
        )

        decoded = self._decode_state(response)
        if decoded is None:
            return None

        remote_state, remote_token = decoded
        # ``compare_set`` does not directly report whether our write won, so
        # compare what the store now holds against what we tried to write.
        return remote_state, remote_token, remote_state == state

    def _call_store(self, store_op: str, *args, **kwargs) -> Any:
        """Invoke ``store_op`` on the wrapped store, translating failures.

        ``ValueError`` and ``RuntimeError`` raised by the store are re-raised as
        :py:class:`RendezvousConnectionError` with the original exception
        chained as the cause.
        """
        try:
            operation = getattr(self._store, store_op)
            return operation(*args, **kwargs)
        except (ValueError, RuntimeError) as exc:
            raise RendezvousConnectionError(
                "The connection to the C10d store has failed. See inner exception for details."
            ) from exc

    def _decode_state(self, base64_state: bytes) -> Optional[Tuple[bytes, Token]]:
        """Turn raw store contents into a ``(state, token)`` pair.

        Returns ``None`` when the contents equal the null sentinel. Otherwise
        the token is ``base64_state`` itself, undecoded.

        Raises:
            RendezvousStateError: If ``b64decode`` rejects ``base64_state``.
        """
        null_marker = self._NULL_SENTINEL.encode()
        if base64_state == null_marker:
            return None

        try:
            decoded = b64decode(base64_state)
        except binascii.Error as exc:
            raise RendezvousStateError(
                "The state object is corrupt. See inner exception for details."
            ) from exc
        else:
            return decoded, base64_state
def_create_tcp_store(params:RendezvousParameters)->TCPStore:host,port=parse_rendezvous_endpoint(params.endpoint,default_port=29400)cfg_is_host=params.get_as_bool("is_host")# If the user has explicitly specified whether our process should host the# the store, respect it.ifcfg_is_hostisnotNone:is_host=cfg_is_host# Otherwise try to determine whether we are the host based on our hostname# and IP address.else:is_host=_matches_machine_hostname(host)# The timeoutread_timeout=cast(int,params.get_as_int("read_timeout",60))ifread_timeout<=0:raiseValueError("The read timeout must be a positive integer.")# In specific cases we attempt to instantiate the store twice. For details# see the explanation in the except clause below.foris_serverin[is_host,False]:try:store=TCPStore(host,port,is_master=is_server,timeout=timedelta(seconds=read_timeout))ifis_server:msg=f"Process {os.getpid()} hosts the TCP store for the C10d rendezvous backend."construct_and_record_rdzv_event(run_id=params.run_id,message=msg,node_state=NodeState.INIT)log.info(msg)breakexcept(ValueError,RuntimeError)asexc:# If we heuristically inferred the value of is_host as True and our# first attempt to instantiate the TCP store has failed, try it one# more time with is_host set to False. As an edge case there can be# more than one process that is part of the same rendezvous on this# machine and only one of them will eventually host the store.ifnotis_serverorcfg_is_hostisnotNone:raiseRendezvousConnectionError("The connection to the C10d store has failed. See inner exception for details.")fromexcreturnstoredef_create_file_store(params:RendezvousParameters)->FileStore:# If a user specifies an endpoint, we treat it as a path to a file.ifparams.endpoint:path=params.endpointelse:try:# The temporary file is readable and writable only by the user of# this process._,path=tempfile.mkstemp()exceptOSErrorasexc:raiseRendezvousError("The file creation for C10d store has failed. 
See inner exception for details.")fromexctry:store=FileStore(path)except(ValueError,RuntimeError)asexc:raiseRendezvousConnectionError("The connection to the C10d store has failed. See inner exception for details.")fromexcreturnstore
def create_backend(params: RendezvousParameters) -> Tuple[C10dRendezvousBackend, Store]:
    """Create a :py:class:`C10dRendezvousBackend` together with its C10d store.

    The following keys are read from ``params``:

    ``store_type``
        The kind of C10d store to create: ``"tcp"`` for
        :py:class:`torch.distributed.TCPStore` or ``"file"`` for
        :py:class:`torch.distributed.FileStore`. Surrounding whitespace and
        letter case are ignored. Defaults to ``"tcp"``.

    ``read_timeout``
        The read timeout, in seconds, for store operations. Defaults to 60
        seconds. Only meaningful for :py:class:`torch.distributed.TCPStore`;
        :py:class:`torch.distributed.FileStore` does not accept a timeout.

    ``is_host``
        Whether this backend instance hosts the C10d store. When omitted, it
        is inferred by matching the hostname or IP address of this machine
        against the rendezvous endpoint. Defaults to ``None``.

        Only applies to :py:class:`torch.distributed.TCPStore`. It normally
        does not need to be set. Set it only when the inference goes wrong,
        for example when the endpoint uses a CNAME as its hostname or does not
        match the FQDN of the machine.

    Returns:
        A ``(backend, store)`` tuple.

    Raises:
        ValueError: If ``store_type`` is neither ``"tcp"`` nor ``"file"``.

    Any exception raised while creating the store or the backend is recorded
    as a ``NodeState.FAILED`` rendezvous event and then re-raised unchanged.
    """
    # Only TCPStore and FileStore are supported for now; other store types
    # still lack functionality this backend relies on, such as compare_set.
    raw_store_type = params.get("store_type", "tcp")
    store_type = raw_store_type.strip().lower()

    store: Store
    try:
        if store_type == "tcp":
            store = _create_tcp_store(params)
        elif store_type == "file":
            store = _create_file_store(params)
        else:
            raise ValueError("Invalid store type given. Currently only supports file and tcp.")

        backend = C10dRendezvousBackend(store, params.run_id)
    except Exception as e:
        construct_and_record_rdzv_event(
            message=f"{type(e).__name__}: {str(e)}",
            run_id=params.run_id,
            node_state=NodeState.FAILED,
        )
        raise

    return backend, store
Docs
Access comprehensive developer documentation for PyTorch
To analyze traffic and optimize your experience, we serve cookies on this site. By clicking or navigating, you agree to allow our usage of cookies. As the current maintainers of this site, Facebook’s Cookies Policy applies. Learn more, including about available controls: Cookies Policy.