
Source code for torchx.tracker.backend.fsspec

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from __future__ import annotations

import json
import os
import time
from base64 import b32decode, b32encode
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Mapping, Optional

import fsspec

from torchx.tracker.api import Lineage, TrackerArtifact, TrackerBase, TrackerSource

def generate_filename() -> str:
    """
    Returns a filename made of the current wall-clock time in nanoseconds, rendered as a decimal string
    """
    return f"{time.time_ns()}"

def _encode_torchx_run_id(run_id: str) -> str:
    """
    Encodes run_id that can be POSIX compatible, case-aware filename

    Note: TorchX run_id scheme is: <scheduler_backend>://<session_name>/<app_id>

    The run_id is base32-encoded, so the result contains no path separators.
    The input is encoded as UTF-8 to mirror `_decode_torchx_run_id`; for ASCII-only
    run ids this yields exactly the same bytes as an ASCII encoding would.
    """
    return b32encode(run_id.encode("utf-8")).decode("utf-8")

def _decode_torchx_run_id(s: str) -> str:
    """
    Symmetric operation for `_encode_torchx_run_id` function to decode filename as a run_id

    `s` must be a valid base32 string; `b32decode` raises on malformed input.
    """
    return b32decode(s).decode("utf-8")

@dataclass
class _FsspecTrackerPathBuilder:
    """
    Encapsulation of the path logic used by `FsspecTracker`

    Every `with_*` method returns a new builder whose `root_dir` is extended by one
    path component; the builder it is called on is left unchanged.
    """

    # Path accumulated so far; joined with further components by the `with_*` methods.
    root_dir: str

    def with_run_id(self, run_id: str) -> _FsspecTrackerPathBuilder:
        """Appends the encoded form of `run_id` as a path component."""
        encoded_run_id = _encode_torchx_run_id(run_id)
        return self.with_subpath(encoded_run_id)

    def with_subpath(self, subpath: str) -> _FsspecTrackerPathBuilder:
        """Appends `subpath` using `os.path.join`."""
        return _FsspecTrackerPathBuilder(root_dir=os.path.join(self.root_dir, subpath))

    def with_unique_filename(self) -> _FsspecTrackerPathBuilder:
        """Appends a filename produced by `generate_filename()`."""
        filename = generate_filename()
        return _FsspecTrackerPathBuilder(root_dir=os.path.join(self.root_dir, filename))

    def path(self) -> str:
        """Returns the path built so far."""
        return self.root_dir

class FsspecTracker(TrackerBase):
    """
    Implements `TrackerBase` using Fsspec abstraction and has an advantage of using various storage options for
    persisting the data.

    Important: `torchx.tracker.api` API is still experimental, hence there are no backwards compatibility guarantees
    with future releases yet.

    Each run will have a directory with subdirs for metadata, artifact, source and descendants data.
    """

    def __init__(self, fs: fsspec.AbstractFileSystem, root_dir: str) -> None:
        """
        Args:
            fs: filesystem used for all reads and writes
            root_dir: existing directory under which one subdirectory per run is kept
        """
        assert fs.exists(
            root_dir
        ), f"Expects FSSpec tracker directory '{root_dir}' to exist."
        self.fs = fs
        self._path_builder = _FsspecTrackerPathBuilder(root_dir)

    def _persist(
        self, run_id: str, category: str, content: Mapping[str, object]
    ) -> None:
        """Writes `content` as JSON into a new uniquely named file under `<run>/<category>/`."""
        category_path_builder = self._path_builder.with_run_id(run_id).with_subpath(
            category
        )
        self.fs.mkdirs(category_path_builder.path(), exist_ok=True)
        filename = category_path_builder.with_unique_filename().path()
        with self.fs.open(filename, mode="w") as f:
            json.dump(content, f)

    def _load(self, run_id: str, category: str) -> Iterable[Mapping[str, object]]:
        """Returns the JSON content of every file under `<run>/<category>/` (empty if the directory is missing)."""
        data = []
        category_path_builder = self._path_builder.with_run_id(run_id).with_subpath(
            category
        )
        if not self.fs.exists(category_path_builder.path()):
            return data

        for listing in self.fs.listdir(category_path_builder.path()):
            file_name = listing["name"]
            with self.fs.open(file_name) as f:
                content = json.load(f)
                data.append(content)
        return data

    def add_artifact(
        self,
        run_id: str,
        name: str,
        path: str,
        metadata: Optional[Mapping[str, object]] = None,
    ) -> None:
        """Persists an artifact entry (`name`, `path`, `metadata`) for `run_id`."""
        entry = {"name": name, "path": path, "metadata": metadata}
        self._persist(run_id, "artifacts", entry)

    def artifacts(self, run_id: str) -> Mapping[str, TrackerArtifact]:
        """Returns the stored artifacts of `run_id` keyed by artifact name."""
        artifacts = {}
        entries = self._load(run_id, "artifacts")
        for entry in entries:
            name = str(entry["name"])
            path = str(entry["path"])
            metadata = entry["metadata"]
            artifact = TrackerArtifact(name, path, metadata)  # pyre-ignore
            artifacts[name] = artifact
        return artifacts

    def add_metadata(self, run_id: str, **metadata: object) -> None:
        """Persists the given keyword arguments as one metadata entry for `run_id`."""
        entry = {"metadata": metadata}
        self._persist(run_id, "metadata", entry)

    def metadata(self, run_id: str) -> Mapping[str, object]:
        """Returns all metadata entries of `run_id` merged into one mapping, in listing order."""
        metadata = {}
        entries = self._load(run_id, "metadata")
        for entry in entries:
            stored_metadata = entry["metadata"]
            for k, v in stored_metadata.items():  # pyre-ignore
                metadata[k] = v
        return metadata

    def add_source(
        self,
        run_id: str,
        source_id: str,
        artifact_name: Optional[str] = None,
    ) -> None:
        """
        Records `source_id` as a source of `run_id`, optionally tagged with `artifact_name`.

        When the source run already has a directory in this tracker, a back-reference to `run_id`
        is also appended under the source run's `descendants` directory.
        """
        sources_path_builder = self._path_builder.with_run_id(run_id).with_subpath(
            "sources"
        )
        self.fs.mkdirs(sources_path_builder.path(), exist_ok=True)

        parent_ref_file = sources_path_builder.with_run_id(source_id).path()
        artifact_name = artifact_name or ""
        with self.fs.open(parent_ref_file, mode="w") as f:
            f.write(f"{artifact_name}\n")

        # write into parent as well (if exists) that will allow traversing descendants
        # NOTE: the existence check is on the parent *run* directory; the `descendants`
        # subdirectory is created on demand since nothing else creates it.
        parent_path_builder = self._path_builder.with_run_id(source_id)
        if self.fs.exists(parent_path_builder.path()):
            parent_descendants_path_builder = parent_path_builder.with_subpath(
                "descendants"
            )
            self.fs.mkdirs(parent_descendants_path_builder.path(), exist_ok=True)
            descendant_ref_path = parent_descendants_path_builder.with_run_id(
                run_id
            ).path()
            with self.fs.open(descendant_ref_path, mode="a") as f:
                f.write(f"{artifact_name}\n")

    def _read_source_file(
        self, source_file: str, artifact_name: Optional[str] = None
    ) -> Iterable[TrackerSource]:
        """
        Reads one source reference file; its basename is the encoded source run id and each
        non-empty line is an artifact name. If `artifact_name` is given only matching lines are returned.
        """
        entries = []
        name = os.path.basename(source_file)
        with self.fs.open(source_file) as f:
            # lines are `.decode()`d below, i.e. the file is expected to be read as bytes
            lines = [raw_line.strip() for raw_line in f.readlines()]
            lines = [line.decode() for line in lines if line]
            source_run_id = _decode_torchx_run_id(name)
            if lines:
                for artifact_type in lines:
                    if artifact_name and artifact_type != artifact_name:
                        continue
                    tracker_source = TrackerSource(source_run_id, artifact_type)
                    entries.append(tracker_source)
            elif not artifact_name:
                # reference without an artifact name
                tracker_source = TrackerSource(source_run_id, None)
                entries.append(tracker_source)
        return entries

    def sources(
        self,
        run_id: str,
        artifact_name: Optional[str] = None,
    ) -> Iterable[TrackerSource]:
        """Returns the sources recorded for `run_id`, optionally filtered by `artifact_name`."""
        entries = []
        sources_path_builder = self._path_builder.with_run_id(run_id).with_subpath(
            "sources"
        )
        sources_path = sources_path_builder.path()
        if self.fs.exists(sources_path):
            source_files = self.fs.listdir(sources_path)
            for source_file in source_files:
                source = source_file["name"]
                entries.extend(self._read_source_file(source, artifact_name))
        return entries

    # pyre-ignore[14]:
    def run_ids(self, parent_run_id: Optional[str] = None) -> Iterable[str]:
        """
        Returns the decoded run ids of all run directories under the root, restricted to runs that
        reference `parent_run_id` as a source when it is given.
        """
        all_sources = []
        root_dir = self._path_builder.path()
        if self.fs.exists(root_dir):
            source_files = self.fs.listdir(root_dir)
            for source_file in source_files:
                if source_file["type"] != "directory":
                    continue
                if parent_run_id:
                    parent_id_file = f"{source_file['name']}/sources/{_encode_torchx_run_id(parent_run_id)}"
                    if not self.fs.exists(parent_id_file):
                        continue
                source = source_file["name"]
                encoded_source_run_id = os.path.basename(source)
                all_sources.append(_decode_torchx_run_id(encoded_source_run_id))
        return all_sources

    def lineage(self, run_id: str) -> Lineage:
        """Not implemented by this backend."""
        raise NotImplementedError("")

    def __repr__(self) -> str:
        return f"<FsspecTracker: root_path={self._path_builder.path()}>"
def _put_config(key: str, value: str, config: Dict[str, Any]) -> None:
    """
    Stores `value` in `config` under `key`; a dot-separated key (eg. `a.b.c`) is expanded into nested dicts.

    Raises:
        ValueError: if `key` ends with `.`, or if a prefix of `key` is already bound to a non-dict value
    """
    idx = key.find(".")
    if idx < 0:
        # not a nested key -> set key = val
        config[key] = value
    elif idx == len(key) - 1:
        # key ends with "." -> illegal
        raise ValueError(
            f"Illegal config key `{key}`. Key should not have a `.` suffix"
        )
    else:
        first_key = key[:idx]
        rest_keys = key[idx + 1 :]
        nested_config = config.setdefault(first_key, {})
        if not isinstance(nested_config, dict):
            # eg. `a=1` followed by `a.b=2`; fail with a clear message instead of a TypeError
            raise ValueError(
                f"Illegal config key `{key}`. `{first_key}` is already set to a non-nested value"
            )
        _put_config(rest_keys, value, nested_config)


def _read_config(config_file: str) -> Mapping[str, Any]:
    """
    Parses `key = value` lines of `config_file` into a (possibly nested) mapping.

    Lines whose first non-blank character is `#` are comments; lines without a key, `=` or value are ignored.
    """
    # TODO add support for resource based config
    data: Dict[str, Any] = {}
    with open(config_file, "rt") as f:
        for line in f:
            if line.lstrip().startswith("#"):
                # skip comments (including indented ones)
                continue
            k, sep, v = line.partition("=")
            if k and sep and v:
                _put_config(k.strip(), v.strip(), data)
    return data


def create(config_file: str) -> TrackerBase:
    """
    Entry-point to build Tracker.

    Note the configuration itself expects fsspec URI with at least two entries:
     - protocol: FSSpec protocol (eg. S3, local)
     - root_path: path to use to store the data (in hierarchical fashion)

    In addition any other non-comment entries will be passed to a constructor (eg. authentication data)

    Raises:
        ValueError: if `protocol` or `root_path` is missing from the config
    """
    # copy so the remaining entries can be forwarded without mutating the parsed mapping
    config = dict(_read_config(config_file))
    if "protocol" not in config or "root_path" not in config:
        raise ValueError(f"Please specify 'protocol' and 'root_path' in {config_file}")
    protocol = config.pop("protocol")
    root = config.pop("root_path")
    fs = fsspec.filesystem(protocol, **config)
    tracker = FsspecTracker(fs, root)
    return tracker


Access comprehensive developer documentation for PyTorch

View Docs


Get in-depth tutorials for beginners and advanced developers

View Tutorials


Find development resources and get your questions answered

View Resources