torchx.runner
The runner lets you run components as standalone jobs on one of the supported
schedulers. The runner takes a specs.AppDef object, which is the result of
evaluating the component function with a set of user-provided arguments, along
with the scheduler name and scheduler args (also known as runcfg or runopts),
and submits the component as a job (see diagram below).
Runner Functions
- torchx.runner.get_runner(name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = None, **scheduler_params: Any) -> Runner
Convenience method to construct and get a Runner object.
Usage:
    with get_runner() as runner:
        app_handle = runner.run(component(args), scheduler="kubernetes", cfg=runcfg)
        print(runner.status(app_handle))
Alternatively,
    runner = get_runner()
    try:
        app_handle = runner.run(component(args), scheduler="kubernetes", cfg=runcfg)
        print(runner.status(app_handle))
    finally:
        runner.close()
- Parameters:
name – human readable name that will be included as part of all launched jobs.
scheduler_params – extra arguments that will be passed to the constructor of all available schedulers.
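The two usage forms of get_runner() clean up in the same way: leaving the with block is expected to call close(), just as the finally clause does. The sketch below illustrates that pattern with a hypothetical stand-in class, StubRunner, which is not the TorchX Runner and only records whether close() was called, so it runs without any scheduler.

```python
class StubRunner:
    """Hypothetical stand-in for a runner; it only tracks close() calls."""

    def __init__(self) -> None:
        self.closed = False
        self.close_calls = 0

    def close(self) -> None:
        # Safe to call repeatedly, mirroring the documented close() contract.
        self.close_calls += 1
        self.closed = True

    def __enter__(self) -> "StubRunner":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()
        return False  # never swallow exceptions raised inside the block


def use_with_block() -> StubRunner:
    with StubRunner() as runner:
        pass  # submit and query jobs here
    return runner


def use_try_finally() -> StubRunner:
    runner = StubRunner()
    try:
        pass  # submit and query jobs here
    finally:
        runner.close()
    return runner


print(use_with_block().closed, use_try_finally().closed)
```

The with form is usually preferable because the cleanup cannot be forgotten when the body grows.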
Runner Classes
- class torchx.runner.Runner(name: str, scheduler_factories: Dict[str, SchedulerFactory], component_defaults: Optional[Dict[str, Dict[str, str]]] = None, scheduler_params: Optional[Dict[str, object]] = None)
TorchX individual component runner. Has the methods for the user to act upon AppDefs. The Runner will cache information about the launched apps if they were launched locally; otherwise it is up to the specific scheduler implementation.
- build_standalone_workspace(workspace_builder: WorkspaceBuilder[S, T], sync: bool = True) -> PkgInfo[S]
Build a standalone workspace for the given role. This method is used to build a workspace for a role independent of the scheduler and also enables asynchronous workspace building using the Role overrides.
- cancel(app_handle: str) -> None
Stops the application, effectively directing the scheduler to cancel the job. Does nothing if the app does not exist.
Note
This method returns as soon as the cancel request has been submitted to the scheduler. The application will be in a RUNNING state until the scheduler actually terminates the job. If the scheduler successfully interrupts the job and terminates it, the final state will be CANCELLED; otherwise it will be FAILED.
- close() -> None
Closes this runner and frees/cleans up any allocated resources. Transitively calls the close() method on all the schedulers. Once this method is called on the runner, the runner object is deemed invalid, and any methods called on the runner object or on the schedulers associated with it have undefined behavior. It is OK to call this method multiple times on the same runner object.
- describe(app_handle: str) -> Optional[AppDef]
Reconstructs the application (to the best extent) given the app handle. Note that the reconstructed application may not be the complete app as it was submitted via the run API. How much of the app can be reconstructed is scheduler dependent.
- Returns:
AppDef or None if the app does not exist anymore or if the scheduler does not support describing the app handle
- dryrun(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) -> AppDryRunInfo
Dry runs an app on the given scheduler with the provided run configs. Does not actually submit the app but rather returns what would have been submitted. The returned AppDryRunInfo is pretty formatted and can be printed or logged directly.
Usage:
    dryrun_info = runner.dryrun(app, scheduler="local", cfg=cfg)
    print(dryrun_info)
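The cfg parameter in the signature above is a mapping from option name to a value that is None, a str, int, float, bool, a list of str, or a dict of str to str. As a rough sketch, a config can be checked against those shapes before it is passed to dryrun() or run(). The helper names and the option keys below are hypothetical and are not part of TorchX.

```python
from typing import List, Mapping


def is_valid_cfg_value(value: object) -> bool:
    """Checks one value against the shapes allowed by the cfg type annotation."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return True
    if isinstance(value, list):
        return all(isinstance(item, str) for item in value)
    if isinstance(value, dict):
        return all(
            isinstance(k, str) and isinstance(v, str) for k, v in value.items()
        )
    return False


def invalid_cfg_keys(cfg: Mapping[str, object]) -> List[str]:
    """Returns the sorted option names whose values have an unsupported shape."""
    return sorted(k for k, v in cfg.items() if not is_valid_cfg_value(v))


# Hypothetical option names, used only to exercise the check.
cfg = {
    "log_dir": "/tmp/logs",
    "retries": 3,
    "tags": ["a", "b"],
    "env": {"KEY": "value"},
    "ports": [8080],        # a list of int is not an allowed shape
    "mounts": ("x", "y"),   # tuples are not allowed either
}
print(invalid_cfg_keys(cfg))
```

Which option names a scheduler actually accepts is a separate question; scheduler_run_opts() reports those.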
- dryrun_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) -> AppDryRunInfo
Dryrun version of run_component(). Will not actually run the component, but just returns what “would” have run.
- list(scheduler: str) -> List[ListAppResponse]
For apps launched on the scheduler, this API returns a list of ListAppResponse objects, each of which has the app id, the app handle, and its status. Note: This API is in the prototype phase and is subject to change.
- log_lines(app_handle: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) -> Iterable[str]
Returns an iterator over the log lines of the specified job container.
Note
k is the node (host) id, NOT the rank.
since and until need not always be honored (depends on scheduler).
Warning
The semantics and guarantees of the returned iterator are highly scheduler dependent. See torchx.specs.api.Scheduler.log_iter for the high-level semantics of this log iterator. For this reason it is HIGHLY DISCOURAGED to use this method for generating output to pass to downstream functions/dependencies. This method DOES NOT guarantee that 100% of the log lines are returned. It is totally valid for this method to return no or partial log lines if the scheduler has already totally or partially purged log records for the application.
Returned lines will include whitespace characters such as \n or \r. When outputting the lines you should make sure to avoid adding extra newline characters.
Usage:
    app_handle = runner.run(app, scheduler="local", cfg={})
    print("== trainer node 0 logs ==")
    for line in runner.log_lines(app_handle, "trainer", k=0):
        # for prints newlines will already be present in the line
        print(line, end="")
        # when writing to a file nothing extra is necessary
        f.write(line)
Discouraged anti-pattern:
    # DO NOT DO THIS!
    # parses accuracy metric from log and reports it for this experiment run
    accuracy = -1
    for line in runner.log_lines(app_handle, "trainer", k=0):
        if matches_regex(line, "final model_accuracy:[0-9]*"):
            accuracy = parse_accuracy(line)
            break
    report(experiment_name, accuracy)
- Parameters:
app_handle – application handle
role_name – role within the app (e.g. trainer)
k – k-th replica of the role to fetch the logs for
regex – optional regex filter, returns all lines if left empty
since – datetime based start cursor. If left empty begins from the first log line (start of job).
until – datetime based end cursor. If left empty, follows the log output until the job completes and all log lines have been consumed.
- Returns:
An iterator over the log lines of the k-th replica of the role in the specified application.
- Raises:
UnknownAppException – if the app does not exist in the scheduler
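Because the lines returned by log_lines() already carry their own line terminators, copying them to a file or stream should write each line verbatim. The sketch below shows that handling with a hypothetical helper, copy_log_lines, and a hard-coded list standing in for the iterator that log_lines() would return; neither is part of TorchX.

```python
import io
from typing import Iterable, TextIO


def copy_log_lines(lines: Iterable[str], sink: TextIO) -> int:
    """Writes each line to sink verbatim and returns the number of lines written."""
    count = 0
    for line in lines:
        # Each line already ends with "\n" or "\r\n", so nothing is appended.
        sink.write(line)
        count += 1
    return count


# Hypothetical log output standing in for runner.log_lines(app_handle, "trainer", k=0).
fake_lines = ["epoch 1 done\n", "epoch 2 done\r\n", "finished\n"]
buffer = io.StringIO()
written = copy_log_lines(fake_lines, buffer)

# end="" avoids doubling the final newline when echoing to the terminal.
print(buffer.getvalue(), end="")
```

Strip the terminators only at the point of comparison or display, for example with line.rstrip("\r\n"), so the stored copy stays byte-for-byte faithful to what the scheduler returned.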
- run(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) -> str
Runs the given application in the specified mode.
Note
Sub-classes of Runner should implement the schedule method rather than overriding this method directly.
- Returns:
An application handle that is used to call other action APIs on the app.
- run_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) -> str
Runs a component.
component has the following resolution order (high to low):
- User-registered components. Users can register components via https://packaging.python.org/specifications/entry-points/. The method looks for entry points in the group torchx.components.
- Builtin components relative to torchx.components. The path to the component should be the module name relative to torchx.components and the function name, in the format $module.$function.
- File-based components in the format $FILE_PATH:FUNCTION_NAME. Both relative and absolute paths are supported.
Usage:
    # resolved to torchx.components.distributed.ddp()
    runner.run_component("distributed.ddp", ...)
    # resolved to my_component() function in ~/home/components.py
    runner.run_component("~/home/components.py:my_component", ...)
- Returns:
An application handle that is used to call other action APIs on the app
- Raises:
ComponentValidationException – if component is invalid.
ComponentNotFoundException – if the component_path fails to resolve.
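The two string formats accepted for component, $module.$function and $FILE_PATH:FUNCTION_NAME, can be told apart by the colon separator. The sketch below only illustrates the formats; it is not TorchX's resolver, the function name split_component_name is hypothetical, and it cannot detect user-registered entry points, which take precedence and depend on what is installed.

```python
from typing import Tuple


def split_component_name(component: str) -> Tuple[str, str, str]:
    """Splits a component string into (kind, location, function_name).

    kind is "file" for $FILE_PATH:FUNCTION_NAME and "module" for
    $module.$function. This mirrors the documented formats only.
    """
    if ":" in component:
        # Split on the last colon so paths that contain colons stay intact.
        path, _, function_name = component.rpartition(":")
        return ("file", path, function_name)
    module, _, function_name = component.rpartition(".")
    return ("module", module, function_name)


print(split_component_name("distributed.ddp"))
print(split_component_name("~/home/components.py:my_component"))
```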
- schedule(dryrun_info: AppDryRunInfo) -> str
Actually runs the application from the given dryrun info. Useful when one needs to overwrite a parameter in the scheduler request that is not configurable from one of the object APIs.
Warning
Use sparingly since abusing this method to overwrite many parameters in the raw scheduler request may lead to your usage of TorchX going out of compliance in the long term. This method is intended to unblock the user from experimenting with certain scheduler-specific features in the short term without having to wait until TorchX exposes scheduler features in its APIs.
Note
It is recommended that sub-classes of Runner implement this method instead of directly implementing the run method.
Usage:
    dryrun_info = runner.dryrun(app, scheduler="default", cfg=cfg)
    # overwrite parameter "foo" to "bar"
    dryrun_info.request.foo = "bar"
    app_handle = runner.schedule(dryrun_info)
- scheduler_run_opts(scheduler: str) -> runopts
Returns the runopts for the supported scheduler backends.
Usage:
    local_runopts = runner.scheduler_run_opts("local_cwd")
    print(f"local scheduler run options: {local_runopts}")
- Returns:
The runopts for the specified scheduler type.
- status(app_handle: str) -> Optional[AppStatus]
- Returns:
The status of the application, or None if the app does not exist anymore (e.g. was stopped in the past and removed from the scheduler’s backend).
- stop(app_handle: str) -> None
See method cancel.
Warning
This method will be deprecated in the future. It has been replaced with cancel, which provides the same functionality. The change is to be consistent with the CLI and scheduler API.
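- wait(app_handle: str, wait_interval: float = 10) -> Optional[AppStatus]
Blocks (indefinitely) until the application completes. Possible implementation:
    import time

    def wait(runner, app_handle, wait_interval=10):
        while True:
            app_status = runner.status(app_handle)
            # None means the app no longer exists on the scheduler
            if app_status is None or app_status.is_terminal():
                return app_status
            time.sleep(wait_interval)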
- Parameters:
app_handle – the app handle to wait for completion
wait_interval – the minimum interval to wait before polling for status
- Returns:
The terminal status of the application, or None if the app does not exist anymore.
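Since wait() blocks indefinitely, callers that need an upper bound can poll on their own. The sketch below is one way to do that under stated assumptions: get_state is a hypothetical callable standing in for a status lookup, states are plain strings, and the terminal state names are assumed for illustration. The sleep function is injectable so the example runs instantly.

```python
import time
from typing import Callable, List, Optional

# Assumed terminal state names for this sketch.
TERMINAL_STATES = frozenset({"SUCCEEDED", "FAILED", "CANCELLED"})


def wait_until_terminal(
    get_state: Callable[[], Optional[str]],
    wait_interval: float = 10.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Polls get_state until it reports a terminal state or None.

    Raises TimeoutError if timeout (in seconds) elapses first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = get_state()
        if state is None or state in TERMINAL_STATES:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"still {state} after {timeout} seconds")
        sleep(wait_interval)


# Usage with canned states and a recording sleep, so nothing actually blocks.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
naps: List[float] = []
final_state = wait_until_terminal(
    lambda: next(states), wait_interval=0.5, sleep=naps.append
)
print(final_state, naps)
```

The deadline is checked once per poll, so the call can overrun timeout by up to one wait_interval plus the duration of a status lookup.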