torchx.specs

This contains the TorchX AppDef and related component definitions. These are used by components to define the apps which can then be launched via a TorchX scheduler or pipeline adapter.

AppDef

class torchx.specs.AppDef(name: str, roles: List[Role] = <factory>, metadata: Dict[str, str] = <factory>)[source]

Represents a distributed application made up of multiple Roles and metadata. Contains the necessary information for the driver to submit this app to the scheduler.

Parameters:
  • name – Name of application

  • roles – List of roles

  • metadata – metadata to the app (treatment of metadata is scheduler dependent)

Role

class torchx.specs.Role(name: str, image: str, min_replicas: Optional[int] = None, base_image: Optional[str] = None, entrypoint: str = '<MISSING>', args: List[str] = <factory>, env: Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: RetryPolicy = RetryPolicy.APPLICATION, resource: Resource = <factory>, port_map: Dict[str, int] = <factory>, metadata: Dict[str, Any] = <factory>, mounts: List[Union[BindMount, VolumeMount, DeviceMount]] = <factory>)[source]

A set of nodes that perform a specific duty within the AppDef. Examples:

  1. Distributed data parallel app - made up of a single role (trainer).

  2. App with parameter server - made up of multiple roles (trainer, ps).

Note

An image is a software bundle that is installed on the container scheduled by the scheduler. The container on the scheduler dictates what an image actually is. An image could be as simple as a tar-ball or map to a docker image. The scheduler typically knows how to “pull” the image given an image name (str), which could be a simple name (e.g. docker image) or a url (e.g. s3://path/my_image.tar).

Usage:

trainer = Role(name="trainer",
               image="pytorch/torch:1",
               entrypoint="my_trainer.py",
               args=["--arg", "foo"],
               env={"ENV_VAR": "FOOBAR"},
               num_replicas=4,
               resource=Resource(cpu=1, gpu=1, memMB=500),
               port_map={"tcp_store": 8080, "tensorboard": 8081},
               metadata={"local_cwd.property": value})

Parameters:
  • name – name of the role

  • image – a software bundle that is installed on a container.

  • entrypoint – command (within the container) to invoke the role

  • args – commandline arguments to the entrypoint cmd

  • env – environment variable mappings

  • num_replicas – number of container replicas to run

  • min_replicas – minimum number of replicas for the job to start. When set the job size can automatically adjust between min_replicas and num_replicas depending on the cluster resources and policies. If the scheduler doesn’t support auto scaling this field is ignored and the job size will be num_replicas. EXPERIMENTAL: For HOT_SPARE restart policy this field is used to indicate the quorum required for the job to run.

  • max_retries – max number of retries before giving up

  • retry_policy – retry behavior upon replica failures

  • resource – Resource requirement for the role. The scheduler should schedule the role on num_replicas containers, each of which should have at least the guarantees specified by resource.

  • port_map – Port mapping for the role. The key is the unique identifier of the port e.g. “tensorboard”: 9090

  • metadata – Free form information that is associated with the role, for example scheduler specific data. The key should follow the pattern: $scheduler.$key

  • mounts – a list of mounts on the machine

pre_proc(scheduler: str, dryrun_info: AppDryRunInfo) AppDryRunInfo[source]

Modifies the scheduler request based on the role-specific configuration. The method is invoked for each role during scheduler submit_dryrun. If there are multiple roles, the method is invoked for each role in the order defined by the AppDef.roles list.

class torchx.specs.RetryPolicy(value)[source]

Defines the retry policy for the Roles in the AppDef. The policy defines the behavior when the role replica encounters a failure:

  1. unsuccessful (non zero) exit code

  2. hardware/host crashes

  3. preemption

  4. eviction

Note

Not all retry policies are supported by all schedulers. However all schedulers must support RetryPolicy.APPLICATION. Please refer to the scheduler’s documentation for more information on the retry policies they support and behavior caveats (if any).

  1. REPLICA: Replaces the replica instance. Surviving replicas are untouched.

    Use with dist.ddp component to have torchelastic coordinate restarts and membership changes. Otherwise, it is up to the application to deal with failed replica departures and replacement replica admittance.

  2. APPLICATION: Restarts the entire application.

  3. HOT_SPARE: Restarts the replicas for a role as long as quorum (min_replicas) is not violated, using extra hosts as spares. It does not really support elasticity and just uses the delta between num_replicas and min_replicas as spares (EXPERIMENTAL).

Resource

class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: Dict[str, Any] = <factory>, devices: Dict[str, int] = <factory>)[source]

Represents resource requirements for a Role.

Parameters:
  • cpu – number of logical cpu cores. The definition of a CPU core depends on the scheduler. See your scheduler documentation for how a logical CPU core maps to physical cores and threads.

  • gpu – number of gpus

  • memMB – MB of ram

  • capabilities – additional hardware specs (interpreted by scheduler)

  • devices – a list of named devices with their quantities

Note: you should prefer to use named_resources instead of specifying the raw resource requirement directly.

static copy(original: Resource, **capabilities: Any) Resource[source]

Copies a resource and applies new capabilities. If the same capabilities are present in the original resource and as parameter, the one from parameter will be used.
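
The override rule described above can be pictured with a small stand-in dataclass. This is a minimal sketch, not TorchX's implementation: SketchResource and copy_with are hypothetical names used only for illustration, and the capability keys are made up.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchResource:
    """Hypothetical stand-in for torchx.specs.Resource (illustration only)."""
    cpu: int
    gpu: int
    memMB: int
    capabilities: Dict[str, Any] = field(default_factory=dict)


def copy_with(original: SketchResource, **capabilities: Any) -> SketchResource:
    # Start from the original capabilities, then let the keyword arguments
    # override any key that appears in both.
    merged = {**original.capabilities, **capabilities}
    return SketchResource(
        cpu=original.cpu,
        gpu=original.gpu,
        memMB=original.memMB,
        capabilities=merged,
    )


base = SketchResource(cpu=2, gpu=1, memMB=1024, capabilities={"zone": "a", "tier": "std"})
derived = copy_with(base, tier="premium", spot=True)
# derived keeps "zone", takes "tier" from the parameter, and gains "spot";
# base is left unchanged.
```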

torchx.specs.resource(cpu: Optional[int] = None, gpu: Optional[int] = None, memMB: Optional[int] = None, h: Optional[str] = None) Resource[source]

Convenience method to create a Resource object from either the raw resource specs (cpu, gpu, memMB) or the registered named resource (h). Note that (cpu, gpu, memMB) and h are mutually exclusive, with h taking precedence if specified.

If h is specified then it is used to look up the resource specs from the list of registered named resources. See registering named resource.

Otherwise a Resource object is created from the raw resource specs.

Example:

resource(cpu=1) # returns Resource(cpu=1)
resource(h="foobar") # returns registered named resource "foobar"
resource(cpu=1, h="foobar") # returns registered named resource "foobar" (cpu=1 ignored)
resource() # returns default resource values
resource(cpu=None, gpu=None, memMB=None) # throws
torchx.specs.get_named_resources(res: str) Resource[source]

Get resource object based on the string definition registered via entrypoints.txt.

TorchX implements a named_resource registration mechanism, which consists of the following steps:

  1. Create a module and define your resource retrieval function:

# my_module.resources
from torchx.specs import Resource

def gpu_x_1() -> Resource:
    return Resource(cpu=2, memMB=64 * 1024, gpu=2)

  2. Register resource retrieval in the entrypoints section:

[torchx.named_resources]
gpu_x_1 = my_module.resources:gpu_x_1

The gpu_x_1 can be used as string argument to this function:

from torchx.specs import named_resources
resource = named_resources["gpu_x_1"]

AWS Named Resources

torchx.specs.named_resources_aws contains resource definitions that represent corresponding AWS instance types taken from https://aws.amazon.com/ec2/instance-types/. The resources are exposed via entrypoints after installing torchx lib. The mapping is stored in the setup.py file.

The named resources currently do not specify AWS instance type capabilities but merely represent the equivalent resource in mem, cpu and gpu numbers.

Note

These resource definitions may change in future. It is expected for each user to manage their own resources. Follow https://pytorch.org/torchx/latest/specs.html#torchx.specs.get_named_resources to set up named resources.

Usage:

from torchx.specs import named_resources
print(named_resources["aws_t3.medium"])
print(named_resources["aws_m5.2xlarge"])
print(named_resources["aws_p3.2xlarge"])
print(named_resources["aws_p3.8xlarge"])
torchx.specs.named_resources_aws.aws_m5_2xlarge() Resource[source]
torchx.specs.named_resources_aws.aws_p3_2xlarge() Resource[source]
torchx.specs.named_resources_aws.aws_p3_8xlarge() Resource[source]
torchx.specs.named_resources_aws.aws_t3_medium() Resource[source]

Macros

class torchx.specs.macros[source]

Defines macros that can be used in the elements of Role.args and in the values of Role.env. The macros will be substituted at runtime with their actual values.

Warning

Macros used in fields of Role other than the ones mentioned above are NOT substituted.

Available macros:

  1. img_root - root directory of the pulled container.image

  2. app_id - application id as assigned by the scheduler

  3. replica_id - unique id for each instance of a replica of a Role. For instance, a role with 3 replicas could have 0, 1, 2 as replica ids. Note that when the container fails and is replaced, the new container will have the same replica_id as the one it is replacing. For instance, if node 1 failed and was replaced by the scheduler, the replacing node will also have replica_id=1.

Example:

# runs: hello_world.py --app_id ${app_id}
trainer = Role(
           name="trainer",
           image="pytorch/torch:1",
           entrypoint="hello_world.py",
           args=["--app_id", macros.app_id],
           env={"IMAGE_ROOT_DIR": macros.img_root})
app = AppDef("train_app", roles=[trainer])
app_handle = session.run(app, scheduler="local_docker", cfg={})
class Values(img_root: str, app_id: str, replica_id: str, rank0_env: str, base_img_root: str = 'DEPRECATED')[source]
apply(role: Role) Role[source]

apply applies the values to a copy of the specified role and returns it.

substitute(arg: str) str[source]

substitute applies the values to the template arg.
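
As a rough illustration of this kind of substitution, the sketch below uses Python's standard string.Template. It assumes the "${name}" placeholder form shown in the example comment above; the values dictionary is made up, and this is not presented as the library's own code.

```python
from string import Template
from typing import Dict, List

# Made-up values for illustration; at runtime these would come from the scheduler.
values: Dict[str, str] = {
    "img_root": "/tmp/image",
    "app_id": "app_1234",
    "replica_id": "0",
}


def substitute(arg: str) -> str:
    # safe_substitute leaves unknown placeholders untouched instead of raising.
    return Template(arg).safe_substitute(values)


args: List[str] = ["--app_id", "${app_id}", "--out", "${img_root}/out_${replica_id}"]
resolved = [substitute(a) for a in args]
```

Using safe_substitute here is a design choice for the sketch: an argument that happens to contain an unrelated "${...}" token passes through unchanged rather than causing an error.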

Run Configs

class torchx.specs.runopts[source]

Holds the accepted scheduler run configuration keys, default value (if any), and help message string. These options are provided by the Scheduler and validated in Session.run against user provided run cfg. Allows None default values. Required opts must NOT have a non-None default.

Important

This class has no accessors because it is intended to be constructed and returned by Scheduler.run_config_options and printed out as a “help” tool or as part of an exception msg.

Usage:

opts = runopts()

opts.add("run_as_user", type_=str, help="user to run the job as")
opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
opts.add("priority", type_=float, default=0.5, help="job priority")
opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

# invalid
opts.add("illegal", default=10, required=True)
opts.add("bad_type", type_=str, default=10)

opts.check(cfg)
print(opts)
add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str], Dict[str, str]]] = None, required: bool = False) None[source]

Adds the config option with the given help string and default value (if any). If the default is not specified then this option is a required option.

cfg_from_str(cfg_str: str) Dict[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]][source]

Parses scheduler cfg from a string literal and returns a cfg map where the cfg values have been cast into the appropriate types as specified by this runopts object. Unknown keys are ignored and not returned in the resulting map.

Note

Unlike the method resolve, this method does NOT resolve default options or check that the required options are actually present in the given cfg_str. This method is intended to be called before calling resolve() when the input is a string encoded run cfg. That is to fully resolve the cfg, call opt.resolve(opt.cfg_from_str(cfg_literal)).

If the cfg_str is an empty string, then an empty cfg is returned. Otherwise, at least one kv-pair delimited by "=" (equal) is expected.

Either "," (comma) or ";" (semi-colon) can be used to delimit multiple kv-pairs.

CfgVal allows List of primitives, which can be passed as either "," or ";" (semi-colon) delimited. Since the same delimiters are used to delimit between cfg kv pairs, this method interprets the last (trailing) "," or ";" as the delimiter between kv pairs. See example below.

Examples:

opts = runopts()
opts.add("FOO", type_=List[str], default=["a"], help="an optional list option")
opts.add("BAR", type_=str, required=True, help="a required str option")

# required and default options not checked
# method returns strictly parsed cfg from the cfg literal string
opts.cfg_from_str("") == {}

# however, unknown options are ignored
# since the value type is unknown hence cannot cast to the correct type
opts.cfg_from_str("UNKNOWN=VALUE") == {}

opts.cfg_from_str("FOO=v1") == {"FOO": ["v1"]}

opts.cfg_from_str("FOO=v1,v2") == {"FOO": ["v1", "v2"]}
opts.cfg_from_str("FOO=v1;v2") == {"FOO": ["v1", "v2"]}

opts.cfg_from_str("FOO=v1,v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2;BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
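
The delimiter rule above (only the last "," or ";" before the next key separates kv-pairs) can be sketched with a regular-expression lookahead. This is a simplified illustration under stated assumptions, not the library's parser: parse_cfg and the opt_types mapping are hypothetical, and only str and list-of-str options are handled.

```python
import re
from typing import Dict, List, Mapping, Union

CfgVal = Union[str, List[str]]


def parse_cfg(cfg_str: str, opt_types: Mapping[str, type]) -> Dict[str, CfgVal]:
    cfg: Dict[str, CfgVal] = {}
    if not cfg_str:
        return cfg
    # Split only at a "," or ";" that is directly followed by "key=".
    # Delimiters inside a list value are not followed by "key=", so they survive.
    for pair in re.split(r"[,;](?=[^,;=]+=)", cfg_str):
        key, _, value = pair.partition("=")
        opt_type = opt_types.get(key)
        if opt_type is None:
            continue  # unknown key: its type is unknown, so it is skipped
        if opt_type is list:
            cfg[key] = [v for v in re.split(r"[,;]", value) if v]
        else:
            cfg[key] = opt_type(value)
    return cfg


types = {"FOO": list, "BAR": str}
parsed = parse_cfg("FOO=v1;v2,BAR=v3", types)
```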
get(name: str) Optional[runopt][source]

Returns option if any was registered, or None otherwise

static is_type(obj: Optional[Union[str, int, float, bool, List[str], Dict[str, str]]], tp: Type[Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]) bool[source]

Returns True if obj is of type tp. Similar to isinstance() but supports tp = List[str], thus can be used to validate ConfigValue.
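
Plain isinstance() rejects parameterized generics such as List[str], which is why a helper like this is useful. A minimal sketch of the idea using typing.get_origin and typing.get_args follows; the function body is an assumption for illustration, not the library's code.

```python
from typing import Any, Dict, List, get_args, get_origin


def is_type(obj: Any, tp: Any) -> bool:
    origin = get_origin(tp)  # list for List[str], dict for Dict[str, str], None for str
    if origin is list:
        (elem_type,) = get_args(tp)
        return isinstance(obj, list) and all(isinstance(e, elem_type) for e in obj)
    if origin is dict:
        key_type, val_type = get_args(tp)
        return isinstance(obj, dict) and all(
            isinstance(k, key_type) and isinstance(v, val_type) for k, v in obj.items()
        )
    # Non-generic types fall back to a regular isinstance check.
    return isinstance(obj, tp)


ok_list = is_type(["a", "b"], List[str])
bad_list = is_type(["a", 1], List[str])
```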

resolve(cfg: Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]) Dict[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]][source]

Checks the given config against this runopts and sets default configs if not set.

Note

Extra configs unknown to this run option are ignored.
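
The default-filling and required-option checks can be pictured as follows. This is a sketch under stated assumptions: SketchOpt and this resolve function are hypothetical, type validation is omitted, and leaving unknown keys out of the result is a choice made for the sketch (the text above only says they are ignored).

```python
from typing import Any, Dict, Mapping, NamedTuple, Optional


class SketchOpt(NamedTuple):
    """Hypothetical record for one registered option (illustration only)."""
    default: Optional[Any]
    required: bool


def resolve(opts: Mapping[str, SketchOpt], cfg: Mapping[str, Any]) -> Dict[str, Any]:
    resolved: Dict[str, Any] = {}
    for key, opt in opts.items():
        value = cfg.get(key)
        if value is None:
            if opt.required:
                raise ValueError(f"missing required option: {key}")
            value = opt.default  # fall back to the registered default
        resolved[key] = value
    # Keys in cfg that were never registered are not validated here, and this
    # sketch does not copy them into the result (an assumption).
    return resolved


opts = {
    "cluster_id": SketchOpt(default=None, required=True),
    "priority": SketchOpt(default=0.5, required=False),
}
resolved = resolve(opts, {"cluster_id": 7, "unknown": "x"})
```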

Run Status

class torchx.specs.AppStatus(state: AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: Optional[str] = None, roles: List[RoleStatus] = <factory>)[source]

The runtime status of the AppDef. The scheduler can return an arbitrary text message (msg field). If any error occurs, scheduler can populate structured_error_msg with json response.

replicas represent the statuses of the replicas in the job. If the job runs with multiple retries, the parameter will contain the statuses of the most recent retry. Note: if previous retries failed but the most recent retry succeeded or is in progress, replicas will not contain the errors that occurred.

format(filter_roles: Optional[List[str]] = None) str[source]
Format logs for app status. The app status includes:
  1. State: State of the application.

  2. Num Restarts: The number of application restarts.

  3. Roles: List of roles.

  4. Msg: Arbitrary text message the scheduler returned.

  5. Structured Error Msg: Json response error msg.

  6. UI URL: Application URL

raise_for_status() None[source]

raise_for_status will raise an AppStatusError if the state is not SUCCEEDED.

class torchx.specs.AppState(value)[source]

State of the application. An application starts from an initial UNSUBMITTED state and moves through the SUBMITTED, PENDING, and RUNNING states, finally reaching a terminal state: SUCCEEDED, FAILED, or CANCELLED.

If the scheduler supports preemption, the app moves from a RUNNING state to PENDING upon preemption.

If the user stops the application, then the application state moves to STOPPED, then to CANCELLED when the job is actually cancelled by the scheduler.

  1. UNSUBMITTED - app has not been submitted to the scheduler yet

  2. SUBMITTED - app has been successfully submitted to the scheduler

  3. PENDING - app has been submitted to the scheduler pending allocation

  4. RUNNING - app is running

  5. SUCCEEDED - app has successfully completed

  6. FAILED - app has unsuccessfully completed

  7. CANCELLED - app was cancelled before completing

  8. UNKNOWN - app state is unknown
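
To make the notion of a terminal state concrete, here is a small stand-in enum. SketchAppState, TERMINAL_STATES, and is_done are hypothetical names, and the numeric values are arbitrary; only the state names and the set of terminal states come from the description above.

```python
from enum import Enum, auto
from typing import List


class SketchAppState(Enum):
    """Stand-in for AppState; the numeric values here are arbitrary."""
    UNSUBMITTED = auto()
    SUBMITTED = auto()
    PENDING = auto()
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    CANCELLED = auto()
    UNKNOWN = auto()


TERMINAL_STATES = {
    SketchAppState.SUCCEEDED,
    SketchAppState.FAILED,
    SketchAppState.CANCELLED,
}


def is_done(state: SketchAppState) -> bool:
    return state in TERMINAL_STATES


# A preempted app moves RUNNING -> PENDING -> RUNNING before finishing.
history: List[SketchAppState] = [
    SketchAppState.RUNNING,
    SketchAppState.PENDING,
    SketchAppState.RUNNING,
    SketchAppState.SUCCEEDED,
]
finished_at = next(i for i, s in enumerate(history) if is_done(s))
```

A polling loop over a scheduler's reported state would typically stop once a check like is_done returns True.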

torchx.specs.ReplicaState

alias of AppState

Mounts

torchx.specs.parse_mounts(opts: List[str]) List[Union[BindMount, VolumeMount, DeviceMount]][source]

parse_mounts parses a list of options into typed mounts following a format similar to Docker's bind mount.

Multiple mounts can be specified in the same list. type must be specified first in each.

Ex:

type=bind,src=/host,dst=/container,readonly,[type=bind,src=…,dst=…]

Supported types:

  • BindMount: type=bind,src=<host path>,dst=<container path>[,readonly]

  • VolumeMount: type=volume,src=<name/id>,dst=<container path>[,readonly]

  • DeviceMount: type=device,src=/dev/<dev>[,dst=<container path>][,perm=rwm]
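
The "type must be specified first" rule can be sketched as a grouping pass over the option tokens. This is an illustration only: it assumes each list element is a single key or key=value token, group_mount_opts is a hypothetical helper, and it stops at plain dictionaries rather than building the typed mount objects. The /dev/fuse device is a made-up example value.

```python
from typing import Dict, List


def group_mount_opts(opts: List[str]) -> List[Dict[str, str]]:
    mounts: List[Dict[str, str]] = []
    for opt in opts:
        key, _, val = opt.partition("=")
        if key == "type":
            mounts.append({})  # every "type" token starts a new mount
        elif not mounts:
            raise KeyError("type must be specified first")
        mounts[-1][key] = val  # flags such as "readonly" get an empty value
    return mounts


tokens = "type=bind,src=/host,dst=/container,readonly,type=device,src=/dev/fuse".split(",")
grouped = group_mount_opts(tokens)
```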

class torchx.specs.BindMount(src_path: str, dst_path: str, read_only: bool = False)[source]

Defines a bind mount to mount --bind a host path into the worker environment. See scheduler documentation on how bind mounts operate for each scheduler.

Parameters:
  • src_path – the path on the host

  • dst_path – the path in the worker environment/container

  • read_only – whether the mount should be read only

class torchx.specs.VolumeMount(src: str, dst_path: str, read_only: bool = False)[source]

Defines a persistent volume mount to mount into the worker environment.

Parameters:
  • src – the name or ID of the volume to mount

  • dst_path – the path in the worker environment/container

  • read_only – whether the mount should be read only

class torchx.specs.DeviceMount(src_path: str, dst_path: str, permissions: str = 'rwm')[source]

Defines a host device to mount into the container.

Parameters:
  • src_path – the path on the host

  • dst_path – the path in the worker environment/container

  • permissions – the permissions to set on the device. Default: read, write, mknod

Component Linter

torchx.specs.file_linter.validate(path: str, component_function: str) List[LinterMessage][source]

Validates the function to make sure it complies with the component standard.

validate finds the component_function and validates it according to the following rules:

  1. The function must have a google-style docstring

  2. All function parameters must be annotated

  3. The function must return torchx.specs.api.AppDef

Parameters:
  • path – Path to python source file.

  • component_function – Name of the function to be validated.

Returns:

List of validation errors

Return type:

List[LinterMessage]
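
A rough picture of how such rules can be checked statically with Python's ast module is given below. This is a simplified sketch and not the linter's implementation: lint is a hypothetical function, it returns plain strings instead of LinterMessage objects, it only checks that a docstring exists (not that it is google-style), and it needs Python 3.9+ for ast.unparse.

```python
import ast
from typing import List

# A made-up component with one unannotated parameter.
SOURCE = '''
def trainer(image: str, nnodes=1) -> AppDef:
    """Runs a trainer."""
    ...
'''


def lint(source: str, fn_name: str) -> List[str]:
    errors: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.FunctionDef) and node.name == fn_name:
            if ast.get_docstring(node) is None:
                errors.append("missing docstring")
            for arg in node.args.args:
                if arg.annotation is None:
                    errors.append(f"argument '{arg.arg}' is not annotated")
            ret_name = ast.unparse(node.returns) if node.returns is not None else None
            if ret_name not in ("AppDef", "specs.AppDef"):
                errors.append("return annotation must be AppDef")
            return errors
    return [f"function '{fn_name}' not found"]


messages = lint(SOURCE, "trainer")
```

Because the source is only parsed and never imported, the check works even when the component's dependencies are not installed.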

torchx.specs.file_linter.get_fn_docstring(fn: Callable[[...], object]) Tuple[str, Dict[str, str]][source]

Parses the function and argument descriptions from the provided function. The docstring should be in google-style format.

If the function has no docstring, the function description will be the name of the function followed by a tip on how to improve the help message, and the argument descriptions will be the names of the arguments.

The arguments that are not present in the docstring will contain default/required information.

Parameters:

fn – Function with or without docstring

Returns:

function description, arguments description where key is the name of the argument and value is the description

class torchx.specs.file_linter.LinterMessage(name: str, description: str, line: int, char: int, severity: str = 'error')[source]
class torchx.specs.file_linter.TorchFunctionVisitor(component_function_name: str)[source]

Visitor that finds the component_function and runs registered validators on it. Current registered validators:

  • TorchxFunctionArgsValidator - validates arguments of the function.
    Criteria:
    • Each argument should be annotated with the type

    • The following types are supported:
      • primitive_types: {int, str, float},

      • Optional[primitive_types],

      • Dict[primitive_types, primitive_types],

      • List[primitive_types],

      • Optional[Dict[primitive_types, primitive_types]],

      • Optional[List[primitive_types]]

visit_FunctionDef(node: FunctionDef) None[source]

Validates the function def with the child validators.

class torchx.specs.file_linter.TorchXArgumentHelpFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]

Help message formatter which adds default values and required to argument help.

If the argument is required, the class appends (required) at the end of the help message. If the argument has default value, the class appends (default: $DEFAULT) at the end. The formatter is designed to be used only for the torchx components functions. These functions do not have both required and default arguments.
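
The behavior described above can be approximated with a small argparse.HelpFormatter subclass. This is a sketch, not the class's actual code: SketchHelpFormatter and the demo arguments are hypothetical, and it overrides _get_help_string, the same hook that the standard library's ArgumentDefaultsHelpFormatter overrides.

```python
import argparse


class SketchHelpFormatter(argparse.HelpFormatter):
    def _get_help_string(self, action: argparse.Action) -> str:
        help_text = action.help or ""
        if action.required:
            help_text += " (required)"
        elif action.default is not None and action.default is not argparse.SUPPRESS:
            help_text += f" (default: {action.default})"
        return help_text


parser = argparse.ArgumentParser(prog="demo", formatter_class=SketchHelpFormatter)
parser.add_argument("--image", required=True, help="image to run")
parser.add_argument("--nnodes", type=int, default=1, help="number of nodes")
help_text = parser.format_help()
```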

class torchx.specs.file_linter.TorchxFunctionArgsValidator[source]
validate(app_specs_func_def: FunctionDef) List[LinterMessage][source]

Method to call to validate the provided function def.

class torchx.specs.file_linter.TorchxFunctionValidator[source]
abstract validate(app_specs_func_def: FunctionDef) List[LinterMessage][source]

Method to call to validate the provided function def.

class torchx.specs.file_linter.TorchxReturnValidator[source]
validate(app_specs_func_def: FunctionDef) List[LinterMessage][source]
Validates return annotation of the torchx function. Current allowed annotations:
  • AppDef

  • specs.AppDef
