Shortcuts

AWS Batch

This contains the TorchX AWS Batch scheduler which can be used to run TorchX components directly on AWS Batch.

This scheduler is in prototype stage and may change without notice.

Prerequisites

You’ll need to create an AWS Batch queue configured for multi-node parallel jobs.

See https://docs.aws.amazon.com/batch/latest/userguide/Batch_GetStarted.html#first-run-step-2 for how to setup a job queue and compute environment. It needs to be backed by EC2 for multi-node parallel jobs.

See https://docs.aws.amazon.com/batch/latest/userguide/multi-node-parallel-jobs.html for more information on distributed jobs.

If you want to use workspaces and container patching you’ll also need to configure a docker registry to store the patched containers with your changes such as AWS ECR.

See https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-create-repository for how to create a image repository.

class torchx.schedulers.aws_batch_scheduler.AWSBatchScheduler(session_name: str, client: Optional[Any] = None, log_client: Optional[Any] = None, docker_client: Optional[DockerClient] = None)[source]

Bases: Scheduler[AWSBatchOpts], DockerWorkspace

AWSBatchScheduler is a TorchX scheduling interface to AWS Batch.

$ pip install torchx[kubernetes]
$ torchx run --scheduler aws_batch --scheduler_args queue=torchx utils.echo --image alpine:latest --msg hello
aws_batch://torchx_user/1234
$ torchx status aws_batch://torchx_user/1234
...

Authentication is loaded from the environment using the boto3 credential handling.

Config Options

    usage:
        queue=QUEUE,[image_repo=IMAGE_REPO],[share_id=SHARE_ID],[priority=PRIORITY]

    required arguments:
        queue=QUEUE (str)
            queue to schedule job in

    optional arguments:
        image_repo=IMAGE_REPO (str, None)
            The image repository to use when pushing patched images, must have push access. Ex: example.com/your/container
        share_id=SHARE_ID (str, None)
            The share identifier for the job. This must be set if and only if the job queue has a scheduling policy.
        priority=PRIORITY (int, None)
            The scheduling priority for the job within the context of share_id. Higher number (between 0 and 9999) means higher priority. This will only take effect if the job queue has a scheduling policy.

Mounts

This class supports bind mounting host directories, efs volumes and host devices.

  • bind mount: type=bind,src=<host path>,dst=<container path>[,readonly]

  • efs volume: type=volume,src=<efs id>,dst=<container path>[,readonly]

  • devices: type=device,src=/dev/infiniband/uverbs0,[dst=<container path>][,perm=rwm]

See torchx.specs.parse_mounts() for more info.

For other filesystems such as FSx you can mount them onto the host and bind mount them into your job: https://aws.amazon.com/premiumsupport/knowledge-center/batch-fsx-lustre-file-system-mount/

For Elastic Fabric Adapter (EFA) you’ll need to use a device mount to mount them into the container: https://docs.aws.amazon.com/batch/latest/userguide/efa.html

Compatibility

Feature

Scheduler Support

Fetch Logs

✔️

Distributed Jobs

✔️

Cancel Job

✔️

Describe Job

Partial support. AWSBatchScheduler will return job and replica status but does not provide the complete original AppSpec.

Workspaces / Patching

✔️

Mounts

✔️

describe(app_id: str) Optional[DescribeAppResponse][source]

Describes the specified application.

Returns:

AppDef description or None if the app does not exist.

list() List[str][source]

Lists the app ids launched on the scheduler. Note: This API is in prototype phase and is subject to change.

log_iter(app_id: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str][source]

Returns an iterator to the log lines of the k``th replica of the ``role. The iterator ends end all qualifying log lines have been read.

If the scheduler supports time-based cursors fetching log lines for custom time ranges, then the since, until fields are honored, otherwise they are ignored. Not specifying since and until is equivalent to getting all available log lines. If the until is empty, then the iterator behaves like tail -f, following the log output until the job reaches a terminal state.

The exact definition of what constitutes a log is scheduler specific. Some schedulers may consider stderr or stdout as the log, others may read the logs from a log file.

Behaviors and assumptions:

  1. Produces an undefined-behavior if called on an app that does not exist The caller should check that the app exists using exists(app_id) prior to calling this method.

  2. Is not stateful, calling this method twice with same parameters returns a new iterator. Prior iteration progress is lost.

  3. Does not always support log-tailing. Not all schedulers support live log iteration (e.g. tailing logs while the app is running). Refer to the specific scheduler’s documentation for the iterator’s behavior.

3.1 If the scheduler supports log-tailing, it should be controlled

by``should_tail`` parameter.

  1. Does not guarantee log retention. It is possible that by the time this method is called, the underlying scheduler may have purged the log records for this application. If so this method raises an arbitrary exception.

  2. If should_tail is True, the method only raises a StopIteration exception when the accessible log lines have been fully exhausted and the app has reached a final state. For instance, if the app gets stuck and does not produce any log lines, then the iterator blocks until the app eventually gets killed (either via timeout or manually) at which point it raises a StopIteration.

    If should_tail is False, the method raises StopIteration when there are no more logs.

  3. Need not be supported by all schedulers.

  4. Some schedulers may support line cursors by supporting __getitem__ (e.g. iter[50] seeks to the 50th log line).

  5. Whitespace is preserved, each new line should include \n. To

    support interactive progress bars the returned lines don’t need to include \n but should then be printed without a newline to correctly handle \r carriage returns.

Parameters:

streams – The IO output streams to select. One of: combined, stdout, stderr. If the selected stream isn’t supported by the scheduler it will throw an ValueError.

Returns:

An Iterator over log lines of the specified role replica

Raises:

NotImplementedError – if the scheduler does not support log iteration

run_opts() runopts[source]

Returns the run configuration options expected by the scheduler. Basically a --help for the run API.

schedule(dryrun_info: AppDryRunInfo[BatchJob]) str[source]

Same as submit except that it takes an AppDryRunInfo. Implementors are encouraged to implement this method rather than directly implementing submit since submit can be trivially implemented by:

dryrun_info = self.submit_dryrun(app, cfg)
return schedule(dryrun_info)
class torchx.schedulers.aws_batch_scheduler.BatchJob(name: str, queue: str, share_id: Union[str, NoneType], job_def: Dict[str, object], images_to_push: Dict[str, Tuple[str, str]])[source]

Reference

torchx.schedulers.aws_batch_scheduler.create_scheduler(session_name: str, **kwargs: object) AWSBatchScheduler[source]

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources