Shortcuts

torchx.runner

The runner lets you run components as a standalone job on one of the supported schedulers. The runner takes an specs.AppDef object, which is the result of evaluating the component function with a set of user-provided arguments, along with the scheduler name and scheduler args (aka runcfg or runopts) and submits the component as a job (see diagram below).

_images/runner_diagram.png

Runner Functions

torchx.runner.get_runner(name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = None, **scheduler_params: Any) Runner[source]

Convenience method to construct and get a Runner object. Usage:

with get_runner() as runner:
  app_handle = runner.run(component(args), scheduler="kubernetes", runcfg)
  print(runner.status(app_handle))

Alternatively,

runner = get_runner()
try:
   app_handle = runner.run(component(args), scheduler="kubernetes", runcfg)
   print(runner.status(app_handle))
finally:
   runner.close()
Parameters:
  • name – human readable name that will be included as part of all launched jobs.

  • scheduler_params – extra arguments that will be passed to the constructor of all available schedulers.

Runner Classes

class torchx.runner.Runner(name: str, scheduler_factories: Dict[str, SchedulerFactory], component_defaults: Optional[Dict[str, Dict[str, str]]] = None, scheduler_params: Optional[Dict[str, object]] = None)[source]

TorchX individual component runner. Has the methods for the user to act upon AppDefs. The Runner will cache information about the launched apps if they were launched locally otherwise it’s up to the specific scheduler implementation.

cancel(app_handle: str) None[source]

Stops the application, effectively directing the scheduler to cancel the job. Does nothing if the app does not exist.

Note

This method returns as soon as the cancel request has been submitted to the scheduler. The application will be in a RUNNING state until the scheduler actually terminates the job. If the scheduler successfully interrupts the job and terminates it the final state will be CANCELLED otherwise it will be FAILED.

close() None[source]

Closes this runner and frees/cleans up any allocated resources. Transitively calls the close() method on all the schedulers. Once this method is called on the runner, the runner object is deemed invalid and any methods called on the runner object as well as the schedulers associated with this runner have undefined behavior. It is ok to call this method multiple times on the same runner object.

describe(app_handle: str) Optional[AppDef][source]

Reconstructs the application (to the best extent) given the app handle. Note that the reconstructed application may not be the complete app as it was submitted via the run API. How much of the app can be reconstructed is scheduler dependent.

Returns:

AppDef or None if the app does not exist anymore or if the scheduler does not support describing the app handle

dryrun(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]

Dry runs an app on the given scheduler with the provided run configs. Does not actually submit the app but rather returns what would have been submitted. The returned AppDryRunInfo is pretty formatted and can be printed or logged directly.

Usage:

dryrun_info = session.dryrun(app, scheduler="local", cfg)
print(dryrun_info)
dryrun_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]

Dryrun version of run_component(). Will not actually run the component, but just returns what “would” have run.

list(scheduler: str) List[ListAppResponse][source]

For apps launched on the scheduler, this API returns a list of ListAppResponse objects each of which have app id, app handle and its status. Note: This API is in prototype phase and is subject to change.

log_lines(app_handle: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str][source]

Returns an iterator over the log lines of the specified job container.

Note

  1. k is the node (host) id NOT the rank.

  2. since and until need not always be honored (depends on scheduler).

Warning

The semantics and guarantees of the returned iterator is highly scheduler dependent. See torchx.specs.api.Scheduler.log_iter for the high-level semantics of this log iterator. For this reason it is HIGHLY DISCOURAGED to use this method for generating output to pass to downstream functions/dependencies. This method DOES NOT guarantee that 100% of the log lines are returned. It is totally valid for this method to return no or partial log lines if the scheduler has already totally or partially purged log records for the application.

Return lines will include whitespace characters such as \n or \r. When outputting the lines you should make sure to avoid adding extra newline characters.

Usage:

app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]())

print("== trainer node 0 logs ==")
for line in session.log_lines(app_handle, "trainer", k=0):
   # for prints newlines will already be present in the line
   print(line, end="")

   # when writing to a file nothing extra is necessary
   f.write(line)

Discouraged anti-pattern:

# DO NOT DO THIS!
# parses accuracy metric from log and reports it for this experiment run
accuracy = -1
for line in session.log_lines(app_handle, "trainer", k=0):
   if matches_regex(line, "final model_accuracy:[0-9]*"):
       accuracy = parse_accuracy(line)
       break
report(experiment_name, accuracy)
Parameters:
  • app_handle – application handle

  • role_name – role within the app (e.g. trainer)

  • k – k-th replica of the role to fetch the logs for

  • regex – optional regex filter, returns all lines if left empty

  • since – datetime based start cursor. If left empty begins from the first log line (start of job).

  • until – datetime based end cursor. If left empty, follows the log output until the job completes and all log lines have been consumed.

Returns:

An iterator over the role k-th replica of the specified application.

Raises:

UnknownAppException – if the app does not exist in the scheduler

run(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]

Runs the given application in the specified mode.

Note

sub-classes of Runner should implement schedule method rather than overriding this method directly.

Returns:

An application handle that is used to call other action APIs on the app.

run_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]

Runs a component.

component has the following resolution order(high to low):
  • User-registered components. Users can register components via

    https://packaging.python.org/specifications/entry-points/. Method looks for entrypoints in the group torchx.components.

  • Builtin components relative to torchx.components. The path to the component should

    be module name relative to torchx.components and function name in a format: $module.$function.

  • File-based components in format: $FILE_PATH:FUNCTION_NAME. Both relative and

    absolute paths supported.

Usage:

# resolved to torchx.components.distributed.ddp()
runner.run_component("distributed.ddp", ...)

# resolved to my_component() function in ~/home/components.py
runner.run_component("~/home/components.py:my_component", ...)
Returns:

An application handle that is used to call other action APIs on the app

Raises:
  • ComponentValidationException – if component is invalid.

  • ComponentNotFoundException – if the component_path is failed to resolve.

schedule(dryrun_info: AppDryRunInfo) str[source]

Actually runs the application from the given dryrun info. Useful when one needs to overwrite a parameter in the scheduler request that is not configurable from one of the object APIs.

Warning

Use sparingly since abusing this method to overwrite many parameters in the raw scheduler request may lead to your usage of TorchX going out of compliance in the long term. This method is intended to unblock the user from experimenting with certain scheduler-specific features in the short term without having to wait until TorchX exposes scheduler features in its APIs.

Note

It is recommended that sub-classes of Session implement this method instead of directly implementing the run method.

Usage:

dryrun_info = session.dryrun(app, scheduler="default", cfg)

# overwrite parameter "foo" to "bar"
dryrun_info.request.foo = "bar"

app_handle = session.submit(dryrun_info)
scheduler_backends() List[str][source]

Returns a list of all supported scheduler backends.

scheduler_run_opts(scheduler: str) runopts[source]

Returns the runopts for the supported scheduler backends.

Usage:

local_runopts = session.scheduler_run_opts("local_cwd")
print("local scheduler run options: {local_runopts}")
Returns:

The runopts for the specified scheduler type.

status(app_handle: str) Optional[AppStatus][source]
Returns:

The status of the application, or None if the app does not exist anymore (e.g. was stopped in the past and removed from the scheduler’s backend).

stop(app_handle: str) None[source]

See method cancel.

Warning

This method will be deprecated in the future. It has been replaced with cancel which provides the same functionality. The change is to be consistent with the CLI and scheduler API.

wait(app_handle: str, wait_interval: float = 10) Optional[AppStatus][source]

Block waits (indefinitely) for the application to complete. Possible implementation:

while(True):
    app_status = status(app)
    if app_status.is_terminal():
        return
    sleep(10)
Parameters:
  • app_handle – the app handle to wait for completion

  • wait_interval – the minimum interval to wait before polling for status

Returns:

The terminal status of the application, or None if the app does not exist anymore

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources