torchx.specs¶
This contains the TorchX AppDef and related component definitions. These are used by components to define the apps which can then be launched via a TorchX scheduler or pipeline adapter.
AppDef¶
- class torchx.specs.AppDef(name: str, roles: List[Role] = <factory>, metadata: Dict[str, str] = <factory>)[source]¶
Represents a distributed application made up of multiple Roles and metadata. Contains the necessary information for the driver to submit this app to the scheduler.
- Parameters:
name – Name of application
roles – List of roles
metadata – metadata for the app (treatment of metadata is scheduler dependent)
Role¶
- class torchx.specs.Role(name: str, image: str, min_replicas: Optional[int] = None, base_image: Optional[str] = None, entrypoint: str = '<MISSING>', args: List[str] = <factory>, env: Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: RetryPolicy = RetryPolicy.APPLICATION, resource: Resource = <factory>, port_map: Dict[str, int] = <factory>, metadata: Dict[str, Any] = <factory>, mounts: List[Union[BindMount, VolumeMount, DeviceMount]] = <factory>)[source]¶
A set of nodes that perform a specific duty within the AppDef. Examples:
Distributed data parallel app - made up of a single role (trainer).
App with parameter server - made up of multiple roles (trainer, ps).
Note
An image is a software bundle that is installed on the container scheduled by the scheduler. The container on the scheduler dictates what an image actually is. An image could be as simple as a tar-ball or map to a docker image. The scheduler typically knows how to “pull” the image given an image name (str), which could be a simple name (e.g. docker image) or a URL (e.g. s3://path/my_image.tar).
Usage:
from torchx.specs import Resource, Role

trainer = Role(
    name="trainer",
    image="pytorch/torch:1",
    entrypoint="my_trainer.py",
    args=["--arg", "foo"],
    env={"ENV_VAR": "FOOBAR"},
    num_replicas=4,
    resource=Resource(cpu=1, gpu=1, memMB=500),
    port_map={"tcp_store": 8080, "tensorboard": 8081},
    # `value` is a placeholder for scheduler-specific data
    metadata={"local_cwd.property": value},
)
- Parameters:
name – name of the role
image – a software bundle that is installed on a container.
entrypoint – command (within the container) to invoke the role
args – commandline arguments to the entrypoint cmd
env – environment variable mappings
num_replicas – number of container replicas to run
min_replicas – minimum number of replicas for the job to start. When set the job size can automatically adjust between min_replicas and num_replicas depending on the cluster resources and policies. If the scheduler doesn’t support auto scaling this field is ignored and the job size will be num_replicas.
max_retries – max number of retries before giving up
retry_policy – retry behavior upon replica failures
resource – Resource requirement for the role. The role should be scheduled by the scheduler on num_replicas containers, each of which should have at least resource guarantees.
port_map – Port mapping for the role. The key is the unique identifier of the port, e.g. “tensorboard”: 9090
metadata – Free form information that is associated with the role, for example scheduler specific data. The key should follow the pattern: $scheduler.$key
mounts – a list of mounts on the machine
- pre_proc(scheduler: str, dryrun_info: AppDryRunInfo) AppDryRunInfo [source]¶
Modifies the scheduler request based on the role specific configuration. The method is invoked for each role during scheduler submit_dryrun. If there are multiple roles, the method is invoked for each role in the order defined by the AppDef.roles list.
- class torchx.specs.RetryPolicy(value)[source]¶
Defines the retry policy for the Roles in the AppDef. The policy defines the behavior when the role replica encounters a failure:
unsuccessful (non zero) exit code
hardware/host crashes
preemption
eviction
Note
Not all retry policies are supported by all schedulers. However all schedulers must support RetryPolicy.APPLICATION. Please refer to the scheduler’s documentation for more information on the retry policies they support and behavior caveats (if any).
- REPLICA: Replaces the replica instance. Surviving replicas are untouched. Use with dist.ddp component to have torchelastic coordinate restarts and membership changes. Otherwise, it is up to the application to deal with failed replica departures and replacement replica admittance.
- APPLICATION: Restarts the entire application.
- ROLE: Restarts the role when any error occurs in that role. This does not restart the whole job.
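As a rough illustration of how the three policies differ in scope, the sketch below computes which replicas would be restarted after a single replica failure. It uses a stand-in enum rather than torchx.specs.RetryPolicy, and replicas_to_restart is a hypothetical helper, not a TorchX API; actual restart behavior is scheduler dependent.

```python
from enum import Enum
from typing import Dict, List


class RetryPolicy(str, Enum):
    # Stand-in mirroring the three policy names documented above.
    REPLICA = "REPLICA"
    APPLICATION = "APPLICATION"
    ROLE = "ROLE"


def replicas_to_restart(
    policy: RetryPolicy,
    roles: Dict[str, int],
    failed_role: str,
    failed_replica: int,
) -> List[str]:
    """Return "role/replica_id" labels that would be restarted (illustrative only)."""
    if policy is RetryPolicy.REPLICA:
        # Only the failed replica is replaced.
        return [f"{failed_role}/{failed_replica}"]
    if policy is RetryPolicy.ROLE:
        # Every replica of the failing role restarts.
        return [f"{failed_role}/{i}" for i in range(roles[failed_role])]
    # APPLICATION: every replica of every role restarts.
    return [f"{name}/{i}" for name, n in roles.items() for i in range(n)]


# Role name -> number of replicas (example values).
roles = {"trainer": 2, "ps": 1}
for policy in RetryPolicy:
    print(policy.value, replicas_to_restart(policy, roles, "trainer", 1))
```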
Resource¶
- class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: Dict[str, Any] = <factory>, devices: Dict[str, int] = <factory>)[source]¶
Represents resource requirements for a Role.
- Parameters:
cpu – number of logical cpu cores. The definition of a CPU core depends on the scheduler. See your scheduler documentation for how a logical CPU core maps to physical cores and threads.
gpu – number of gpus
memMB – MB of ram
capabilities – additional hardware specs (interpreted by scheduler)
devices – a list of named devices with their quantities
Note: you should prefer to use named_resources instead of specifying the raw resource requirement directly.
- torchx.specs.resource(cpu: Optional[int] = None, gpu: Optional[int] = None, memMB: Optional[int] = None, h: Optional[str] = None) Resource [source]¶
Convenience method to create a Resource object from either the raw resource specs (cpu, gpu, memMB) or the registered named resource (h). Note that (cpu, gpu, memMB) and h are mutually exclusive, with h taking precedence if specified.
If h is specified then it is used to look up the resource specs from the list of registered named resources. See registering named resource.
Otherwise a Resource object is created from the raw resource specs.
Example:
resource(cpu=1)  # returns Resource(cpu=1)
resource(h="foobar")  # returns registered named resource "foobar"
resource(cpu=1, h="foobar")  # returns registered named resource "foobar" (cpu=1 ignored)
resource()  # returns default resource values
resource(cpu=None, gpu=None, memMB=None)  # throws
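The precedence rule can be sketched with plain Python. This is a simplified stand-in, not the TorchX implementation: the Resource dataclass, the NAMED registry, and the DEFAULTS values are all assumptions made for illustration, and error handling is not modeled.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Resource:
    # Stand-in with only the three raw specs.
    cpu: int
    gpu: int
    memMB: int


# Hypothetical registry of named resources (name -> factory function).
NAMED: Dict[str, Callable[[], Resource]] = {
    "gpu_x_1": lambda: Resource(cpu=2, gpu=2, memMB=64 * 1024),
}

# Assumed default values, chosen only for this sketch.
DEFAULTS = Resource(cpu=1, gpu=0, memMB=1024)


def resource(
    cpu: Optional[int] = None,
    gpu: Optional[int] = None,
    memMB: Optional[int] = None,
    h: Optional[str] = None,
) -> Resource:
    if h is not None:
        # The named resource wins; raw specs are ignored.
        return NAMED[h]()
    return Resource(
        cpu=DEFAULTS.cpu if cpu is None else cpu,
        gpu=DEFAULTS.gpu if gpu is None else gpu,
        memMB=DEFAULTS.memMB if memMB is None else memMB,
    )


print(resource(cpu=4))               # raw spec, remaining fields from DEFAULTS
print(resource(cpu=4, h="gpu_x_1"))  # named resource, cpu=4 ignored
```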
- torchx.specs.get_named_resources(res: str) Resource [source]¶
Get resource object based on the string definition registered via entrypoints.txt.
TorchX implements a named_resource registration mechanism, which consists of the following steps:
Create a module and define your resource retrieval function:
# my_module.resources
from torchx.specs import Resource

def gpu_x_1() -> Resource:
    return Resource(cpu=2, memMB=64 * 1024, gpu=2)
Register resource retrieval in the entrypoints section:
[torchx.named_resources]
gpu_x_1 = my_module.resources:gpu_x_1
The gpu_x_1 can be used as string argument to this function:
from torchx.specs import named_resources
resource = named_resources["gpu_x_1"]
AWS Named Resources¶
torchx.specs.named_resources_aws contains resource definitions that represent corresponding AWS instance types taken from https://aws.amazon.com/ec2/instance-types/. The resources are exposed via entrypoints after installing torchx lib. The mapping is stored in the setup.py file.
The named resources currently do not specify AWS instance type capabilities but merely represent the equivalent resource in mem, cpu and gpu numbers.
Note
These resource definitions may change in future. It is expected for each user to manage their own resources. Follow https://pytorch.org/torchx/latest/specs.html#torchx.specs.get_named_resources to set up named resources.
Usage:
from torchx.specs import named_resources

print(named_resources["aws_t3.medium"])
print(named_resources["aws_m5.2xlarge"])
print(named_resources["aws_p3.2xlarge"])
print(named_resources["aws_p3.8xlarge"])
Macros¶
- class torchx.specs.macros[source]¶
Defines macros that can be used in the elements of Role.args and the values of Role.env. The macros will be substituted at runtime to their actual values.
Warning
Macros used in fields of Role other than the ones mentioned above are NOT substituted.
Available macros:
img_root - root directory of the pulled container.image
app_id - application id as assigned by the scheduler
replica_id - unique id for each instance of a replica of a Role, for instance a role with 3 replicas could have the 0, 1, 2 as replica ids. Note that when the container fails and is replaced, the new container will have the same replica_id as the one it is replacing. For instance if node 1 failed and was replaced by the scheduler the replacing node will also have replica_id=1.
Example:
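# runs: hello_world.py --app_id ${app_id}
trainer = Role(
    name="trainer",
    image="pytorch/torch:1",  # image is a required Role field; placeholder value
    entrypoint="hello_world.py",
    args=["--app_id", macros.app_id],
    env={"IMAGE_ROOT_DIR": macros.img_root},
)
app = AppDef("train_app", roles=[trainer])
# `session` is assumed to be an existing TorchX session object
app_handle = session.run(app, scheduler="local_docker", cfg={})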
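To make the substitution step concrete, here is a minimal sketch using the standard library's string.Template. It assumes each macro is spelled as a ${name} placeholder (as the comment in the example suggests); substitute_macros is a hypothetical helper, not a TorchX function, and the values passed in are made up.

```python
from string import Template
from typing import List


def substitute_macros(
    values: List[str], app_id: str, replica_id: str, img_root: str
) -> List[str]:
    """Replace ${app_id}, ${replica_id} and ${img_root} in each string."""
    mapping = {"app_id": app_id, "replica_id": replica_id, "img_root": img_root}
    # safe_substitute leaves any other ${...} placeholder untouched.
    return [Template(v).safe_substitute(mapping) for v in values]


args = ["--app_id", "${app_id}", "--rank", "${replica_id}", "${other}"]
resolved = substitute_macros(
    args, app_id="train_app_123", replica_id="0", img_root="/tmp/img"
)
print(resolved)
```

Using safe_substitute rather than substitute is a deliberate choice in this sketch: strings that contain unrelated ${...} text pass through unchanged instead of raising KeyError.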
Run Configs¶
- class torchx.specs.runopts[source]¶
Holds the accepted scheduler run configuration keys, default value (if any), and help message string. These options are provided by the Scheduler and validated in Session.run against user provided run cfg. Allows None default values. Required opts must NOT have a non-None default.
Important
This class has no accessors because it is intended to be constructed and returned by Scheduler.run_config_options and printed out as a “help” tool or as part of an exception msg.
Usage:
opts = runopts()
opts.add("run_as_user", type_=str, help="user to run the job as")
opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
opts.add("priority", type_=float, default=0.5, help="job priority")
opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

# invalid
opts.add("illegal", default=10, required=True)  # required opt with a non-None default
opts.add("bad_type", type_=str, default=10)  # default does not match type_

opts.check(cfg)
print(opts)
- add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str], Dict[str, str]]] = None, required: bool = False) None [source]¶
Adds the config option with the given help string and default value (if any). If the default is not specified then this option is a required option.
- cfg_from_json_repr(json_repr: str) Dict[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]] [source]¶
Converts the given dict to a valid cfg for this runopts object.
- cfg_from_str(cfg_str: str) Dict[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]] [source]¶
Parses scheduler cfg from a string literal and returns a cfg map where the cfg values have been cast into the appropriate types as specified by this runopts object. Unknown keys are ignored and not returned in the resulting map.
Note
Unlike the method resolve, this method does NOT resolve default options or check that the required options are actually present in the given cfg_str. This method is intended to be called before calling resolve() when the input is a string encoded run cfg. That is, to fully resolve the cfg, call opt.resolve(opt.cfg_from_str(cfg_literal)).
If the cfg_str is an empty string, then an empty cfg is returned. Otherwise, at least one kv-pair delimited by "=" (equal) is expected.
Either "," (comma) or ";" (semi-colon) can be used to delimit multiple kv-pairs.
CfgVal allows List of primitives, which can be passed as either "," (comma) or ";" (semi-colon) delimited. Since the same delimiters are used to delimit between cfg kv pairs, this method interprets the last (trailing) "," or ";" as the delimiter between kv pairs. See example below.
Examples:
opts = runopts()
opts.add("FOO", type_=List[str], default=["a"], help="an optional list option")
opts.add("BAR", type_=str, required=True, help="a required str option")

# required and default options not checked
# method returns strictly parsed cfg from the cfg literal string
opts.cfg_from_str("") == {}

# however, unknown options are ignored
# since the value type is unknown hence cannot cast to the correct type
opts.cfg_from_str("UNKNOWN=VALUE") == {}

opts.cfg_from_str("FOO=v1") == {"FOO": ["v1"]}

opts.cfg_from_str("FOO=v1,v2") == {"FOO": ["v1", "v2"]}
opts.cfg_from_str("FOO=v1;v2") == {"FOO": ["v1", "v2"]}

opts.cfg_from_str("FOO=v1,v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2;BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
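One way to picture the delimiter rule is a small stand-alone parser: split on either delimiter, treat any token containing "=" as the start of a new kv-pair, and attach the remaining tokens to the current key as list elements. The sketch below is a simplified illustration under that assumption, not the TorchX implementation; parse_cfg and its types argument are hypothetical, and only str and list-of-str values are handled.

```python
import re
from typing import Dict, List, Union

CfgVal = Union[str, List[str]]


def parse_cfg(cfg_str: str, types: Dict[str, type]) -> Dict[str, CfgVal]:
    """Parse "K1=v1,v2;K2=v3" style strings (simplified, illustrative only)."""
    grouped: Dict[str, List[str]] = {}
    key = None
    for token in re.split(r"[,;]", cfg_str):
        if not token:
            continue  # skip empty input and trailing delimiters
        if "=" in token:
            # A token containing "=" starts a new kv-pair.
            key, _, value = token.partition("=")
            grouped[key] = [value]
        elif key is not None:
            # Otherwise the token is another element of the current value.
            grouped[key].append(token)
        else:
            raise ValueError(f"expected key=value, got {token!r}")

    cfg: Dict[str, CfgVal] = {}
    for k, values in grouped.items():
        if k not in types:
            continue  # unknown keys are dropped from the result
        cfg[k] = values if types[k] is list else ",".join(values)
    return cfg


# Declared option types: FOO is a list option, BAR a plain string.
types = {"FOO": list, "BAR": str}
print(parse_cfg("FOO=v1,v2,BAR=v3", types))
print(parse_cfg("FOO=v1;v2;BAR=v3", types))
```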
- static is_type(obj: Optional[Union[str, int, float, bool, List[str], Dict[str, str]]], tp: Type[Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]) bool [source]¶
Returns True if obj is of type tp. Similar to isinstance() but supports tp = List[str], thus can be used to validate ConfigValue.
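A check of this kind can be approximated with typing.get_origin and typing.get_args (Python 3.8+). The sketch below is an independent illustration, not the TorchX code; it handles plain classes and parametrized lists such as List[str] only.

```python
from typing import Any, List, get_args, get_origin


def is_type(obj: Any, tp: Any) -> bool:
    """isinstance()-like check that also understands List[X]."""
    if get_origin(tp) is list:
        # For List[str], get_args returns (str,).
        (elem_tp,) = get_args(tp)
        return isinstance(obj, list) and all(isinstance(e, elem_tp) for e in obj)
    return isinstance(obj, tp)


print(is_type(["a", "b"], List[str]))  # every element is a str
print(is_type(["a", 1], List[str]))    # one element is not a str
print(is_type("a", str))
```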
- resolve(cfg: Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]) Dict[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]] [source]¶
Checks the given config against this runopts and sets default configs if not set.
Note
Extra configs unknown to this run option are ignored.
Run Status¶
- class torchx.specs.AppStatus(state: AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: Optional[str] = None, roles: List[RoleStatus] = <factory>)[source]¶
The runtime status of the AppDef. The scheduler can return an arbitrary text message (msg field). If any error occurs, the scheduler can populate structured_error_msg with a json response.
replicas represent the statuses of the replicas in the job. If the job runs with multiple retries, the parameter will contain the statuses of the most recent retry. Note: if the previous retries failed, but the most recent retry succeeded or is in progress, replicas will not contain the errors that occurred.
- format(filter_roles: Optional[List[str]] = None) str [source]¶
Format logs for app status. The app status includes:
State: State of the application.
Num Restarts: The number of application restarts.
Roles: List of roles.
Msg: Arbitrary text message the scheduler returned.
Structured Error Msg: Json response error msg.
UI URL: Application URL
- class torchx.specs.AppState(value)[source]¶
State of the application. An application starts from an initial UNSUBMITTED state and moves through SUBMITTED, PENDING, RUNNING states, finally reaching a terminal state: SUCCEEDED, FAILED, CANCELLED.
If the scheduler supports preemption, the app moves from a RUNNING state to PENDING upon preemption.
If the user stops the application, then the application state moves to STOPPED, then to CANCELLED when the job is actually cancelled by the scheduler.
UNSUBMITTED - app has not been submitted to the scheduler yet
SUBMITTED - app has been successfully submitted to the scheduler
PENDING - app has been submitted to the scheduler pending allocation
RUNNING - app is running
SUCCEEDED - app has successfully completed
FAILED - app has unsuccessfully completed
CANCELLED - app was cancelled before completing
UNKNOWN - app state is unknown
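The lifecycle described above can be modeled with a small enum and a terminal-state check. This is a stand-in for illustration only: the numeric values are arbitrary, and is_terminal and first_terminal are hypothetical helpers rather than TorchX functions.

```python
from enum import Enum, auto
from typing import Iterable, Optional


class AppState(Enum):
    # Stand-in listing the documented states; values are arbitrary here.
    UNSUBMITTED = auto()
    SUBMITTED = auto()
    PENDING = auto()
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    CANCELLED = auto()
    UNKNOWN = auto()


TERMINAL_STATES = frozenset(
    {AppState.SUCCEEDED, AppState.FAILED, AppState.CANCELLED}
)


def is_terminal(state: AppState) -> bool:
    return state in TERMINAL_STATES


def first_terminal(observed: Iterable[AppState]) -> Optional[AppState]:
    """Return the first terminal state in a sequence of observed states."""
    for state in observed:
        if is_terminal(state):
            return state
    return None


# A run that is preempted once (RUNNING -> PENDING) before succeeding.
history = [
    AppState.SUBMITTED,
    AppState.PENDING,
    AppState.RUNNING,
    AppState.PENDING,
    AppState.RUNNING,
    AppState.SUCCEEDED,
]
print(first_terminal(history))
```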
Mounts¶
- torchx.specs.parse_mounts(opts: List[str]) List[Union[BindMount, VolumeMount, DeviceMount]] [source]¶
parse_mounts parses a list of options into typed mounts following a similar format to Docker's bind mount.
Multiple mounts can be specified in the same list. type must be specified first in each.
- Ex:
type=bind,src=/host,dst=/container,readonly,[type=bind,src=…,dst=…]
- Supported types:
BindMount: type=bind,src=<host path>,dst=<container path>[,readonly]
VolumeMount: type=volume,src=<name/id>,dst=<container path>[,readonly]
DeviceMount: type=device,src=/dev/<dev>[,dst=<container path>][,perm=rwm]
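The "type must be specified first" rule is what lets several mounts share one flat list: each type= option opens a new mount, and the options that follow belong to it. The sketch below shows only that grouping step with plain dicts. It assumes the options arrive already split on commas, and group_mount_opts is a hypothetical helper, not part of TorchX; the device path in the example is made up.

```python
from typing import Dict, List


def group_mount_opts(opts: List[str]) -> List[Dict[str, str]]:
    """Group a flat option list into one dict per mount."""
    mounts: List[Dict[str, str]] = []
    for opt in opts:
        # Flags without "=" (e.g. readonly) get an empty string value.
        key, _, val = opt.partition("=")
        if key == "type":
            mounts.append({"type": val})  # a new mount starts here
        elif not mounts:
            raise ValueError("type must be specified first")
        else:
            mounts[-1][key] = val
    return mounts


spec = "type=bind,src=/host,dst=/container,readonly,type=device,src=/dev/fuse"
grouped = group_mount_opts(spec.split(","))
print(grouped)
```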
- class torchx.specs.BindMount(src_path: str, dst_path: str, read_only: bool = False)[source]¶
Defines a bind mount to mount --bind a host path into the worker environment. See scheduler documentation on how bind mounts operate for each scheduler.
- Parameters:
src_path – the path on the host
dst_path – the path in the worker environment/container
read_only – whether the mount should be read only
- class torchx.specs.VolumeMount(src: str, dst_path: str, read_only: bool = False)[source]¶
Defines a persistent volume mount to mount into the worker environment.
- Parameters:
src – the name or ID of the volume to mount
dst_path – the path in the worker environment/container
read_only – whether the mount should be read only
- class torchx.specs.DeviceMount(src_path: str, dst_path: str, permissions: str = 'rwm')[source]¶
Defines a host device to mount into the container.
- Parameters:
src_path – the path on the host
dst_path – the path in the worker environment/container
permissions – the permissions to set on the device. Default: read, write, mknod
Component Linter¶
- torchx.specs.file_linter.validate(path: str, component_function: str) List[LinterMessage] [source]¶
Validates the function to make sure it complies with the component standard.
validate finds the component_function and validates it according to the following rules:
The function must have google-style docs
All function parameters must be annotated
The function must return torchx.specs.api.AppDef
- Parameters:
path – Path to python source file.
component_function – Name of the function to be validated.
- Returns:
List of validation errors
- Return type:
List[LinterMessage]
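The "all function parameters must be annotated" rule can be checked statically with the standard library's ast module, without importing the component file. The sketch below illustrates that idea only; unannotated_args is a hypothetical helper that returns argument names, whereas the real validate returns LinterMessage objects and applies more rules.

```python
import ast
from typing import List


def unannotated_args(source: str, function_name: str) -> List[str]:
    """Return the names of positional parameters that lack a type annotation."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef) and node.name == function_name:
            return [a.arg for a in node.args.args if a.annotation is None]
    raise ValueError(f"function {function_name!r} not found")


# Example component source; `nnodes` is deliberately left unannotated.
SOURCE = '''
def trainer(image: str, nnodes=1) -> "AppDef":
    """Runs a trainer."""
'''

print(unannotated_args(SOURCE, "trainer"))
```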
- torchx.specs.file_linter.get_fn_docstring(fn: Callable[[...], object]) Tuple[str, Dict[str, str]] [source]¶
Parses the function and arguments description from the provided function. Docstring should be in google-style format.
If the function has no docstring, the function description will be the name of the function plus a TIP on how to improve the help message, and the argument descriptions will be the names of the arguments.
The arguments that are not present in the docstring will contain default/required information.
- Parameters:
fn – Function with or without docstring
- Returns:
function description, arguments description where key is the name of the argument and value is the description
- class torchx.specs.file_linter.LinterMessage(name: str, description: str, line: int, char: int, severity: str = 'error')[source]¶
- class torchx.specs.file_linter.TorchFunctionVisitor(component_function_name: str)[source]¶
Visitor that finds the component_function and runs registered validators on it. Current registered validators:
- TorchxFunctionArgsValidator - validates arguments of the function.
- Criteria:
Each argument should be annotated with the type
- The following types are supported:
primitive_types: {int, str, float},
Optional[primitive_types],
Dict[primitive_types, primitive_types],
List[primitive_types],
Optional[Dict[primitive_types, primitive_types]],
Optional[List[primitive_types]]
- class torchx.specs.file_linter.TorchXArgumentHelpFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]¶
Help message formatter which adds default values and required to argument help.
If the argument is required, the class appends (required) at the end of the help message. If the argument has default value, the class appends (default: $DEFAULT) at the end. The formatter is designed to be used only for the torchx components functions. These functions do not have both required and default arguments.
- class torchx.specs.file_linter.TorchxFunctionArgsValidator[source]¶
- validate(app_specs_func_def: FunctionDef) List[LinterMessage] [source]¶
Method to call to validate the provided function def.