
Source code for torchx.components.utils

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

This contains TorchX utility components that are `ready-to-use` out of the box. These are
components that simply execute well known binaries (e.g. ``cp``)
and are meant to be used as tutorial materials or glue operations between
meaningful stages in a workflow.

import os
import shlex
from typing import Dict, List, Optional

import torchx
import torchx.specs as specs

[docs]def echo( msg: str = "hello world", image: str = torchx.IMAGE, num_replicas: int = 1 ) -> specs.AppDef: """ Echos a message to stdout (calls echo) Args: msg: message to echo image: image to use num_replicas: number of replicas to run """ return specs.AppDef( name="echo", roles=[ specs.Role( name="echo", image=image, entrypoint="echo", args=[msg], num_replicas=num_replicas, resource=specs.Resource(cpu=1, gpu=0, memMB=1024), ) ], )
[docs]def touch(file: str, image: str = torchx.IMAGE) -> specs.AppDef: """ Touches a file (calls touch) Args: file: file to create image: the image to use """ return specs.AppDef( name="touch", roles=[ specs.Role( name="touch", image=image, entrypoint="touch", args=[file], num_replicas=1, resource=specs.Resource(cpu=1, gpu=0, memMB=1024), ) ], )
[docs]def sh( *args: str, image: str = torchx.IMAGE, num_replicas: int = 1, cpu: int = 1, gpu: int = 0, memMB: int = 1024, h: Optional[str] = None, env: Optional[Dict[str, str]] = None, max_retries: int = 0, mounts: Optional[List[str]] = None, ) -> specs.AppDef: """ Runs the provided command via sh. Currently sh does not support environment variable substitution. Args: args: bash arguments image: image to use num_replicas: number of replicas to run cpu: number of cpus per replica gpu: number of gpus per replica memMB: cpu memory in MB per replica h: a registered named resource (if specified takes precedence over cpu, gpu, memMB) env: environment varibles to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3) max_retries: the number of scheduler retries allowed mounts: mounts to mount into the worker environment/container (ex. type=<bind/volume>,src=/host,dst=/job[,readonly]). See scheduler documentation for more info. """ escaped_args = " ".join(shlex.quote(arg) for arg in args) if env is None: env = {} env.setdefault("LOGLEVEL", os.getenv("LOGLEVEL", "WARNING")) return specs.AppDef( name="sh", roles=[ specs.Role( name="sh", image=image, entrypoint="sh", args=["-c", escaped_args], num_replicas=num_replicas, resource=specs.resource(cpu=cpu, gpu=gpu, memMB=memMB, h=h), env=env, max_retries=max_retries, mounts=specs.parse_mounts(mounts) if mounts else [], ) ], )
[docs]def python( *args: str, m: Optional[str] = None, c: Optional[str] = None, script: Optional[str] = None, image: str = torchx.IMAGE, name: str = "torchx_utils_python", cpu: int = 1, gpu: int = 0, memMB: int = 1024, h: Optional[str] = None, num_replicas: int = 1, ) -> specs.AppDef: """ Runs ``python`` with the specified module, command or script on the specified image and host. Use ``--`` to separate component args and program args (e.g. ``torchx run utils.python --m foo.main -- --args to --main``) Note: (cpu, gpu, memMB) parameters are mutually exclusive with ``h`` (named resource) where ``h`` takes precedence if specified for setting resource requirements. See `registering named resources <>`_. Args: args: arguments passed to the program in sys.argv[1:] (ignored with `--c`) m: run library module as a script c: program passed as string (may error if scheduler has a length limit on args) script: .py script to run image: image to run on name: name of the job cpu: number of cpus per replica gpu: number of gpus per replica memMB: cpu memory in MB per replica h: a registered named resource (if specified takes precedence over cpu, gpu, memMB) num_replicas: number of copies to run (each on its own container) :return: """ if sum([m is not None, c is not None, script is not None]) != 1: raise ValueError( "exactly one of `-m`, `-c` and `--script` needs to be specified" ) if script: cmd = [script] elif m: cmd = ["-m", m] elif c: cmd = ["-c", c] else: raise ValueError("no program specified") return specs.AppDef( name=name, roles=[ specs.Role( name="python", image=image, entrypoint="python", num_replicas=num_replicas, resource=specs.resource(cpu=cpu, gpu=gpu, memMB=memMB, h=h), args=[*cmd, *args], env={"HYDRA_MAIN_MODULE": m} if m else {}, ) ], )
[docs]def binary( *args: str, entrypoint: str, name: str = "torchx_utils_binary", num_replicas: int = 1, cpu: int = 1, gpu: int = 0, memMB: int = 1024, h: Optional[str] = None, ) -> specs.AppDef: """ Test component Args: args: arguments passed to the program in sys.argv[1:] (ignored with `--c`) name: name of the job num_replicas: number of copies to run (each on its own container) cpu: number of cpus per replica gpu: number of gpus per replica memMB: cpu memory in MB per replica h: a registered named resource (if specified takes precedence over cpu, gpu, memMB) :return: """ return specs.AppDef( name=name, roles=[ specs.Role( name="binary", image="<NONE>", entrypoint=entrypoint, num_replicas=num_replicas, args=[*args], resource=specs.resource(cpu=cpu, gpu=gpu, memMB=memMB, h=h), ) ], )
[docs]def copy(src: str, dst: str, image: str = torchx.IMAGE) -> specs.AppDef: """ copy copies the file from src to dst. src and dst can be any valid fsspec url. This does not support recursive copies or directories. Args: src: the source fsspec file location dst: the destination fsspec file location image: the image that contains the copy app """ return specs.AppDef( name="torchx-utils-copy", roles=[ specs.Role( name="torchx-utils-copy", image=image, entrypoint="python", args=[ "-m", "torchx.apps.utils.copy_main", "--src", src, "--dst", dst, ], resource=specs.Resource(cpu=1, gpu=0, memMB=1024), ), ], )
[docs]def booth( x1: float, x2: float, trial_idx: int = 0, tracker_base: str = "/tmp/torchx-util-booth", image: str = torchx.IMAGE, ) -> specs.AppDef: """ Evaluates the booth function, ``f(x1, x2) = (x1 + 2*x2 - 7)^2 + (2*x1 + x2 - 5)^2``. Output result is accessible via ``FsspecResultTracker(outdir)[trial_idx]`` Args: x1: x1 x2: x2 trial_idx: ignore if not running hpo tracker_base: URI of the tracker's base output directory (e.g. s3://foo/bar) image: the image that contains the booth app """ return specs.AppDef( name="torchx-utils-booth", roles=[ specs.Role( name="torchx-utils-booth", image=image, entrypoint="python", args=[ "-m", "torchx.apps.utils.booth_main", "--x1", str(x1), "--x2", str(x2), "--trial_idx", str(trial_idx), "--tracker_base", tracker_base, ], resource=specs.Resource(cpu=1, gpu=0, memMB=1024), ) ], )


Access comprehensive developer documentation for PyTorch

View Docs


Get in-depth tutorials for beginners and advanced developers

View Tutorials


Find development resources and get your questions answered

View Resources