Local¶
This contains the TorchX local scheduler which can be used to run TorchX components locally via subprocesses.
- class torchx.schedulers.local_scheduler.LocalScheduler(session_name: str, image_provider_class: Callable[[LocalOpts], ImageProvider], cache_size: int = 100, extra_paths: Optional[List[str]] = None)[source]¶
Bases: Scheduler[LocalOpts]
Schedules on localhost. Containers are modeled as processes, and certain properties of the container that are either not relevant or cannot be enforced for localhost runs are ignored. Ignored properties:
Resource requirements
Resource limit enforcements
Retry policies
Retry counts (no retries supported)
Deployment preferences
The scheduler supports orphan process cleanup: on receiving SIGTERM or SIGINT, it terminates the spawned processes.
This scheduler is exposed as local_cwd.
local_cwd runs the provided app relative to the current working directory and ignores the image field, for faster iteration and testing.
Note
The orphan cleanup only works if LocalScheduler is instantiated from the main thread.
Config Options
usage:
    [log_dir=LOG_DIR],[prepend_cwd=PREPEND_CWD],[auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES]

optional arguments:
    log_dir=LOG_DIR (str, None)
        dir to write stdout/stderr log files of replicas
    prepend_cwd=PREPEND_CWD (bool, False)
        if set, prepends CWD to the replica's PATH env var, making any binaries in CWD take precedence over those in PATH
    auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES (bool, False)
        sets CUDA_VISIBLE_DEVICES for roles that request GPU resources. Each role replica will be assigned one GPU. Does nothing if the device count is less than the number of replicas.
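For example, the same options can be supplied programmatically as the run cfg. A minimal sketch, assuming the create_scheduler factory documented in the Reference section below (the session name and log dir are placeholders):

    # Sketch: create a local scheduler and a run cfg using the options above.
    from torchx.schedulers.local_scheduler import create_scheduler

    scheduler = create_scheduler(session_name="my_session")
    cfg = {
        "log_dir": "/tmp/torchx_logs",          # replica stdout/stderr files land here
        "prepend_cwd": True,                    # prefer binaries in CWD over those in PATH
        "auto_set_cuda_visible_devices": True,  # assign GPUs to replicas automatically
    }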
Compatibility
Note
Due to scheduler differences, jobs that run locally may not work on other schedulers because of network or software dependencies.

Feature                | Scheduler Support
-----------------------|------------------
Fetch Logs             | ✔️
Distributed Jobs       | LocalScheduler supports multiple replicas, but all replicas will execute on the local host.
Cancel Job             | ✔️
Describe Job           | ✔️
Workspaces / Patching  | Partial support. LocalScheduler runs the app from a local directory but does not support programmatic workspaces.
Mounts                 | ❌
Elasticity             | ❌
- auto_set_CUDA_VISIBLE_DEVICES(role_params: Dict[str, List[ReplicaParam]], app: AppDef, cfg: LocalOpts) None [source]¶
If the run option auto_set_cuda_visible_devices = True, sets the CUDA_VISIBLE_DEVICES env var on each replica's (node's) environment according to the number of GPUs specified in each role's resource specification, overwriting any existing CUDA_VISIBLE_DEVICES in the role's env field. To set CUDA_VISIBLE_DEVICES manually, run with auto_set_cuda_visible_devices = False in the scheduler run cfg.
Note
If the host's device count is less than the total number of requested GPUs, then CUDA_VISIBLE_DEVICES is NOT set (even if auto_set_cuda_visible_devices=True).
Note
This method either sets CUDA_VISIBLE_DEVICES on all GPU roles or on none at all.
Examples (all examples assume running on a host with 8 GPUs):
1. Role(num_replicas=2, resource=Resource(gpus=2))
   - replica_0's CUDA_VISIBLE_DEVICES=0,1
   - replica_1's CUDA_VISIBLE_DEVICES=2,3
2. Role(num_replicas=3, resource=Resource(gpus=4))
   - Error: 3 * 4 = 12 > 8 (more GPUs requested than available)
3. [Role(num_replicas=1, resource=Resource(gpus=2)), Role(num_replicas=3, resource=Resource(gpus=1))]
   - role_0, replica_0's CUDA_VISIBLE_DEVICES=0,1
   - role_1, replica_0's CUDA_VISIBLE_DEVICES=2
   - role_1, replica_1's CUDA_VISIBLE_DEVICES=3
   - role_1, replica_2's CUDA_VISIBLE_DEVICES=4
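As a concrete sketch of example 1 above (the Resource field names cpu/gpu/memMB follow torchx.specs and should be verified against your TorchX version):

    # Sketch of example 1: one role, two replicas, two GPUs each.
    from torchx.specs import AppDef, Resource, Role

    app = AppDef(
        name="trainer",
        roles=[
            Role(
                name="worker",
                image="/tmp/my_app",  # placeholder image directory
                entrypoint="train.py",
                num_replicas=2,
                resource=Resource(cpu=4, gpu=2, memMB=8192),
            )
        ],
    )
    # With auto_set_cuda_visible_devices=True on an 8-GPU host, replica 0 runs
    # with CUDA_VISIBLE_DEVICES=0,1 and replica 1 with CUDA_VISIBLE_DEVICES=2,3.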
- close() None [source]¶
Only for schedulers that have local state! Closes the scheduler, freeing any allocated resources. Once closed, the scheduler object is deemed to no longer be valid and any method called on the object results in undefined behavior.
This method should not raise exceptions and is allowed to be called multiple times on the same object.
Note
Override only for scheduler implementations that have local state (e.g. torchx/schedulers/local_scheduler.py). Schedulers simply wrapping a remote scheduler's client need not implement this method.
- describe(app_id: str) Optional[DescribeAppResponse] [source]¶
Describes the specified application.
- Returns:
AppDef description or None if the app does not exist.
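A hedged usage sketch (the state field is assumed to be exposed by DescribeAppResponse):

    # Sketch: query an app's status; `scheduler` and `app_id` come from earlier calls.
    resp = scheduler.describe(app_id)
    if resp is None:
        print(f"{app_id} does not exist")
    else:
        print(resp.state)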
- list() List[ListAppResponse] [source]¶
For apps launched on the scheduler, this API returns a list of ListAppResponse objects, each of which has the app id and its status. Note: This API is in prototype phase and is subject to change.
- log_iter(app_id: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str] [source]¶
Returns an iterator over the log lines of the kth replica of the role. The iterator ends when all qualifying log lines have been read.
If the scheduler supports time-based cursors for fetching log lines over custom time ranges, then the since and until fields are honored; otherwise they are ignored. Not specifying since and until is equivalent to getting all available log lines. If until is empty, then the iterator behaves like tail -f, following the log output until the job reaches a terminal state.
The exact definition of what constitutes a log is scheduler specific. Some schedulers may consider stderr or stdout as the log; others may read the logs from a log file.
Behaviors and assumptions:
1. Produces undefined behavior if called on an app that does not exist. The caller should check that the app exists using exists(app_id) prior to calling this method.
2. Is not stateful; calling this method twice with the same parameters returns a new iterator. Prior iteration progress is lost.
3. Does not always support log-tailing. Not all schedulers support live log iteration (e.g. tailing logs while the app is running). Refer to the specific scheduler's documentation for the iterator's behavior.
   3.1. If the scheduler supports log-tailing, it should be controlled by the should_tail parameter.
4. Does not guarantee log retention. It is possible that by the time this method is called, the underlying scheduler may have purged the log records for this application. If so, this method raises an arbitrary exception.
5. If should_tail is True, the method only raises a StopIteration exception when the accessible log lines have been fully exhausted and the app has reached a final state. For instance, if the app gets stuck and does not produce any log lines, then the iterator blocks until the app eventually gets killed (either via timeout or manually), at which point it raises a StopIteration.
6. If should_tail is False, the method raises StopIteration when there are no more logs.
7. Need not be supported by all schedulers.
8. Some schedulers may support line cursors by supporting __getitem__ (e.g. iter[50] seeks to the 50th log line).
9. Whitespace is preserved; each new line should include \n. To support interactive progress bars, the returned lines don't need to include \n but should then be printed without a newline to correctly handle \r carriage returns.
- Parameters:
streams – The IO output streams to select. One of: combined, stdout, stderr. If the selected stream isn't supported by the scheduler, a ValueError is raised.
- Returns:
An Iterator over the log lines of the specified role replica.
- Raises:
NotImplementedError – if the scheduler does not support log iteration
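A usage sketch, assuming an app with a role named "worker" was submitted earlier:

    # Sketch: tail the logs of replica 0 of the "worker" role until the app
    # reaches a terminal state. Lines may already end in "\n", so print them raw.
    for line in scheduler.log_iter(app_id, role_name="worker", k=0, should_tail=True):
        print(line, end="")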
- schedule(dryrun_info: AppDryRunInfo[PopenRequest]) str [source]¶
Same as submit except that it takes an AppDryRunInfo. Implementers are encouraged to implement this method rather than directly implementing submit, since submit can be trivially implemented by:

    dryrun_info = self.submit_dryrun(app, cfg)
    return schedule(dryrun_info)
Image Providers¶
- class torchx.schedulers.local_scheduler.ImageProvider[source]¶
Manages downloading and setting up an image on localhost. This is only needed for LocalScheduler, since real schedulers will typically do this on behalf of the user.
- abstract fetch(image: str) str [source]¶
Pulls the given image and returns a path to the pulled image on the local host, or an empty string if it is a no-op.
- fetch_role(role: Role) str [source]¶
Identical to fetch(image) in that it fetches the role's image and returns the path to the image root, except that it allows the role to be updated by this provider. Useful when additional environment variables need to be set on the role to comply with the image provider's way of fetching and managing images on localhost. By default this method simply delegates to fetch(role.image). Override if necessary.
- get_cwd(image: str) Optional[str] [source]¶
Returns the absolute path of the mounted img directory. Used as a working directory for starting child processes.
- get_replica_param(img_root: str, role: Role, stdout: Optional[str] = None, stderr: Optional[str] = None, combined: Optional[str] = None) ReplicaParam [source]¶
Given the role replica's specs, returns the ReplicaParam holder, which holds the arguments to eventually pass to subprocess.Popen to actually invoke and run each role's replica. The img_root is expected to be the return value of self.fetch(role.image). Since the role's image need only be fetched once (not for each replica), the caller is expected to call the fetch method once per role and then call this method once for each of role.num_replicas.
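A hypothetical minimal provider, overriding only the abstract fetch method (the class name is illustrative):

    # Hypothetical provider: treat the image name as a path that already
    # exists on the local host, so fetch() is a no-op passthrough.
    from torchx.schedulers.local_scheduler import ImageProvider

    class PassthroughImageProvider(ImageProvider):
        def fetch(self, image: str) -> str:
            # Nothing to download; assume `image` is already a local path.
            return image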
- class torchx.schedulers.local_scheduler.CWDImageProvider(cfg: LocalOpts)[source]¶
Similar to LocalDirectoryImageProvider, however it ignores the image name and uses the current working directory as the image path.
Example:
fetch(Image(name="/tmp/foobar")) returns os.getcwd()
fetch(Image(name="foobar:latest")) returns os.getcwd()
- fetch(image: str) str [source]¶
Pulls the given image and returns a path to the pulled image on the local host, or an empty string if it is a no-op.
- class torchx.schedulers.local_scheduler.LocalDirectoryImageProvider(cfg: LocalOpts)[source]¶
Interprets the image name as the path to a directory on the local host. Does not "fetch" (e.g. download) anything. Used in conjunction with LocalScheduler to run local binaries.
The image name must be an absolute path and must exist.
Example:
fetch(Image(name="/tmp/foobar")) returns /tmp/foobar
fetch(Image(name="foobar")) raises ValueError
fetch(Image(name="/tmp/dir/that/does/not_exist")) raises ValueError
- fetch(image: str) str [source]¶
- Raises:
ValueError – if the image name is not an absolute path, or if it does not exist or is not a directory
- get_cwd(image: str) Optional[str] [source]¶
Returns the absolute working directory. Used as a working directory for the child process.
- get_entrypoint(img_root: str, role: Role) str [source]¶
Returns the role's entrypoint. When the local scheduler is executed with image_type=dir, the child process's working directory is set to img_root. If role.entrypoint is a relative path, it is resolved as img_root/role.entrypoint; if role.entrypoint is an absolute path, it is executed as provided.
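A sketch wiring this provider into a scheduler via the create_scheduler factory from the Reference section below:

    # Sketch: resolve images as local directories when scheduling.
    from torchx.schedulers.local_scheduler import (
        LocalDirectoryImageProvider,
        create_scheduler,
    )

    scheduler = create_scheduler(
        session_name="dir_session",
        image_provider_class=LocalDirectoryImageProvider,
    )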
Reference¶
- torchx.schedulers.local_scheduler.create_scheduler(session_name: str, cache_size: int = 100, extra_paths: Optional[List[str]] = None, image_provider_class: Callable[[LocalOpts], ImageProvider] = CWDImageProvider, **kwargs: Any) LocalScheduler [source]¶
- class torchx.schedulers.local_scheduler.LogIterator(app_id: str, log_file: str, scheduler: Scheduler, should_tail: bool = True)[source]¶
- class torchx.schedulers.local_scheduler.PopenRequest(app_id: str, log_dir: str, role_params: Dict[str, List[ReplicaParam]], role_log_dirs: Dict[str, List[str]])[source]¶
Holds parameters to create a subprocess for each replica of each role of an application.