Source code for torchx.schedulers.lsf_scheduler

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
This contains the TorchX LSF scheduler which can be used to run TorchX
components on an LSF cluster.

This scheduler is in the prototype stage and may change without notice. If you
run into any issues or have feedback, please submit an issue.

Prerequisites
==============

You'll need an existing LSF cluster to run your jobs; for individual use you
can install LSF Community Edition instead.

See the LSF documentation for more details:
https://www.ibm.com/docs/en/cloud-private/3.2.x?topic=paks-spectrum-lsf-community-edition
"""
import os.path
import re
import subprocess
import tempfile
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional

import torchx
from torchx.schedulers.api import (
    AppDryRunInfo,
    DescribeAppResponse,
    filter_regex,
    ListAppResponse,
    Scheduler,
    split_lines_iterator,
    Stream,
)
from torchx.schedulers.ids import make_unique
from torchx.schedulers.local_scheduler import LogIterator
from torchx.specs import (
    AppDef,
    AppState,
    BindMount,
    DeviceMount,
    macros,
    NONE,
    ReplicaStatus,
    Role,
    RoleStatus,
    runopts,
    VolumeMount,
)
from torchx.util import shlex
from typing_extensions import TypedDict

JOB_STATE: Dict[str, AppState] = {
    "DONE": AppState.SUCCEEDED,
    "EXIT": AppState.FAILED,
    "PEND": AppState.PENDING,
    "RUN": AppState.RUNNING,
    "PSUSP": AppState.PENDING,
    "USUSP": AppState.PENDING,
    "SSUSP": AppState.PENDING,
}


def get_job_state(state_str: str, exit_code: str) -> AppState:
    state = AppState.UNKNOWN
    if state_str in JOB_STATE.keys():
        state = JOB_STATE[state_str]
    if state == AppState.FAILED and exit_code == "130":  # likely SIGINT
        state = AppState.CANCELLED
    return state
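
# Illustrative sketch (not part of the scheduler logic): how LSF job states map
# to TorchX AppStates, including the SIGINT special case. The state strings are
# standard LSF values; the exit codes below are example inputs.
#
#   get_job_state("DONE", "0")   -> AppState.SUCCEEDED
#   get_job_state("PEND", "-")   -> AppState.PENDING
#   get_job_state("EXIT", "1")   -> AppState.FAILED
#   get_job_state("EXIT", "130") -> AppState.CANCELLED   (130 is treated as SIGINT)
#   get_job_state("ZOMBI", "0")  -> AppState.UNKNOWN     (unmapped states fall through)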


class LsfOpts(TypedDict, total=False):
    lsf_queue: Optional[str]
    jobdir: Optional[str]
    container_workdir: Optional[str]
    host_network: Optional[bool]
    shm_size: Optional[str]


def get_docker_command(job_name: str, role: Role, cfg: LsfOpts) -> str:
    cmds = ["docker", "run", f"--name={job_name}"]
    for mount in role.mounts:
        if isinstance(mount, BindMount):
            rw = "rw"
            if mount.read_only:
                rw = "ro"
            cmds += ["-v", f"{mount.src_path}:{mount.dst_path}:{rw}"]
        elif isinstance(mount, VolumeMount):
            ro = ""
            if mount.read_only:
                ro = ",ro"
            cmds += [
                "--mount",
                f"type=volume,src={mount.src},dst={mount.dst_path}{ro}",
            ]
        elif isinstance(mount, DeviceMount):
            cmds += [f"--device={mount.src_path}:{mount.dst_path}:{mount.permissions}"]
    container_workdir = cfg.get("container_workdir")
    if container_workdir:
        cmds += ["-w", container_workdir]
    host_network = cfg.get("host_network")
    if host_network:
        cmds += ["--net=host", "--ipc=host"]
    else:
        for name, port in role.port_map.items():
            cmds += ["-p", str(port)]
    shm_size = cfg.get("shm_size")
    if shm_size:
        cmds += [f"--shm-size={shm_size}"]
    for key, value in dict(role.env).items():
        cmds += ["-e", f"{key}={value}"]

    resource = role.resource
    if resource != NONE:
        if resource.cpu > 0:
            cmds += [f"--cpus={str(resource.cpu)}"]
        if resource.memMB > 0:
            cmds += [f"--memory={str(resource.memMB * 1024 * 1024)}"]
        if resource.gpu > 0:
            cmds += ["--gpus", "all"]
    cmds += ["--entrypoint", role.entrypoint, "--rm", role.image] + [
        arg.replace("$", "\\$") for arg in role.args
    ]
    return shlex.join(cmds)
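
# Illustrative sketch of the command this helper emits (example values, not a
# real run). Assuming a role with image "alpine:latest", entrypoint "echo",
# args ["hello_world"], 1 CPU, 1024 MB of memory, no GPUs, no mounts and an
# otherwise empty cfg, the generated line looks roughly like:
#
#   docker run --name=echo-abc123-echo-0 -e TORCHX_RANK0_HOST=node1 \
#       --cpus=1 --memory=1073741824 --entrypoint echo --rm alpine:latest hello_world
#
# With host_network=True the per-port "-p" flags are replaced by
# "--net=host --ipc=host"; literal "$" characters in args are escaped so they
# survive the bsub heredoc built below.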


def get_command(job_name: str, role: Role, cfg: LsfOpts) -> str:
    return get_docker_command(job_name, role, cfg)  # TODO: add get_singularity_command


def get_bsub(
    app_id: str,
    job_name: str,
    role: Role,
    cfg: LsfOpts,
    head_job_name: str,
    head_job_host: str,
) -> str:
    bsub_args = ["bsub", "-P", app_id, "-J", job_name]
    if head_job_name != "":
        bsub_args += [
            "-w",
            f'"started({head_job_name})"',
            "-R",
            f"\"select[hname!='{head_job_host}']\"",
        ]
    else:
        bsub_args += ["-m", head_job_host]
    jobdir = cfg.get("jobdir")
    if jobdir:
        bsub_args += [
            "-cwd",
            jobdir,
            "-outdir",
            jobdir,
            "-oo",
            f"{jobdir}/{job_name}.submit.out",
            "-eo",
            f"{jobdir}/{job_name}.submit.err",
        ]
    queue = cfg.get("lsf_queue")
    if queue:
        bsub_args += ["-q", queue]
    resource = role.resource

    if resource is not None:
        if resource.cpu > 0:
            bsub_args += ["-n", str(resource.cpu)]
        if resource.memMB > 0:
            bsub_args += ["-R", f'"span[hosts=1] rusage[mem={str(resource.memMB)}]"']
        else:
            bsub_args += ["-R", '"span[hosts=1]]"']
        if resource.gpu > 0:
            bsub_args += [
                "-gpu",
                f'"num={str(resource.gpu)}:mode=shared:j_exclusive=yes"',
            ]
    bsub_line = " ".join(bsub_args)
    if jobdir:
        return f"{bsub_line} << EOF\n{get_command(job_name, role, cfg)} > {jobdir}/{job_name}.out 2> {jobdir}/{job_name}.err\nEOF"
    else:
        return f"{bsub_line} << EOF\n{get_command(job_name, role, cfg)}\nEOF"


def cleanup_str(data: str) -> str:
    """
    Invokes ``lower`` on the string and removes all
    characters that do not match ``[a-z0-9]``, ``-`` or ``_``.
    This method is mostly used to make sure the scheduler gets
    a job name that does not violate its validation.
    """
    if data.startswith("-"):
        data = data[1:]
    pattern = r"[a-z0-9\-_]"
    return "".join(re.findall(pattern, data.lower()))


def find_rank0_host_from_bhosts_stdout(msg: str, role: Role) -> str:
    resource = role.resource
    cpu = 1
    gpu = 0

    if resource != NONE:
        if resource.cpu > 0:
            cpu = resource.cpu
        if resource.gpu > 0:
            gpu = resource.gpu

    for line in msg.split("\n"):
        split = line.split(" ")
        if len(split) >= 3 and cpu <= int(split[1]) and gpu <= int(split[2]):
            return split[0]
    raise ValueError(
        f"cannot find a host with {cpu} CPUs, and {gpu} GPUs. Try again with enough available resource."
    )
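
# Illustrative input (example values): the function expects the space-separated
# output of `bhosts -noheader -o "hname max ng"`, i.e. host name, max job slots
# and GPU count per line, and returns the first host that fits the role:
#
#   node1 8 0
#   node2 16 4
#
# For a role needing 8 CPUs and 2 GPUs this would return "node2".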


def find_rank0_host(role: Role) -> str:
    p = subprocess.run(
        ["bhosts", "-noheader", "-o", "hname max ng"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        check=True,
    )
    return find_rank0_host_from_bhosts_stdout(p.stdout.decode("utf-8"), role)


def get_submit_script(
    app_id: str, cmd: List[str], app: AppDef, cfg: LsfOpts, rank0_host: str
) -> str:
    bsubs = []
    head_job_name = ""
    for role_idx, role in enumerate(app.roles):
        for replica_id in range(role.num_replicas):
            values = macros.Values(
                img_root="",
                app_id=app_id,
                replica_id=str(replica_id),
                rank0_env="TORCHX_RANK0_HOST",
            )
            replica_role = values.apply(role)
            name = cleanup_str(f"{role.name}-{replica_id}")
            replica_role.env["TORCHX_RANK0_HOST"] = rank0_host
            job_name = app_id + "-" + name
            bsubs.append(
                get_bsub(app_id, job_name, replica_role, cfg, head_job_name, rank0_host)
            )
            if role_idx == 0 and replica_id == 0:
                head_job_name = job_name
    script = f"""#!/bin/bash
#
# Generated by TorchX {torchx.__version__}
# Run with: {shlex.join(cmd)}
#
"""
    return script + "\n".join(bsubs) + "\n"
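
# Illustrative shape of the generated script (example values): one bsub heredoc
# per replica, with the first replica of the first role acting as the head job
# that every other replica waits on via `-w "started(...)"`:
#
#   #!/bin/bash
#   #
#   # Generated by TorchX 0.x.x
#   # Run with: /bin/bash /mnt/data/torchx/echo-abc123.sh
#   #
#   bsub -P echo-abc123 -J echo-abc123-echo-0 -m node1 ... << EOF
#   ...
#   EOF
#   bsub -P echo-abc123 -J echo-abc123-echo-1 -w "started(echo-abc123-echo-0)" ... << EOF
#   ...
#   EOF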


def bjobs_msg_to_describe(app_id: str, msg: str) -> Optional[DescribeAppResponse]:
    if msg == "":
        return None
    roles = {}
    roles_statuses = {}
    app_state = AppState.RUNNING
    success_count = 0
    total_count = 0
    for line in msg.split("\n"):
        split = line.split(" ")
        if len(split) < 2:
            continue
        proj = split[0]
        role, _, idx = split[1][len(proj) + 1 :].rpartition("-")
        idx = int(idx)
        if role not in roles:
            roles[role] = Role(name=role, num_replicas=0, image="")
            roles_statuses[role] = RoleStatus(role, [])
        roles[role].num_replicas += 1
        state = get_job_state(split[2], split[3])
        roles_statuses[role].replicas.append(
            ReplicaStatus(id=idx, role=role, state=state, hostname="")
        )
        if (
            state == AppState.FAILED
            or state == AppState.CANCELLED
            or state == AppState.PENDING
            or state == AppState.UNKNOWN
        ):
            app_state = state
        elif state == AppState.SUCCEEDED:
            success_count += 1
        total_count += 1
    if success_count == total_count:
        app_state = AppState.SUCCEEDED
    # set roles, roles_statuses, app_state
    return DescribeAppResponse(
        app_id=app_id,
        roles=list(roles.values()),
        roles_statuses=list(roles_statuses.values()),
        state=app_state,
        msg=msg,
    )
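
# Illustrative input (example values): the parser expects the space-separated
# output of `bjobs -o "proj name stat exit_code"`, e.g. for a 2-replica "echo"
# role of app "echo-abc123":
#
#   echo-abc123 echo-abc123-echo-0 DONE 0
#   echo-abc123 echo-abc123-echo-1 RUN -
#
# The job name minus the "<proj>-" prefix is split into role ("echo") and
# replica index; here the app would be reported as RUNNING, since not every
# replica has succeeded yet.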


def bjobs_msg_to_log_file(
    app_id: str,
    role_name: str,
    k: int = 0,
    streams: Optional[Stream] = None,
    msg: str = "",
) -> str:
    if streams == Stream.COMBINED:
        raise ValueError(
            "LsfScheduler does not support COMBINED log stream."
            " Use `stdout` or `stderr`"
        )

    extension = "err" if streams == Stream.STDERR else "out"

    lines = msg.split("\n")
    jobs = {}
    log_file = ""
    for line in lines:
        if line != "":
            split = line.split(" ")
            if split[2] == "-":
                continue
            proj = split[0]
            role, _, idx = split[1][len(proj) + 1 :].rpartition("-")
            if app_id == proj and role == role_name and idx == str(k):
                log_file = split[2] + f"/{split[1]}.{extension}"
    if log_file == "":
        raise ValueError(
            f"cannot find log directory for {app_id}. Note: need to specify -cfg jobdir to use this functionality."
        )
    return log_file
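
# Illustrative input (example values): the parser expects the output of
# `bjobs -o "proj name output_dir"`, e.g.
#
#   echo-abc123 echo-abc123-echo-0 /mnt/data/torchx
#
# For role_name="echo", k=0 and streams=Stream.STDOUT this resolves to
# "/mnt/data/torchx/echo-abc123-echo-0.out". Jobs submitted without a jobdir
# show "-" as output_dir and are skipped, which is why log access requires
# `-cfg jobdir=...`.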


def bjobs_msg_to_list(msg: str) -> List[ListAppResponse]:
    ret = []
    lines = msg.split("\n")
    apps = {}
    for line in lines:
        if line != "":
            split = line.split(" ")
            state = get_job_state(split[1], split[2])
            if split[0] not in apps.keys():
                apps[split[0]] = []
            apps[split[0]].append(state)
    for app_id, states in apps.items():
        success_count = 0
        app_state = AppState.RUNNING
        for state in states:
            if (
                state == AppState.FAILED
                or state == AppState.CANCELLED
                or state == AppState.PENDING
                or state == AppState.UNKNOWN
            ):
                app_state = state
                break
            elif state == AppState.SUCCEEDED:
                success_count += 1
        if success_count == len(states):
            app_state = AppState.SUCCEEDED
        ret.append(ListAppResponse(app_id=app_id, state=app_state))
    return ret


@dataclass
class LsfBsub:
    jobdir: Optional[str]
    app_id: str
    app: AppDef
    cfg: LsfOpts
    cmd: List[str]

    def materialize(self, rank0_host: str = "RANK0_HOST") -> str:
        return get_submit_script(self.app_id, self.cmd, self.app, self.cfg, rank0_host)

    def __repr__(self) -> str:
        return f"""{' '.join(self.cmd + ['$BSUB_SCRIPT'])}
#------------
# BSUB_SCRIPT
#------------
{self.materialize()}"""

class LsfScheduler(Scheduler[LsfOpts]):
    """
    **Example: hello_world**

    .. code-block:: bash

        $ torchx run -s lsf -cfg jobdir=/mnt/data/torchx utils.echo --image alpine:latest --msg hello_world --num_replicas 3
        ...

    **Example: Gloo**

    .. code-block:: bash

        $ cp dist_app.py /mnt/data/dist/
        $ torchx run -s lsf -cfg "jobdir=/mnt/data/torchx,host_network=True" dist.ddp -j 2x2 --gpu 2 \
            --script /data/dist_app.py --mount "type=bind,src=/mnt/data/dist,dst=/data"
        ...

    **Config Options**

    .. runopts::
        class: torchx.schedulers.lsf_scheduler.create_scheduler

    **Compatibility**

    .. compatibility::
        type: scheduler
        features:
            cancel: true
            logs: true
            distributed: true
            describe: |
                LsfScheduler will return job and replica status but does not
                provide the complete original AppSpec.
            mounts: true
            workspaces: false
            elasticity: false

    **TOFIX**

    - On host_network=False, tasks cannot resolve static names such as /etc/hosts
      entries (containers cannot reach them without the host network)
    - Image downloads should be separate jobs
    """

    def __init__(self, session_name: str) -> None:
        # NOTE: make sure any new init options are supported in create_scheduler(...)
        super().__init__("lsf", session_name)

    def _run_opts(self) -> runopts:
        opts = runopts()
        opts.add(
            "lsf_queue",
            type_=str,
            default=None,
            help="queue name to submit jobs",
        )
        opts.add(
            "jobdir",
            type_=str,
            default=None,
            help="The directory to place the job code and outputs. The directory must not exist and will be created.",
        )
        opts.add(
            "container_workdir",
            type_=str,
            default=None,
            help="working directory in container jobs",
        )
        opts.add(
            "host_network",
            type_=bool,
            default=False,
            help="True if using the host network for jobs",
        )
        opts.add(
            "shm_size",
            type_=str,
            default="64m",
            help="size of shared memory (/dev/shm) for jobs",
        )
        return opts

    def schedule(self, dryrun_info: AppDryRunInfo[LsfBsub]) -> str:
        req = dryrun_info.request
        with tempfile.TemporaryDirectory() as tempdir:
            path = os.path.join(req.jobdir or tempdir, f"{req.app_id}.sh")
            req.cmd += [path]
            with open(path, "w") as f:
                f.write(req.materialize(find_rank0_host(req.app.roles[0])))

            subprocess.run(req.cmd, stdout=subprocess.PIPE, check=True)

        return req.app_id

    def _validate(self, app: AppDef, scheduler: str) -> None:
        # Skip validation step for lsf
        pass

    def _cancel_existing(self, app_id: str) -> None:
        p = subprocess.run(
            ["bjobs", "-noheader", "-a", "-P", app_id, "-o", "id"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        msg = p.stdout.decode("utf-8")
        if msg != "":
            subprocess.run(
                ["bkill"] + msg.strip().split("\n"),
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                check=True,
            )

    def _submit_dryrun(self, app: AppDef, cfg: LsfOpts) -> AppDryRunInfo[LsfBsub]:
        jobdir = cfg.get("jobdir")
        assert jobdir is None or isinstance(jobdir, str), "jobdir must be str"

        app_id = cleanup_str(make_unique(app.name))
        return AppDryRunInfo(
            LsfBsub(app_id=app_id, cmd=["/bin/bash"], jobdir=jobdir, app=app, cfg=cfg),
            repr,
        )

    def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
        p = subprocess.run(
            [
                "bjobs",
                "-noheader",
                "-a",
                "-P",
                app_id,
                "-o",
                "proj name stat exit_code",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        return bjobs_msg_to_describe(app_id=app_id, msg=p.stdout.decode("utf-8"))

    def log_iter(
        self,
        app_id: str,
        role_name: str,
        k: int = 0,
        regex: Optional[str] = None,
        since: Optional[datetime] = None,
        until: Optional[datetime] = None,
        should_tail: bool = False,
        streams: Optional[Stream] = None,
    ) -> Iterable[str]:
        p = subprocess.run(
            ["bjobs", "-noheader", "-a", "-P", app_id, "-o", "proj name output_dir"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        log_file = bjobs_msg_to_log_file(
            app_id=app_id,
            role_name=role_name,
            k=k,
            streams=streams,
            msg=p.stdout.decode("utf-8"),
        )
        iterator = split_lines_iterator(
            LogIterator(app_id, log_file, self, should_tail=should_tail)
        )
        if regex:
            iterator = filter_regex(regex, iterator)
        return iterator

    def list(self) -> List[ListAppResponse]:
        p = subprocess.run(
            ["bjobs", "-noheader", "-a", "-o", "proj stat exit_code"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        return bjobs_msg_to_list(p.stdout.decode("utf-8"))


def create_scheduler(session_name: str, **kwargs: Any) -> LsfScheduler:
    return LsfScheduler(
        session_name=session_name,
    )
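
# Hedged usage sketch (not part of the module): besides the CLI examples in the
# LsfScheduler docstring, the scheduler can be reached through the standard
# TorchX runner. Component name, args and cfg values below are illustrative.
#
#   from torchx.runner import get_runner
#
#   runner = get_runner()
#   app_handle = runner.run_component(
#       "utils.echo",
#       ["--msg", "hello_world"],
#       scheduler="lsf",
#       cfg={"jobdir": "/mnt/data/torchx"},
#   )
#   print(runner.status(app_handle))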
