Source code for torch.distributed.checkpoint.fsspec
# Mypy will not try inferring the types of any 3rd party libraries installed.
# mypy: ignore-errors

import io
import os
from contextlib import contextmanager, suppress
from pathlib import Path
from typing import Generator, Optional, Union

import fsspec
from fsspec import AbstractFileSystem
from fsspec.core import url_to_fs

from torch.distributed.checkpoint.filesystem import (
    FileSystemBase,
    FileSystemReader,
    FileSystemWriter,
)

__all__ = [
    "FsspecWriter",
    "FsspecReader",
]


class FileSystem(FileSystemBase):
    """``FileSystemBase`` implementation that routes all storage I/O through fsspec.

    The concrete ``AbstractFileSystem`` is resolved from the checkpoint path in
    :meth:`init_path`. Every method that touches storage requires that
    :meth:`init_path` has been called first.
    """

    def __init__(self) -> None:
        # Resolved lazily by ``init_path``; ``None`` until then.
        self.fs: Optional[AbstractFileSystem] = None

    @contextmanager
    def create_stream(
        self, path: Union[str, os.PathLike], mode: str
    ) -> Generator[io.IOBase, None, None]:
        """Open ``path`` in ``mode`` on the resolved filesystem and yield the stream.

        If an exception escapes the ``with`` body (or closing the stream) after
        the file was opened in a creating mode (``"w"`` or ``"x"``), the file
        is removed on a best-effort basis once the stream is closed. The
        original exception is then re-raised.

        Args:
            path: file to open.
            mode: file mode forwarded to ``AbstractFileSystem.open``.

        Raises:
            AssertionError: if :meth:`init_path` has not been called.
        """
        assert self.fs is not None
        path = os.fspath(path)
        # Only files this call created or truncated are safe to delete on
        # failure. Append / read modes may hold pre-existing data.
        creates_file = "w" in mode or "x" in mode
        opened = False
        # NOTE: deliberately no ``self.fs.transaction`` here. fsspec keeps a
        # single transaction per filesystem instance, so concurrent writer
        # threads would commit or discard each other's files, and not every
        # backend implements rollback. Clean up manually instead.
        try:
            # Open through the filesystem resolved in ``init_path`` instead of
            # ``fsspec.open``, which would re-resolve the URL for every file.
            with self.fs.open(path, mode) as stream:
                opened = True
                yield stream
        except BaseException:
            # Cleanup runs after the ``with`` has closed the stream. For
            # buffered remote backends a partial upload has happened by now
            # and is removed here.
            if opened and creates_file:
                # Best effort: never mask the original exception.
                with suppress(Exception):
                    self.fs.rm_file(path)
            raise

    def concat_path(
        self, path: Union[str, os.PathLike], suffix: str
    ) -> Union[str, os.PathLike]:
        """Return ``suffix`` joined onto ``path`` via ``os.path.join``."""
        # NOTE(review): os.path.join uses the host separator; on Windows this
        # would insert "\\" into URL-style paths. Confirm that is acceptable.
        return os.path.join(path, suffix)

    def init_path(self, path: Union[str, os.PathLike]) -> Union[str, os.PathLike]:
        """Resolve and store the fsspec filesystem for ``path``; return ``path`` as-is."""
        self.fs, _ = url_to_fs(path)
        return path

    def rename(
        self, path: Union[str, os.PathLike], new_path: Union[str, os.PathLike]
    ) -> None:
        """Rename ``path`` to ``new_path`` on the resolved filesystem."""
        assert self.fs is not None
        self.fs.rename(path, new_path)

    def mkdir(self, path: Union[str, os.PathLike]) -> None:
        """Create ``path`` (and missing parents); an existing directory is not an error."""
        assert self.fs is not None
        self.fs.makedirs(path, exist_ok=True)

    @classmethod
    def validate_checkpoint_id(cls, checkpoint_id: Union[str, os.PathLike]) -> bool:
        """Return whether ``checkpoint_id`` can be handled by this fsspec backend.

        ``pathlib.Path`` instances are always rejected. Presumably this leaves
        plain local paths to the non-fsspec storage classes; confirm with the
        caller. Otherwise the id is accepted when ``url_to_fs`` resolves it
        without raising ``ValueError``.
        """
        if isinstance(checkpoint_id, Path):
            return False
        try:
            url_to_fs(checkpoint_id)
        except ValueError:
            return False
        return True
class FsspecWriter(FileSystemWriter):
    """
    Basic implementation of StorageWriter using fsspec.

    This implementation makes the following assumptions and simplifications:

    * The checkpoint path is an empty or non-existing directory.
    * File creation is atomic

    The checkpoint consists of one file per write request plus
    a `.metadata` file with the serialized metadata.

    """

    def __init__(
        self,
        path: Union[str, os.PathLike],
        single_file_per_rank: bool = True,
        sync_files: bool = True,
        thread_count: int = 1,
        per_thread_copy_ahead: int = 10_000_000,
    ) -> None:
        """
        Initialize the writer pointing to `path`.

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Defaults to True.
            sync_files : force files to be synced to permanent storage. Defaults to True.
            thread_count: Number of IO threads to use to write. Defaults to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving them. Defaults to 10_000_000 bytes (10 MB).

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        """
        super().__init__(
            path, single_file_per_rank, sync_files, thread_count, per_thread_copy_ahead
        )
        # Re-bind ``fs``/``path`` to this module's fsspec-backed FileSystem so
        # that all storage operations go through fsspec. This overrides
        # whatever the base initializer may have assigned.
        self.fs = FileSystem()
        self.path = self.fs.init_path(path)

    @classmethod
    def validate_checkpoint_id(cls, checkpoint_id: Union[str, os.PathLike]) -> bool:
        """Delegate to ``FileSystem.validate_checkpoint_id`` (fsspec-resolvable ids only)."""
        return FileSystem.validate_checkpoint_id(checkpoint_id)
To analyze traffic and optimize your experience, we serve cookies on this site. By clicking or navigating, you agree to allow our usage of cookies. As the current maintainers of this site, Facebook’s Cookies Policy applies. Learn more, including about available controls: Cookies Policy.