
Ray

class torchx.schedulers.ray_scheduler.RayScheduler(session_name: str, ray_client: Optional[JobSubmissionClient] = None)

Bases: TmpDirWorkspaceMixin, Scheduler[RayOpts]

RayScheduler is a TorchX scheduling interface to Ray. The workers in the job definition are launched as Ray actors.

The job environment is specified by the TorchX workspace. Any files in the workspace will be present in the Ray job unless they are listed in .torchxignore. Python dependencies are read from the requirements.txt file at the root of the workspace unless that path is overridden via -c ...,requirements=foo/requirements.txt.

Config Options

    usage:
        [cluster_config_file=CLUSTER_CONFIG_FILE],[cluster_name=CLUSTER_NAME],[dashboard_address=DASHBOARD_ADDRESS],[requirements=REQUIREMENTS]

    optional arguments:
        cluster_config_file=CLUSTER_CONFIG_FILE (str, None)
            Use CLUSTER_CONFIG_FILE to access or create the Ray cluster.
        cluster_name=CLUSTER_NAME (str, None)
            Override the configured cluster name.
        dashboard_address=DASHBOARD_ADDRESS (str, 127.0.0.1:8265)
            Use ray status to get the dashboard address you will submit jobs against.
        requirements=REQUIREMENTS (str, None)
            Path to requirements.txt
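The usage line above is a comma-separated list of key=value pairs. As a minimal sketch, the string can be assembled from a dictionary; format_ray_cfg is a hypothetical helper written for this illustration and is not part of TorchX, and only the four option names listed above are assumed.

```python
from typing import Mapping, Optional

# Option names copied from the "Config Options" listing above.
RAY_OPTS = ("cluster_config_file", "cluster_name", "dashboard_address", "requirements")


def format_ray_cfg(opts: Mapping[str, Optional[str]]) -> str:
    """Hypothetical helper: render options as key=value pairs joined by commas."""
    unknown = sorted(set(opts) - set(RAY_OPTS))
    if unknown:
        raise ValueError(f"unknown Ray scheduler options: {unknown}")
    # Options left as None are skipped so the scheduler default applies.
    return ",".join(f"{k}={opts[k]}" for k in RAY_OPTS if opts.get(k) is not None)


cfg = format_ray_cfg(
    {
        "requirements": "foo/requirements.txt",
        "dashboard_address": "127.0.0.1:8265",
        "cluster_name": None,
    }
)
print(cfg)
```

The resulting string is what would follow -c on the command line; the pairs are emitted in the order of the listing, not in the order of the dictionary.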

Compatibility

    Feature                  Scheduler Support
    Fetch Logs               Partial support. Ray only supports a single log stream so only a dummy “ray/0” combined log role is supported. Tailing and time seeking are not supported.
    Distributed Jobs         ✔️
    Cancel Job               ✔️
    Describe Job             Partial support. RayScheduler will return job status but does not provide the complete original AppSpec.
    Workspaces / Patching    ✔️
    Mounts
    Elasticity               Partial support. Multi role jobs are not supported.

describe(app_id: str) -> Optional[DescribeAppResponse]

Describes the specified application.

Returns:

AppDef description or None if the app does not exist.

list() -> List[ListAppResponse]

For apps launched on the scheduler, this API returns a list of ListAppResponse objects, each of which has an app ID and its status. Note: This API is in the prototype phase and is subject to change.

log_iter(app_id: str, role_name: Optional[str] = None, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) -> Iterable[str]

Returns an iterator over the log lines of the k-th replica of the role. The iterator ends when all qualifying log lines have been read.

If the scheduler supports time-based cursors for fetching log lines over custom time ranges, then the since and until fields are honored; otherwise they are ignored. Not specifying since and until is equivalent to getting all available log lines. If until is empty, then the iterator behaves like tail -f, following the log output until the job reaches a terminal state.

The exact definition of what constitutes a log is scheduler specific. Some schedulers may consider stderr or stdout as the log, others may read the logs from a log file.

Behaviors and assumptions:

  1. Produces undefined behavior if called on an app that does not exist. The caller should check that the app exists using exists(app_id) prior to calling this method.

  2. Is not stateful; calling this method twice with the same parameters returns a new iterator. Prior iteration progress is lost.

  3. Does not always support log-tailing. Not all schedulers support live log iteration (e.g. tailing logs while the app is running). Refer to the specific scheduler’s documentation for the iterator’s behavior.

    3.1 If the scheduler supports log-tailing, it should be controlled by the should_tail parameter.

  4. Does not guarantee log retention. It is possible that by the time this method is called, the underlying scheduler may have purged the log records for this application. If so this method raises an arbitrary exception.

  5. If should_tail is True, the method only raises a StopIteration exception when the accessible log lines have been fully exhausted and the app has reached a final state. For instance, if the app gets stuck and does not produce any log lines, then the iterator blocks until the app eventually gets killed (either via timeout or manually) at which point it raises a StopIteration.

    If should_tail is False, the method raises StopIteration when there are no more logs.

  6. Need not be supported by all schedulers.

  7. Some schedulers may support line cursors by supporting __getitem__ (e.g. iter[50] seeks to the 50th log line).

  8. Whitespace is preserved; each new line should include \n. To support interactive progress bars, the returned lines don’t need to include \n but should then be printed without a newline to correctly handle \r carriage returns.

Parameters:

streams – The IO output streams to select. One of: combined, stdout, stderr. If the selected stream isn’t supported by the scheduler, it will throw a ValueError.

Returns:

An iterator over the log lines of the specified role replica.

Raises:

NotImplementedError – if the scheduler does not support log iteration
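The whitespace rule for log_iter means a consumer should write each yielded line as-is instead of appending its own newline. A minimal sketch of such a consumer follows; write_log_lines is a hypothetical helper, and the sample list stands in for the iterable that log_iter would return, so no scheduler or cluster is assumed.

```python
import io
import sys
from typing import Iterable, TextIO


def write_log_lines(lines: Iterable[str], out: TextIO = sys.stdout) -> int:
    """Hypothetical consumer: write each line exactly as yielded, return the count."""
    count = 0
    for line in lines:
        # No newline is appended: complete lines already end in "\n", and
        # progress-bar updates rely on "\r" to redraw the same terminal row.
        out.write(line)
        count += 1
    out.flush()
    return count


# Stand-in for the iterable returned by log_iter (made-up log content).
sample = ["epoch 1 done\n", "\rprogress 50%", "\rprogress 100%\n"]

buf = io.StringIO()
written = write_log_lines(sample, buf)
```

Using print(line) here would insert an extra newline after every item and break the carriage-return updates.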

schedule(dryrun_info: AppDryRunInfo[RayJob]) -> str

Same as submit except that it takes an AppDryRunInfo. Implementers are encouraged to implement this method rather than directly implementing submit since submit can be trivially implemented by:

    dryrun_info = self.submit_dryrun(app, cfg)
    return self.schedule(dryrun_info)

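The relationship between submit, submit_dryrun, and schedule can be sketched with a toy class. ToyScheduler and its dictionary-based dry-run object are hypothetical stand-ins for illustration only; they do not reproduce the TorchX Scheduler base class or AppDryRunInfo.

```python
from typing import Any, Dict


class ToyScheduler:
    """Hypothetical stand-in that mirrors only the method layering described above."""

    def submit_dryrun(self, app: str, cfg: Dict[str, Any]) -> Dict[str, Any]:
        # Build the scheduler-specific request without sending it anywhere.
        return {"app": app, "cfg": dict(cfg)}

    def schedule(self, dryrun_info: Dict[str, Any]) -> str:
        # A real scheduler would submit the request here and return the job ID.
        return f"toy-{dryrun_info['app']}"

    def submit(self, app: str, cfg: Dict[str, Any]) -> str:
        # submit is just the dry run followed by schedule.
        dryrun_info = self.submit_dryrun(app, cfg)
        return self.schedule(dryrun_info)


app_id = ToyScheduler().submit("echo", {"dashboard_address": "127.0.0.1:8265"})
```

Keeping the request construction in submit_dryrun lets callers inspect what would be submitted before anything reaches the cluster.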
wait_until_finish(app_id: str, timeout: int = 30) -> None

wait_until_finish waits until the specified job has finished with a given timeout. This is intended for testing. Programmatic usage should use the runner wait method instead.
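The text does not say how the wait is carried out. As a generic illustration only, and not the RayScheduler implementation, waiting with a timeout is commonly written as a poll loop against a deadline; wait_until and its is_done callback are hypothetical names.

```python
import time
from typing import Callable


def wait_until(is_done: Callable[[], bool], timeout: float = 30, interval: float = 1.0) -> bool:
    """Hypothetical helper: poll is_done until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_done():
            return True
        time.sleep(interval)
    # One last check so a job that finished during the final sleep is not missed.
    return is_done()


# Made-up completion check that reports "finished" on the third poll.
calls = {"n": 0}


def done_on_third_poll() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


finished = wait_until(done_on_third_poll, timeout=5, interval=0.01)
timed_out = wait_until(lambda: False, timeout=0.05, interval=0.01)
```

This sketch returns a boolean to signal a timeout, which is a choice made for the example; wait_until_finish itself is documented as returning None.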

torchx.schedulers.ray_scheduler.create_scheduler(session_name: str, ray_client: Optional[JobSubmissionClient] = None, **kwargs: Any) -> RayScheduler

torchx.schedulers.ray_scheduler.has_ray() -> bool

Indicates whether Ray is installed in the current Python environment.
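A check of this kind can be sketched with the standard library. The is_installed helper below is hypothetical and uses importlib.util.find_spec; the source does not show how has_ray itself performs the check.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Hypothetical helper: True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(module_name) is not None


json_available = is_installed("json")
missing_available = is_installed("surely_not_a_real_module_xyz")
```

With Ray, the equivalent call would be is_installed("ray"), whose result depends on the environment.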

torchx.schedulers.ray_scheduler.serialize(actors: List[RayActor], dirpath: str, output_filename: str = 'actors.json') -> None

class torchx.schedulers.ray_scheduler.RayJob(app_id: str, working_dir: str, cluster_config_file: Optional[str] = None, cluster_name: Optional[str] = None, dashboard_address: Optional[str] = None, requirements: Optional[str] = None, actors: List[RayActor] = <factory>)

Represents a job that should be run on a Ray cluster.

Variables:
  • app_id (str) – The unique ID of the application (a.k.a. job).

  • cluster_config_file (Optional[str]) – The Ray cluster configuration file.

  • cluster_name (Optional[str]) – The cluster name to use.

  • dashboard_address (Optional[str]) – The existing dashboard IP address to connect to.

  • working_dir (str) – The working directory to copy to the cluster.

  • requirements (Optional[str]) – The libraries to install on the cluster, as listed in requirements.txt.

  • actors (List[torchx.schedulers.ray.ray_common.RayActor]) – The Ray actors which represent the job to be run. This attribute is dumped to a JSON file and copied to the cluster where ray_main.py uses it to initiate the job.
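The JSON hand-off described for the actors attribute can be sketched as follows. ActorSketch and dump_actors are hypothetical: the fields of RayActor are not listed here, so the fields below are invented for illustration, and only the default file name actors.json comes from the serialize signature.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass, field
from typing import Dict, List


@dataclass
class ActorSketch:
    # Hypothetical fields; the real RayActor definition is not shown in this page.
    name: str
    command: List[str]
    env: Dict[str, str] = field(default_factory=dict)


def dump_actors(actors: List[ActorSketch], dirpath: str, output_filename: str = "actors.json") -> str:
    """Hypothetical helper: write the actors as a JSON list and return the file path."""
    path = os.path.join(dirpath, output_filename)
    with open(path, "w") as f:
        json.dump([asdict(a) for a in actors], f)
    return path


with tempfile.TemporaryDirectory() as tmp:
    path = dump_actors([ActorSketch("trainer", ["python", "train.py"])], tmp)
    file_name = os.path.basename(path)
    with open(path) as f:
        loaded = json.load(f)
```

A plain JSON file keeps the hand-off readable on the cluster side without requiring the submitting process to stay alive.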
