Shortcuts

Source code for torchx.runner.api

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import json
import logging
import os
import time
import warnings
from datetime import datetime
from types import TracebackType
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Type

from torchx.runner.events import log_event
from torchx.schedulers import get_scheduler_factories, SchedulerFactory
from torchx.schedulers.api import ListAppResponse, Scheduler, Stream
from torchx.specs import (
    AppDef,
    AppDryRunInfo,
    AppHandle,
    AppStatus,
    CfgVal,
    macros,
    make_app_handle,
    materialize_appdef,
    parse_app_handle,
    runopts,
    UnknownAppException,
)
from torchx.specs.finder import get_component
from torchx.tracker.api import (
    ENV_TORCHX_JOB_ID,
    ENV_TORCHX_PARENT_RUN_ID,
    ENV_TORCHX_TRACKERS,
    tracker_config_env_var_name,
)

from torchx.util.types import none_throws
from torchx.workspace.api import WorkspaceMixin

from .config import get_config, get_configs

logger: logging.Logger = logging.getLogger(__name__)


NONE: str = "<NONE>"


def get_configured_trackers() -> Dict[str, Optional[str]]:
    tracker_names = list(get_configs(prefix="torchx", name="tracker").keys())
    if ENV_TORCHX_TRACKERS in os.environ:
        logger.info(f"Using TORCHX_TRACKERS={tracker_names} as tracker names")
        tracker_names = os.environ[ENV_TORCHX_TRACKERS].split(",")

    tracker_names_with_config = {}
    for tracker_name in tracker_names:
        config_value = get_config(prefix="tracker", name=tracker_name, key="config")

        config_env_name = tracker_config_env_var_name(tracker_name)
        if config_env_name in os.environ:
            config_value = os.environ[config_env_name]
            logger.info(
                f"Using {config_env_name}={config_value} for `{tracker_name}` tracker"
            )

        tracker_names_with_config[tracker_name] = config_value
    logger.info(f"Tracker configurations: {tracker_names_with_config}")
    return tracker_names_with_config


[docs]class Runner: """ TorchX individual component runner. Has the methods for the user to act upon ``AppDefs``. The ``Runner`` will cache information about the launched apps if they were launched locally otherwise it's up to the specific scheduler implementation. """ def __init__( self, name: str, scheduler_factories: Dict[str, SchedulerFactory], component_defaults: Optional[Dict[str, Dict[str, str]]] = None, scheduler_params: Optional[Dict[str, object]] = None, ) -> None: """ Creates a new runner instance. Args: name: the human readable name for this session. Jobs launched will inherit this name. schedulers: a list of schedulers the runner can use. """ self._name: str = name self._scheduler_factories = scheduler_factories self._scheduler_params: Dict[str, object] = scheduler_params or {} # pyre-ignore[24]: Scheduler opts self._scheduler_instances: Dict[str, Scheduler] = {} self._apps: Dict[AppHandle, AppDef] = {} # component_name -> map of component_fn_param_name -> user-specified default val encoded as str self._component_defaults: Dict[str, Dict[str, str]] = component_defaults or {} def __enter__(self) -> "Runner": return self def __exit__( self, type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType], ) -> bool: # This method returns False so that if an error is raise within the # ``with`` statement, it is reraised properly # see: https://docs.python.org/3/reference/compound_stmts.html#with # see also: torchx/runner/test/api_test.py#test_context_manager_with_error # self.close() return False
[docs] def close(self) -> None: """ Closes this runner and frees/cleans up any allocated resources. Transitively calls the ``close()`` method on all the schedulers. Once this method is called on the runner, the runner object is deemed invalid and any methods called on the runner object as well as the schedulers associated with this runner have undefined behavior. It is ok to call this method multiple times on the same runner object. """ for name, scheduler in self._scheduler_instances.items(): scheduler.close()
[docs] def run_component( self, component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, CfgVal]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None, ) -> AppHandle: """ Runs a component. ``component`` has the following resolution order(high to low): * User-registered components. Users can register components via https://packaging.python.org/specifications/entry-points/. Method looks for entrypoints in the group ``torchx.components``. * Builtin components relative to `torchx.components`. The path to the component should be module name relative to `torchx.components` and function name in a format: ``$module.$function``. * File-based components in format: ``$FILE_PATH:FUNCTION_NAME``. Both relative and absolute paths supported. Usage: .. code-block:: python # resolved to torchx.components.distributed.ddp() runner.run_component("distributed.ddp", ...) # resolved to my_component() function in ~/home/components.py runner.run_component("~/home/components.py:my_component", ...) Returns: An application handle that is used to call other action APIs on the app Raises: ComponentValidationException: if component is invalid. ComponentNotFoundException: if the ``component_path`` is failed to resolve. """ dryrun_info = self.dryrun_component( component, component_args, scheduler, cfg=cfg, workspace=workspace, parent_run_id=parent_run_id, ) return self.schedule(dryrun_info)
[docs] def dryrun_component( self, component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, CfgVal]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None, ) -> AppDryRunInfo: """ Dryrun version of :py:func:`run_component`. Will not actually run the component, but just returns what "would" have run. """ component_def = get_component(component) app = materialize_appdef( component_def.fn, component_args, self._component_defaults.get(component, None), ) return self.dryrun( app, scheduler, cfg=cfg, workspace=workspace, parent_run_id=parent_run_id, )
[docs] def run( self, app: AppDef, scheduler: str, cfg: Optional[Mapping[str, CfgVal]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None, ) -> AppHandle: """ Runs the given application in the specified mode. .. note:: sub-classes of ``Runner`` should implement ``schedule`` method rather than overriding this method directly. Returns: An application handle that is used to call other action APIs on the app. """ dryrun_info = self.dryrun( app, scheduler, cfg=cfg, workspace=workspace, parent_run_id=parent_run_id ) return self.schedule(dryrun_info)
[docs] def schedule(self, dryrun_info: AppDryRunInfo) -> AppHandle: """ Actually runs the application from the given dryrun info. Useful when one needs to overwrite a parameter in the scheduler request that is not configurable from one of the object APIs. .. warning:: Use sparingly since abusing this method to overwrite many parameters in the raw scheduler request may lead to your usage of TorchX going out of compliance in the long term. This method is intended to unblock the user from experimenting with certain scheduler-specific features in the short term without having to wait until TorchX exposes scheduler features in its APIs. .. note:: It is recommended that sub-classes of ``Session`` implement this method instead of directly implementing the ``run`` method. Usage: :: dryrun_info = session.dryrun(app, scheduler="default", cfg) # overwrite parameter "foo" to "bar" dryrun_info.request.foo = "bar" app_handle = session.submit(dryrun_info) """ scheduler = none_throws(dryrun_info._scheduler) app_image = none_throws(dryrun_info._app).roles[0].image cfg = dryrun_info._cfg with log_event( "schedule", scheduler, app_image=app_image, runcfg=json.dumps(cfg) if cfg else None, ) as ctx: sched = self._scheduler(scheduler) app_id = sched.schedule(dryrun_info) app_handle = make_app_handle(scheduler, self._name, app_id) app = none_throws(dryrun_info._app) self._apps[app_handle] = app _, _, app_id = parse_app_handle(app_handle) ctx._torchx_event.app_id = app_id return app_handle
def name(self) -> str: return self._name
[docs] def dryrun( self, app: AppDef, scheduler: str, cfg: Optional[Mapping[str, CfgVal]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None, ) -> AppDryRunInfo: """ Dry runs an app on the given scheduler with the provided run configs. Does not actually submit the app but rather returns what would have been submitted. The returned ``AppDryRunInfo`` is pretty formatted and can be printed or logged directly. Usage: :: dryrun_info = session.dryrun(app, scheduler="local", cfg) print(dryrun_info) """ # input validation if not app.roles: raise ValueError( f"No roles for app: {app.name}. Did you forget to add roles to AppDef?" ) if ENV_TORCHX_PARENT_RUN_ID in os.environ: parent_run_id = os.environ[ENV_TORCHX_PARENT_RUN_ID] logger.info( f"Using {ENV_TORCHX_PARENT_RUN_ID}={parent_run_id} env variable as tracker parent run id" ) configured_trackers = get_configured_trackers() for role in app.roles: if not role.entrypoint: raise ValueError( f"No entrypoint for role: {role.name}." f" Did you forget to call role.runs(entrypoint, args, env)?" ) if role.num_replicas <= 0: raise ValueError( f"Non-positive replicas for role: {role.name}." f" Did you forget to set role.num_replicas?" ) # Setup tracking # 1. Inject parent identifier # 2. Inject this run's job ID # 3. Get the list of backends to support from .torchconfig # - inject it as TORCHX_TRACKERS=names (it is expected that entrypoints are defined) # - for each backend check configuration file, if exists: # - inject it as TORCHX_TRACKER_<name>_CONFIGFILE=filename role.env[ENV_TORCHX_JOB_ID] = make_app_handle( scheduler, self._name, macros.app_id ) if parent_run_id: role.env[ENV_TORCHX_PARENT_RUN_ID] = parent_run_id if configured_trackers: role.env[ENV_TORCHX_TRACKERS] = ",".join(configured_trackers.keys()) for name, config in configured_trackers.items(): if config: role.env[tracker_config_env_var_name(name)] = config cfg = cfg or dict() with log_event("dryrun", scheduler, runcfg=json.dumps(cfg) if cfg else None): sched = self._scheduler(scheduler) resolved_cfg = sched.run_opts().resolve(cfg) if workspace and isinstance(sched, WorkspaceMixin): role = app.roles[0] old_img = role.image logger.info(f"Checking for changes in workspace `{workspace}`...") logger.info( 'To disable workspaces pass: --workspace="" from CLI or workspace=None programmatically.' ) sched.build_workspace_and_update_role(role, workspace, resolved_cfg) if old_img != role.image: logger.info( f"Built new image `{role.image}` based on original image `{old_img}`" f" and changes in workspace `{workspace}` for role[0]={role.name}." ) else: logger.info( f"Reusing original image `{old_img}` for role[0]={role.name}." " Either a patch was built or no changes to workspace was detected." ) sched._validate(app, scheduler) dryrun_info = sched.submit_dryrun(app, resolved_cfg) dryrun_info._scheduler = scheduler return dryrun_info
[docs] def scheduler_run_opts(self, scheduler: str) -> runopts: """ Returns the ``runopts`` for the supported scheduler backends. Usage: :: local_runopts = session.scheduler_run_opts("local_cwd") print("local scheduler run options: {local_runopts}") Returns: The ``runopts`` for the specified scheduler type. """ return self._scheduler(scheduler).run_opts()
[docs] def scheduler_backends(self) -> List[str]: """ Returns a list of all supported scheduler backends. """ return list(self._scheduler_factories.keys())
[docs] def status(self, app_handle: AppHandle) -> Optional[AppStatus]: """ Returns: The status of the application, or ``None`` if the app does not exist anymore (e.g. was stopped in the past and removed from the scheduler's backend). """ scheduler, scheduler_backend, app_id = self._scheduler_app_id( app_handle, check_session=False ) with log_event("status", scheduler_backend, app_id): desc = scheduler.describe(app_id) if not desc: # app does not exist on the scheduler # remove it from apps cache if it exists # effectively removes this app from the list() API self._apps.pop(app_handle, None) return None app_status = AppStatus( desc.state, desc.num_restarts, msg=desc.msg, structured_error_msg=desc.structured_error_msg, roles=desc.roles_statuses, ) if app_status: app_status.ui_url = desc.ui_url return app_status
[docs] def wait( self, app_handle: AppHandle, wait_interval: float = 10 ) -> Optional[AppStatus]: """ Block waits (indefinitely) for the application to complete. Possible implementation: :: while(True): app_status = status(app) if app_status.is_terminal(): return sleep(10) Args: app_handle: the app handle to wait for completion wait_interval: the minimum interval to wait before polling for status Returns: The terminal status of the application, or ``None`` if the app does not exist anymore """ scheduler, scheduler_backend, app_id = self._scheduler_app_id( app_handle, check_session=False ) with log_event("wait", scheduler_backend, app_id): while True: app_status = self.status(app_handle) if not app_status: return None if app_status.is_terminal(): return app_status else: time.sleep(wait_interval)
[docs] def cancel(self, app_handle: AppHandle) -> None: """ Stops the application, effectively directing the scheduler to cancel the job. Does nothing if the app does not exist. .. note:: This method returns as soon as the cancel request has been submitted to the scheduler. The application will be in a ``RUNNING`` state until the scheduler actually terminates the job. If the scheduler successfully interrupts the job and terminates it the final state will be ``CANCELLED`` otherwise it will be ``FAILED``. """ scheduler, scheduler_backend, app_id = self._scheduler_app_id(app_handle) with log_event("cancel", scheduler_backend, app_id): status = self.status(app_handle) if status is not None and not status.is_terminal(): scheduler.cancel(app_id)
[docs] def stop(self, app_handle: AppHandle) -> None: """ See method ``cancel``. .. warning:: This method will be deprecated in the future. It has been replaced with ``cancel`` which provides the same functionality. The change is to be consistent with the CLI and scheduler API. """ warnings.warn( "This method will be deprecated in the future, please use `cancel` instead.", PendingDeprecationWarning, ) self.cancel(app_handle)
[docs] def describe(self, app_handle: AppHandle) -> Optional[AppDef]: """ Reconstructs the application (to the best extent) given the app handle. Note that the reconstructed application may not be the complete app as it was submitted via the run API. How much of the app can be reconstructed is scheduler dependent. Returns: AppDef or None if the app does not exist anymore or if the scheduler does not support describing the app handle """ scheduler, scheduler_backend, app_id = self._scheduler_app_id( app_handle, check_session=False ) with log_event("describe", scheduler_backend, app_id): # if the app is in the apps list, then short circuit everything and return it app = self._apps.get(app_handle, None) if not app: desc = scheduler.describe(app_id) if desc: app = AppDef(name=app_id, roles=desc.roles) return app
[docs] def log_lines( self, app_handle: AppHandle, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None, ) -> Iterable[str]: """ Returns an iterator over the log lines of the specified job container. .. note:: #. ``k`` is the node (host) id NOT the ``rank``. #. ``since`` and ``until`` need not always be honored (depends on scheduler). .. warning:: The semantics and guarantees of the returned iterator is highly scheduler dependent. See ``torchx.specs.api.Scheduler.log_iter`` for the high-level semantics of this log iterator. For this reason it is HIGHLY DISCOURAGED to use this method for generating output to pass to downstream functions/dependencies. This method DOES NOT guarantee that 100% of the log lines are returned. It is totally valid for this method to return no or partial log lines if the scheduler has already totally or partially purged log records for the application. Return lines will include whitespace characters such as ``\\n`` or ``\\r``. When outputting the lines you should make sure to avoid adding extra newline characters. Usage: .. code:: python app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]()) print("== trainer node 0 logs ==") for line in session.log_lines(app_handle, "trainer", k=0): # for prints newlines will already be present in the line print(line, end="") # when writing to a file nothing extra is necessary f.write(line) Discouraged anti-pattern: .. code:: python # DO NOT DO THIS! # parses accuracy metric from log and reports it for this experiment run accuracy = -1 for line in session.log_lines(app_handle, "trainer", k=0): if matches_regex(line, "final model_accuracy:[0-9]*"): accuracy = parse_accuracy(line) break report(experiment_name, accuracy) Args: app_handle: application handle role_name: role within the app (e.g. trainer) k: k-th replica of the role to fetch the logs for regex: optional regex filter, returns all lines if left empty since: datetime based start cursor. If left empty begins from the first log line (start of job). until: datetime based end cursor. If left empty, follows the log output until the job completes and all log lines have been consumed. Returns: An iterator over the role k-th replica of the specified application. Raise: UnknownAppException: if the app does not exist in the scheduler """ scheduler, scheduler_backend, app_id = self._scheduler_app_id( app_handle, check_session=False ) with log_event("log_lines", scheduler_backend, app_id): if not self.status(app_handle): raise UnknownAppException(app_handle) log_iter = scheduler.log_iter( app_id, role_name, k, regex, since, until, should_tail, streams=streams, ) return log_iter
[docs] def list( self, scheduler: str, ) -> List[ListAppResponse]: """ For apps launched on the scheduler, this API returns a list of ListAppResponse objects each of which have app id, app handle and its status. Note: This API is in prototype phase and is subject to change. """ with log_event("list", scheduler): sched = self._scheduler(scheduler) apps = sched.list() for app in apps: app.app_handle = make_app_handle(scheduler, self._name, app.app_id) return apps
# pyre-fixme: Scheduler opts def _scheduler(self, scheduler: str) -> Scheduler: sched = self._scheduler_instances.get(scheduler) if not sched: factory = self._scheduler_factories.get(scheduler) if factory: sched = factory(self._name, **self._scheduler_params) self._scheduler_instances[scheduler] = sched if not sched: raise KeyError( f"Undefined scheduler backend: {scheduler}. Use one of: {self._scheduler_factories.keys()}" ) return sched def _scheduler_app_id( self, app_handle: AppHandle, check_session: bool = True # pyre-fixme: Scheduler opts ) -> Tuple[Scheduler, str, str]: """ Returns the scheduler and app_id from the app_handle. Set ``check_session`` to validate that the session name in the app handle is the same as this session. Raises: ValueError: if ``check_session=True`` and the session in the app handle does not match this session's name KeyError: if no such scheduler backend exists """ scheduler_backend, _, app_id = parse_app_handle(app_handle) scheduler = self._scheduler(scheduler_backend) return scheduler, scheduler_backend, app_id def __repr__(self) -> str: return f"Runner(name={self._name}, schedulers={self._scheduler_factories}, apps={self._apps})"
[docs]def get_runner( name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = None, **scheduler_params: Any, ) -> Runner: """ Convenience method to construct and get a Runner object. Usage: .. code-block:: python with get_runner() as runner: app_handle = runner.run(component(args), scheduler="kubernetes", runcfg) print(runner.status(app_handle)) Alternatively, .. code-block:: python runner = get_runner() try: app_handle = runner.run(component(args), scheduler="kubernetes", runcfg) print(runner.status(app_handle)) finally: runner.close() Args: name: human readable name that will be included as part of all launched jobs. scheduler_params: extra arguments that will be passed to the constructor of all available schedulers. """ if name: warnings.warn( f"Custom session names are deprecated (detected explicitly set session name={name}). \ To prevent this warning from showing again call `get_runner()` without the `name` param. \ As an alternative, you can prefix the app name with the session name.", FutureWarning, ) if not name: name = "torchx" scheduler_factories = get_scheduler_factories() return Runner( name, scheduler_factories, component_defaults, scheduler_params=scheduler_params )

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources