Shortcuts

Source code for torchx.tracker.api

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from __future__ import annotations

import logging
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import lru_cache
from typing import Iterable, Mapping, Optional

from torchx.util.entrypoints import load_group
from torchx.util.modules import load_module

# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Names of the environment variables read by the functions in this module.
# Comma-separated list of tracker entry-point keys; its absence means
# "no trackers configured" (see _extract_tracker_name_and_config_from_environ).
ENV_TORCHX_TRACKERS = "TORCHX_TRACKERS"
# When present, AppRun.run_from_env() registers its value as a source of the
# current job on every configured tracker.
ENV_TORCHX_PARENT_RUN_ID = "TORCHX_PARENT_RUN_ID"
# ID of the current job; AppRun.run_from_env() falls back to "<UNDEFINED>"
# when this variable is not set.
ENV_TORCHX_JOB_ID = "TORCHX_JOB_ID"


@dataclass
class TrackerSource:
    """
    Dataclass to represent sources at backend tracker level.

    Args:
        source_run_id(str): source ID that can be either another TorchX handle or the ID of an external entity such as an experiment
        artifact_name(Optional[str]): type of source. Can be interpreted as the type of relationship, which can be used for filtering.
    """

    source_run_id: str
    artifact_name: Optional[str]


@dataclass
class TrackerArtifact:
    """
    Dataclass to represent artifacts at backend tracker level.

    Args:
        name(str): Name of the artifact
        path(str): Path to the actual artifact
        metadata(Optional[Mapping[str, object]]): Additional metadata to store about the artifact
    """

    name: str
    path: str
    metadata: Optional[Mapping[str, object]]


@dataclass
class AppRunTrackableSource:
    """
    Dataclass to represent sources at user API level.

    Args:
        parent(AppRun): source AppRun that the current run is derived from.
        artifact_name(Optional[str]): type of artifact representing the parent AppRun.
    """

    parent: AppRun
    artifact_name: Optional[str]


# Placeholder type with an empty body (`...`); in this module it only appears
# as the return annotation of TrackerBase.lineage().
class Lineage: ...


class TrackerBase(ABC):
    """
    Abstraction of tracking solution implementations/services.

    Every method is abstract: a concrete tracker backend must implement all of
    them before it can be instantiated.

    This API is still experimental and may change to a large extent in the
    future.
    """

    @abstractmethod
    def add_artifact(
        self,
        run_id: str,
        name: str,
        path: str,
        metadata: Optional[Mapping[str, object]] = None,
    ) -> None:
        """
        Adds an artifact to the tracker with the specified name, path and any
        arbitrary metadata.

        Args:
            run_id(str): ID of the run the artifact belongs to
            name(str): name of the artifact
            path(str): path to the actual artifact
            metadata(Optional[Mapping[str, object]]): optional metadata stored with the artifact
        """
        ...

    @abstractmethod
    def artifacts(self, run_id: str) -> Mapping[str, TrackerArtifact]:
        """
        Fetches all of the artifacts for the specified run.

        Args:
            run_id(str): ID of the run to query
        """
        ...

    @abstractmethod
    def add_metadata(self, run_id: str, **kwargs: object) -> None:
        """
        Adds any arbitrary metadata values to the specified experiment.

        Args:
            run_id(str): ID of the run the metadata belongs to
            **kwargs(object): metadata entries passed as keyword arguments
        """
        ...

    @abstractmethod
    def metadata(self, run_id: str) -> Mapping[str, object]:
        """
        Fetches the metadata for the specified experiment.

        Args:
            run_id(str): ID of the run to query
        """
        ...

    @abstractmethod
    def add_source(
        self,
        run_id: str,
        source_id: str,
        artifact_name: Optional[str] = None,
    ) -> None:
        """
        Adds a link to a different job identifying the lineage of the current
        experiment and the specific artifact that's being used.

        Args:
            run_id(str): ID of the run that gains the source
            source_id(str): ID of the source being linked
            artifact_name(Optional[str]): optional name of the artifact that is being used
        """
        ...

    @abstractmethod
    def sources(
        self,
        run_id: str,
        artifact_name: Optional[str] = None,
    ) -> Iterable[TrackerSource]:
        """
        Returns sources for the specified run. Artifact name can be used to
        filter the sources.

        Args:
            run_id(str): ID of the run to query
            artifact_name(Optional[str]): optional artifact name to filter by
        """
        ...

    @abstractmethod
    def lineage(self, run_id: str) -> Lineage:
        """
        Returns the lineage for the specified experiment.

        The lineage includes all parent jobs and artifacts this job depends on
        as well as any jobs that are consuming artifacts from this job.

        Args:
            run_id(str): ID of the run to query
        """
        ...

    @abstractmethod
    def run_ids(self, **kwargs: str) -> Iterable[str]:
        """
        Returns list of experiment run ids. Optionally includes filter
        parameters.

        Args:
            **kwargs(str): optional filter parameters passed as keyword arguments
        """
        ...
def tracker_config_env_var_name(entrypoint_key: str) -> str:
    """
    Utility method to derive tracker config env variable name given tracker name.

    Args:
        entrypoint_key(str): tracker (entry-point) name

    Returns:
        ``TORCHX_TRACKER_<ENTRYPOINT_KEY>_CONFIG`` with the key upper-cased.
    """
    return f"TORCHX_TRACKER_{entrypoint_key.upper()}_CONFIG"


def _extract_tracker_name_and_config_from_environ() -> Mapping[str, Optional[str]]:
    """
    Reads the configured tracker names and their optional config strings from
    the environment.

    The value of the ``TORCHX_TRACKERS`` env variable is split on commas.
    Whitespace around each name is ignored and blank entries (empty value,
    doubled or trailing commas) are skipped, so they never reach the factory
    lookup as bogus tracker names.

    Returns:
        Mapping of tracker name to the value of its
        ``TORCHX_TRACKER_<NAME>_CONFIG`` env variable, or ``None`` when that
        variable is not set. Empty when ``TORCHX_TRACKERS`` is not set.
    """
    tracker_backend_entrypoints = os.environ.get(ENV_TORCHX_TRACKERS)
    if tracker_backend_entrypoints is None:
        logger.info("No trackers were configured, skipping setup.")
        return {}

    logger.info(f"Trackers: {ENV_TORCHX_TRACKERS}={tracker_backend_entrypoints}")

    entries = {}
    for raw_key in tracker_backend_entrypoints.split(","):
        entrypoint_key = raw_key.strip()
        if not entrypoint_key:
            # Blank segment (e.g. "a,,b" or a trailing comma): not a tracker name.
            continue
        # os.environ.get yields None when no config variable is defined.
        entries[entrypoint_key] = os.environ.get(
            tracker_config_env_var_name(entrypoint_key)
        )
    return entries


def build_trackers(
    factory_and_config: Mapping[str, Optional[str]]
) -> Iterable[TrackerBase]:
    """
    Instantiates one tracker per entry of ``factory_and_config``.

    Each factory is looked up by name in the ``torchx.tracker`` entry-point
    group first and, failing that, resolved via ``load_module``. Names that
    resolve to nothing are logged and skipped.

    Args:
        factory_and_config(Mapping[str, Optional[str]]): tracker factory name
            mapped to the config string handed to the factory (may be ``None``)

    Returns:
        List of the trackers created, in the iteration order of the mapping.
    """
    trackers = []

    entrypoint_factories = load_group("torchx.tracker") or {}
    if not entrypoint_factories:
        logger.warning("No 'torchx.tracker' entry_points are defined.")

    for factory_name, config in factory_and_config.items():
        factory = entrypoint_factories.get(factory_name) or load_module(factory_name)
        if not factory:
            logger.warning(
                f"No tracker factory `{factory_name}` found in entry_points or modules. See https://pytorch.org/torchx/main/tracker.html#module-torchx.tracker"
            )
            continue
        if config:
            logger.info(f"Tracker config found for `{factory_name}` as `{config}`")
        else:
            logger.info(f"No tracker config specified for `{factory_name}`")
        trackers.append(factory(config))

    return trackers


def trackers_from_environ() -> Iterable[TrackerBase]:
    """
    Builds list of Trackers that will be used to persist tracking information
    and will be used by AppRun to delegate calls to each instance.

    Expects `TORCHX_TRACKERS` env variable to contain list of entry-point
    factory keys, separated by comma. Optionally, for each tracker key user can
    pass config string value under `TORCHX_TRACKER_<ENTRYPOINT_NAME>_CONFIG` env
    variable. It is up to each implementation to interpret the value, e.g. it
    can be encoded data or a path to a richer config properties file.

    Entry-points (factory methods) must exist in runtime when job is running
    since this runs within user-job space.

    Returns:
        The trackers built from the environment, or an empty list when no
        tracker is configured.
    """
    entrypoint_and_config = _extract_tracker_name_and_config_from_environ()
    if entrypoint_and_config:
        return build_trackers(entrypoint_and_config)
    return []
@dataclass
class AppRun:
    """
    Exposes the tracker API at the job level and should be the only API that
    encapsulates the tracker module implementation.

    Write methods fan out to every backend; ``sources()`` reads from the first
    backend only.

    This API is still experimental and may change in the future.

    Args:
        id(str): identity of the job used by tracker API
        backends(Iterable[TrackerBase]): list of TrackerBase implementations that will be used to persist the data.
    """

    id: str
    backends: Iterable[TrackerBase]

    @staticmethod
    @lru_cache(maxsize=1)  # noqa: B019
    def run_from_env() -> AppRun:
        """
        Creates an :py:class:`AppRun` from environment variables.

        The environment variables are set by the torchx runner. Hence if the
        application is launched via the torchx CLI as:

        .. code-block:: shell-session

            $ torchx run utils_python --script main.py

        And the tracker settings are configured in ``.torchxconfig``, then this
        function returns an ``AppRun`` with the configured tracker backends.
        This function returns a singleton ``AppRun`` hence the same instances
        of the tracker backend objects.

        If the parent-run-id environment variable is set, its value is
        registered as a source of this job on every configured tracker.

        .. note::
            When the application is NOT launched via torchx, this function will
            return an "empty" ``AppRun`` with the ``job_id`` set to a constant
            ``<UNDEFINED>`` since the app was not "launched" (e.g. submitted as
            a job) and hence no canonical ``job_id`` exists. No trackers are
            hooked up to the ``AppRun`` hence calling ``add_*()`` (write)
            methods on the returned apprun will be a no-op.

        Usage:

        .. doctest::

            >>> from torchx.mock_tracker.api import AppRun
            >>> apprun = AppRun.run_from_env()
            >>> apprun.add_metadata(md_1 = "foo", md_2 = "bar")

        """
        torchx_job_id = os.getenv(ENV_TORCHX_JOB_ID, default="<UNDEFINED>")
        trackers = trackers_from_environ()

        parent_run_id = os.environ.get(ENV_TORCHX_PARENT_RUN_ID)
        if parent_run_id is not None:
            logger.info(f"Tracker parent run ID: '{parent_run_id}'")
            for tracker in trackers:
                tracker.add_source(torchx_job_id, parent_run_id, artifact_name=None)

        return AppRun(id=torchx_job_id, backends=trackers)

    def add_metadata(self, **kwargs: object) -> None:
        """
        Stores metadata for the current run on every backend.

        Args:
            **kwargs(object): metadata entries passed as keyword arguments
        """
        for backend in self.backends:
            backend.add_metadata(self.id, **kwargs)

    def add_artifact(
        self, name: str, path: str, metadata: Optional[Mapping[str, object]] = None
    ) -> None:
        """
        Stores artifacts for the current run on every backend.

        Args:
            name(str): name of the artifact
            path(str): path of the artifact that is stored
            metadata(Optional[Mapping[str, object]]): optional metadata attached to artifact information
        """
        for backend in self.backends:
            backend.add_artifact(self.id, name, path, metadata)

    def job_id(self) -> str:
        """Current Id of the run (same value as the ``id`` field)."""
        return self.id

    def add_source(self, source_id: str, artifact_name: Optional[str] = None) -> None:
        """
        Attaches source to this run on every backend. Sources can be either
        other TorchX runs or external entities such as experiments that may or
        may not be queryable.

        Args:
            source_id(str): identity of the source
            artifact_name(Optional[str]): optional value of type of source
        """
        for backend in self.backends:
            backend.add_source(self.id, source_id, artifact_name)

    def sources(self) -> Iterable[AppRunTrackableSource]:
        """
        Returns `AppRunTrackableSource` for the run.

        Uses the first backend to query this information, although a list of
        trackers is supported in order to persist tracking information.

        Returns:
            One ``AppRunTrackableSource`` per source reported by the first
            backend; each parent ``AppRun`` shares this run's backends. Empty
            list when there is no backend.
        """
        # next(..., None) rather than a truthiness check followed by next():
        # an empty iterator is truthy and would otherwise raise StopIteration.
        first_backend = next(iter(self.backends or ()), None)
        if first_backend is None:
            return []

        return [
            AppRunTrackableSource(
                AppRun(source.source_run_id, backends=self.backends),
                source.artifact_name,
            )
            for source in first_backend.sources(self.id)
        ]

    def children(self) -> Iterable[AppRun]:
        # Unimplemented stub: the body is `...`, so a call returns None.
        ...

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources