Shortcuts

Source code for torchx.schedulers.kubernetes_mcad_scheduler

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""

This contains the TorchX Kubernetes_MCAD scheduler which can be used to run TorchX
components on a Kubernetes cluster via the Multi-Cluster-Application-Dispatcher (MCAD).

Prerequisites
==============

TorchX Kubernetes_MCAD scheduler depends on AppWrapper + MCAD.

Install MCAD:
see the guide for deploying the Multi-Cluster-Application-Dispatcher at
https://github.com/project-codeflare/multi-cluster-app-dispatcher/blob/main/doc/deploy/deployment.md

This implementation requires MCAD v1.34.1 or higher.

TorchX uses `torch.distributed.run <https://pytorch.org/docs/stable/elastic/run.html>`_ to run distributed training.

Learn more about running distributed trainers in :py:mod:`torchx.components.dist`

"""

import json
import logging
import re

import warnings
from dataclasses import dataclass
from datetime import datetime
from typing import (
    Any,
    cast,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Tuple,
    TYPE_CHECKING,
)

import torchx
import yaml
from torchx.schedulers.api import (
    AppDryRunInfo,
    DescribeAppResponse,
    filter_regex,
    ListAppResponse,
    Scheduler,
    split_lines,
    Stream,
)
from torchx.schedulers.ids import make_unique
from torchx.specs.api import (
    AppDef,
    AppState,
    BindMount,
    CfgVal,
    DeviceMount,
    macros,
    ReplicaState,
    ReplicaStatus,
    Resource,
    RetryPolicy,
    Role,
    RoleStatus,
    runopts,
    VolumeMount,
)

from torchx.workspace.docker_workspace import DockerWorkspaceMixin
from typing_extensions import TypedDict

if TYPE_CHECKING:
    from docker import DockerClient
    from kubernetes.client import ApiClient, CustomObjectsApi
    from kubernetes.client.models import (  # noqa: F401 imported but unused
        V1Container,
        V1ContainerPort,
        V1EnvVar,
        V1Pod,
        V1PodSpec,
        V1ResourceRequirements,
        V1Service,
    )
    from kubernetes.client.rest import ApiException

logger: logging.Logger = logging.getLogger(__name__)


# Kubernetes reserves a small amount of resources per host for the system. For
# TorchX we always assume the entire host is being requested so we adjust the
# requested numbers to account for the node reserved resources.
#
# https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/
#
# role_to_pod subtracts these amounts from the CPU (millicores) and memory (MB)
# *requests* only; the *limits* keep the full requested amounts.
RESERVED_MILLICPU = 100
RESERVED_MEMMB = 1024

# Per-RetryPolicy list of {"event", "action"} policy entries.
# NOTE(review): this mapping is not referenced anywhere else in this file, and
# the event/action names look like Volcano job policies rather than MCAD
# settings -- confirm whether it is still needed.
RETRY_POLICIES: Mapping[str, Iterable[Mapping[str, str]]] = {
    RetryPolicy.REPLICA: [],
    RetryPolicy.APPLICATION: [
        {"event": "PodEvicted", "action": "RestartJob"},
        {"event": "PodFailed", "action": "RestartJob"},
    ],
}

# The AppWrapper Status - holistic view of pods/services.
# Maps the AppWrapper ``status.state`` string to a TorchX AppState. Lookups are
# exact string matches.
# NOTE(review): state strings not listed here are not mapped -- confirm the full
# set of states reported by the deployed MCAD version.
JOB_STATE: Dict[str, AppState] = {
    # Pending is the AppWrapper condition waiting for scheduling by MCAD
    "Pending": AppState.PENDING,
    # Running is the AppWrapper condition in Running.
    "Running": AppState.RUNNING,
    # Deleted is the AppWrapper condition where the torchX job is cancelled
    # by the user, the AppWrapper is deleted by the user, or error handling
    "Deleted": AppState.CANCELLED,
    # Failed is the job finishes unexpectedly
    "Failed": AppState.FAILED,
}

# Maps the per-state task-count keys read from an AppWrapper status (plus the
# "Pending dispatch" key that describe() uses before any counts are reported)
# to a TorchX ReplicaState. The mixed capitalization is significant: lookups
# are exact string matches.
TASK_STATE: Dict[str, ReplicaState] = {
    # Pending dispatch means the AppWrapped task is not yet scheduled by MCAD
    "Pending dispatch": ReplicaState.PENDING,
    # Pending means the task is scheduled by MCAD
    "pending": ReplicaState.PENDING,
    # Running means a task is running on the host.
    "running": ReplicaState.RUNNING,
    # Succeeded means that all containers in the pod have voluntarily
    # terminated with a container exit code of 0, and the system is not
    # going to restart any of these containers.
    "Succeeded": ReplicaState.SUCCEEDED,
    # Failed means that all containers in the pod have terminated, and at
    # least one container has terminated in a failure (exited with a
    # non-zero exit code or was stopped by the system).
    "failed": ReplicaState.FAILED,
    # Unknown means the status of task/pod is unknown to the scheduler.
    "Unknown": ReplicaState.UNKNOWN,
}

# Label keys attached to the generated pods (see pod_labels).
LABEL_VERSION = "torchx.pytorch.org/version"
LABEL_APP_NAME = "torchx.pytorch.org/app-name"
LABEL_ROLE_INDEX = "torchx.pytorch.org/role-index"
LABEL_ROLE_NAME = "torchx.pytorch.org/role-name"
LABEL_REPLICA_ID = "torchx.pytorch.org/replica-id"

# Label keys attached to every generated object (see object_labels).
# LABEL_UNIQUE_NAME is also the selector of the headless service.
LABEL_KUBE_APP_NAME = "app.kubernetes.io/name"
LABEL_ORGANIZATION = "app.kubernetes.io/managed-by"
LABEL_UNIQUE_NAME = "app.kubernetes.io/instance"

# Pod annotation used to opt out of istio sidecar injection.
ANNOTATION_ISTIO_SIDECAR = "sidecar.istio.io/inject"

# Resource capability key that role_to_pod copies into the pod nodeSelector.
LABEL_INSTANCE_TYPE = "node.kubernetes.io/instance-type"


def sanitize_for_serialization(obj: object) -> object:
    """
    Converts ``obj`` (a kubernetes client model, or a structure containing
    them) into plain, JSON-compatible Python data.

    This delegates to ``kubernetes.client.ApiClient.sanitize_for_serialization``.
    The kubernetes import is deferred to call time, which keeps the module-level
    imports free of the kubernetes dependency.
    """
    from kubernetes.client import ApiClient

    return ApiClient().sanitize_for_serialization(obj)
def role_to_pod(
    name: str,
    unique_app_id: str,
    namespace: str,
    role: Role,
    service_account: Optional[str],
    image_secret: Optional[str],
    coscheduler_name: Optional[str],
    priority_class_name: Optional[str],
    network: Optional[str],
) -> "V1Pod":
    """
    Builds the ``V1Pod`` definition for a single replica of ``role``.

    Args:
        name: pod name; also used as the container name and the pod hostname.
        unique_app_id: AppWrapper name; used as the pod subdomain and to build
            the value of the ``TORCHX_MCAD_<ROLE>_0_HOSTS`` env var.
        namespace: Kubernetes namespace of the pod.
        role: the (macro-substituted) role providing image, command, env,
            resources, ports and mounts.
        service_account: optional service account name for the pod.
        image_secret: optional name of an image pull secret. When ``None`` no
            ``imagePullSecrets`` entry is emitted.
        coscheduler_name: optional scheduler name, set as ``schedulerName``.
        priority_class_name: optional pod ``priorityClassName``.
        network: optional additional network name, added as the
            ``k8s.v1.cni.cncf.io/networks`` pod annotation.

    Returns:
        The pod definition (not submitted).

    Raises:
        TypeError: if ``role.mounts`` contains an unsupported mount type.
    """
    from kubernetes.client.models import (  # noqa: F811 redefinition of unused
        V1Container,
        V1ContainerPort,
        V1EmptyDirVolumeSource,
        V1EnvVar,
        V1HostPathVolumeSource,
        V1LocalObjectReference,
        V1ObjectMeta,
        V1PersistentVolumeClaimVolumeSource,
        V1Pod,
        V1PodSpec,
        V1ResourceRequirements,
        V1SecurityContext,
        V1Volume,
        V1VolumeMount,
    )

    # limits puts an upper cap on the resources a pod may consume.
    # requests is how much the scheduler allocates. We assume that the jobs will
    # be allocated the whole machine so requests is slightly lower than the
    # requested resources to account for the Kubernetes node reserved resources.
    limits: Dict[str, str] = {}
    requests: Dict[str, str] = {}

    resource = role.resource
    if resource.cpu > 0:
        mcpu = int(resource.cpu * 1000)
        limits["cpu"] = f"{mcpu}m"
        request_mcpu = max(mcpu - RESERVED_MILLICPU, 0)
        requests["cpu"] = f"{request_mcpu}m"
    if resource.memMB > 0:
        limits["memory"] = f"{int(resource.memMB)}M"
        request_memMB = max(int(resource.memMB) - RESERVED_MEMMB, 0)
        requests["memory"] = f"{request_memMB}M"
    if resource.gpu > 0:
        requests["nvidia.com/gpu"] = limits["nvidia.com/gpu"] = str(resource.gpu)

    for device_name, device_limit in resource.devices.items():
        limits[device_name] = str(device_limit)

    resources = V1ResourceRequirements(
        limits=limits,
        requests=requests,
    )

    node_selector: Dict[str, str] = {}
    if LABEL_INSTANCE_TYPE in resource.capabilities:
        node_selector[LABEL_INSTANCE_TYPE] = resource.capabilities[LABEL_INSTANCE_TYPE]

    # To support PyTorch dataloaders we need to set /dev/shm to larger than the
    # 64M default so we mount an unlimited sized tmpfs directory on it.
    SHM_VOL = "dshm"
    volumes = [
        V1Volume(
            name=SHM_VOL,
            empty_dir=V1EmptyDirVolumeSource(
                medium="Memory",
            ),
        ),
    ]
    volume_mounts = [
        V1VolumeMount(name=SHM_VOL, mount_path="/dev/shm"),
    ]
    security_context = V1SecurityContext()

    for i, mount in enumerate(role.mounts):
        mount_name = f"mount-{i}"
        if isinstance(mount, BindMount):
            volume = V1Volume(
                name=mount_name,
                host_path=V1HostPathVolumeSource(path=mount.src_path),
            )
            read_only = mount.read_only
        elif isinstance(mount, VolumeMount):
            volume = V1Volume(
                name=mount_name,
                persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                    claim_name=mount.src,
                ),
            )
            read_only = mount.read_only
        elif isinstance(mount, DeviceMount):
            volume = V1Volume(
                name=mount_name,
                host_path=V1HostPathVolumeSource(path=mount.src_path),
            )
            # Read-only unless the permissions contain "w" or "m".
            read_only = "w" not in mount.permissions and "m" not in mount.permissions
            # Kubernetes has no way to pass ``--device`` to the container
            # runtime, so host devices are exposed via hostPath in a
            # privileged container.
            security_context.privileged = True
        else:
            raise TypeError(f"unknown mount type {mount}")

        volumes.append(volume)
        volume_mounts.append(
            V1VolumeMount(
                name=mount_name,
                mount_path=mount.dst_path,
                read_only=read_only,
            )
        )

    torchx_env_var = [
        V1EnvVar(
            name=env_name,
            value=env_value,
        )
        for env_name, env_value in role.env.items()
    ]
    # "<app id>-0.<app id>" is the hostname of pod "<app id>-0" qualified by
    # this pod's subdomain (the AppWrapper's headless service name).
    my_env_var = [
        V1EnvVar(
            name=f"TORCHX_MCAD_{cleanup_str(role.name)}_0_HOSTS".upper().replace(
                "-", ""
            ),
            value=f"{unique_app_id}-0.{unique_app_id}",
        )
    ]

    container = V1Container(
        command=[role.entrypoint] + role.args,
        image=role.image,
        name=name,
        env=torchx_env_var + my_env_var,
        resources=resources,
        ports=[
            V1ContainerPort(
                name=port_name,
                container_port=port,
            )
            for port_name, port in role.port_map.items()
        ],
        volume_mounts=volume_mounts,
        security_context=security_context,
    )

    # Only reference an image pull secret when one is configured; a reference
    # without a name points at no secret at all.
    image_pull_secrets = (
        [V1LocalObjectReference(name=image_secret)]
        if image_secret is not None
        else None
    )

    metadata = V1ObjectMeta(
        name=name,
        annotations={
            # Disable the istio sidecar as it prevents the containers from
            # exiting once finished.
            ANNOTATION_ISTIO_SIDECAR: "false",
        },
        labels={},
        namespace=namespace,
    )

    if network is not None:
        metadata.annotations.update({"k8s.v1.cni.cncf.io/networks": network})

    return V1Pod(
        api_version="v1",
        kind="Pod",
        spec=V1PodSpec(
            containers=[container],
            hostname=name,
            subdomain=unique_app_id,
            image_pull_secrets=image_pull_secrets,
            restart_policy="Never",
            service_account_name=service_account,
            volumes=volumes,
            node_selector=node_selector,
            scheduler_name=coscheduler_name,
            priority_class_name=priority_class_name,
        ),
        metadata=metadata,
    )
def create_pod_group(
    app: AppDef, role: Role, role_idx: int, namespace: str, app_id: str
) -> "Dict[str, Any]":
    """
    Wraps a coscheduling ``PodGroup`` for role number ``role_idx`` in an
    AppWrapper generic item.

    The PodGroup is named ``<app_id>-pg<role_idx>``, the same value pod_labels
    assigns to the ``pod-group.scheduling.sigs.k8s.io`` pod label. Its
    ``minMember`` is the role's replica count.
    """
    group_labels = object_labels(app, app_id)
    group_labels["appwrapper.workload.codeflare.dev"] = app_id

    group_metadata: Dict[str, Any] = {
        "name": f"{app_id}-pg{role_idx}",
        "namespace": namespace,
        "labels": group_labels,
    }
    return {
        "replicas": 1,
        "generictemplate": {
            "apiVersion": "scheduling.sigs.k8s.io/v1alpha1",
            "kind": "PodGroup",
            "metadata": group_metadata,
            "spec": {"minMember": role.num_replicas},
        },
    }
def mcad_svc(
    app: AppDef, svc_name: str, namespace: str, service_port: str
) -> "V1Service":
    """
    Builds the headless (``clusterIP: None``) Service for an AppWrapper.

    The service selects pods whose ``app.kubernetes.io/instance`` label equals
    ``svc_name``. It publishes not-ready addresses and exposes ``service_port``
    over TCP on the same target port.

    Args:
        app: the application; used for the service's labels.
        svc_name: service name, also the selector value.
        namespace: Kubernetes namespace of the service.
        service_port: port number as a string.

    Raises:
        ValueError: if ``service_port`` is not an integer string.
    """
    # Only the models actually used here are imported.
    from kubernetes.client.models import (  # noqa: F811 redefinition of unused
        V1ObjectMeta,
        V1Service,
        V1ServicePort,
        V1ServiceSpec,
        V1ServiceStatus,
    )

    labels = object_labels(app, svc_name)
    port = int(service_port)
    return V1Service(
        api_version="v1",
        kind="Service",
        metadata=V1ObjectMeta(
            name=svc_name,
            namespace=namespace,
            labels=labels,
        ),
        spec=V1ServiceSpec(
            cluster_ip="None",
            publish_not_ready_addresses=True,
            ports=[
                V1ServicePort(
                    protocol="TCP",
                    port=port,
                    target_port=port,
                )
            ],
            selector={LABEL_UNIQUE_NAME: svc_name},
            session_affinity="None",
            type="ClusterIP",
        ),
        status=V1ServiceStatus(
            load_balancer={},
        ),
    )
def cleanup_str(data: str) -> str:
    """
    Invokes ``lower`` on the string and removes all characters that do not
    satisfy the ``[a-z0-9-]`` pattern. Leading digits and hyphens are then
    stripped, so a non-empty result always starts with a letter.

    This method is mostly used to make sure kubernetes scheduler gets the job
    name that does not violate its validation: Kubernetes object names must
    begin with an alphanumeric character, and Service names with a letter.
    """
    pattern = r"[a-z0-9\-]"
    return "".join(re.findall(pattern, data.lower())).lstrip("0123456789-")


def get_unique_truncated_appid(app: AppDef) -> str:
    """
    Some Kubernetes objects need to have names that are 63 characters or less.
    When creating the unique app_id, this function calculates the max size to
    pass to make_unique. The PodGroup name includes 3 characters plus the
    role_id characters. The minimum number of characters for the unique
    identifier is 4. These amounts are taken into account.

    Note: when the name is too long, ``app.name`` is truncated in place (with a
    warning), which also affects anything built from ``app.name`` afterwards.
    """
    default_size = 14
    uid_chars = 4
    pg_chars = 3 + len(app.roles)
    size = 63 - (len(app.name) + uid_chars + pg_chars)
    unique_id_size = min(default_size, size)
    if unique_id_size <= 3:
        warnings.warn(
            "Name size has too many characters for some Kubernetes objects. "
            "Truncating application name."
        )
        end = 63 - uid_chars - pg_chars
        app.name = app.name[:end]
        unique_id_size = 3
    return cleanup_str(make_unique(app.name, unique_id_size))
def get_port_for_service(app: AppDef) -> str:
    """
    Chooses the port exposed by the AppWrapper's service.

    Every value of every role's ``port_map`` is visited in order and the last
    one wins. A value outside ``1..65535`` triggers a warning and is replaced by
    the torchx default ``29500``. The default is also returned when no role
    declares a port.
    """
    fallback = "29500"
    declared = [
        candidate
        for r in app.roles
        if r.port_map is not None
        for candidate in r.port_map.values()
    ]

    selected = fallback
    for candidate in declared:
        as_text = str(candidate)
        if 0 < int(as_text) <= 65535:
            selected = as_text
        else:
            warnings.warn(
                """Warning: port_map set to invalid port number. Value must be between 1-65535, with torchx default = 29500. Setting port to default = 29500"""
            )
            selected = fallback
    return selected
def enable_retry(
    job_spec: Dict[str, Any], appwrapper_retries: int, total_pods: int
) -> None:
    """
    Enables AppWrapper-level requeuing by setting ``job_spec["schedulingSpec"]``
    in place. Any existing value is replaced.

    Args:
        job_spec: the AppWrapper ``spec`` dict to modify.
        appwrapper_retries: written to ``requeuing.maxNumRequeuings``.
        total_pods: written to ``minAvailable``.
    """
    # NOTE(review): timeInSeconds/maxTimeInSeconds/growthType are fixed here;
    # presumably 0 for maxTimeInSeconds means "no cap" in MCAD -- confirm.
    job_spec["schedulingSpec"] = dict(
        minAvailable=total_pods,
        requeuing=dict(
            timeInSeconds=300,
            maxTimeInSeconds=0,
            growthType="exponential",
            maxNumRequeuings=appwrapper_retries,
        ),
    )
def app_to_resource(
    app: AppDef,
    namespace: str,
    service_account: Optional[str],
    image_secret: Optional[str],
    coscheduler_name: Optional[str],
    priority_class_name: Optional[str],
    network: Optional[str],
    priority: Optional[int] = None,
) -> Dict[str, Any]:
    """
    app_to_resource creates an AppWrapper/MCAD Kubernetes resource definition
    from the provided AppDef. The resource definition can be used to launch the
    app on Kubernetes.

    The AppWrapper's ``GenericItems`` are:

    * one coscheduling PodGroup per role (only when ``coscheduler_name`` is set),
    * one Pod per replica of every role,
    * one headless Service named after the unique app id.

    MCAD supports retries at the APPLICATION level. In the case of multiple
    TorchX Roles, the AppWrapper maximum number of retries count is set to the
    minimum of the max_retries of the roles. Retries are enabled only when that
    minimum is positive.

    NOTE(review): pod names are ``<app id>-<replica id>`` and do not include the
    role index, so replicas with the same index in different roles get the same
    pod name. Confirm whether multi-role apps are expected to work.

    Raises:
        ValueError: if ``app`` has no roles.
    """
    if not app.roles:
        raise ValueError(
            f"cannot create an AppWrapper for app `{app.name}` without roles"
        )

    genericitems: List[Dict[str, Any]] = []

    unique_app_id = get_unique_truncated_appid(app)

    if coscheduler_name is not None:
        for role_idx, role in enumerate(app.roles):
            genericitems.append(
                create_pod_group(app, role, role_idx, namespace, unique_app_id)
            )

    # Name of the env var (set on every pod by role_to_pod) that holds the host
    # of replica 0 of the first role. It is loop-invariant, so compute it once.
    rank0_hosts_env = f"TORCHX_MCAD_{cleanup_str(app.roles[0].name)}_0_HOSTS".upper().replace(
        "-", ""
    )

    for role_idx, role in enumerate(app.roles):
        for replica_id in range(role.num_replicas):
            is_rank0 = role_idx == 0 and replica_id == 0
            values = macros.Values(
                img_root="",
                app_id=unique_app_id,
                replica_id=str(replica_id),
                # Rank 0 resolves itself through TORCHX_RANK0_HOST, which is set
                # to "localhost" below; every other replica uses the hosts env.
                rank0_env="TORCHX_RANK0_HOST" if is_rank0 else rank0_hosts_env,
            )
            name = cleanup_str(f"{unique_app_id}-{replica_id}")
            replica_role = values.apply(role)
            if is_rank0:
                replica_role.env["TORCHX_RANK0_HOST"] = "localhost"

            pod = role_to_pod(
                name,
                unique_app_id,
                namespace,
                replica_role,
                service_account,
                image_secret,
                coscheduler_name,
                priority_class_name,
                network,
            )
            pod.metadata.labels.update(
                pod_labels(
                    app=app,
                    role_idx=role_idx,
                    role=role,
                    replica_id=replica_id,
                    coscheduler_name=coscheduler_name,
                    app_id=unique_app_id,
                )
            )
            genericitems.append({"replicas": 1, "generictemplate": pod})

    # Create the Service: it is named after the AppWrapper (the unique app id)
    # and selects pods by the LABEL_UNIQUE_NAME ("app.kubernetes.io/instance")
    # label, which the pod labels set to the same unique app id.
    service_port = get_port_for_service(app)
    svc_obj = mcad_svc(
        app=app, svc_name=unique_app_id, namespace=namespace, service_port=service_port
    )
    genericitems.append({"replicas": 1, "generictemplate": svc_obj})

    job_spec: Dict[str, Any] = {
        "resources": {
            "GenericItems": genericitems,
        },
    }

    if priority is not None:
        job_spec["priority"] = priority

    appwrapper_retries = min(role.max_retries for role in app.roles)
    if appwrapper_retries > 0:
        total_pods = sum(role.num_replicas for role in app.roles)
        enable_retry(job_spec, appwrapper_retries, total_pods)

    resource: Dict[str, object] = {
        "apiVersion": "workload.codeflare.dev/v1beta1",
        "kind": "AppWrapper",
        "metadata": {"name": unique_app_id, "namespace": namespace},
        "spec": job_spec,
    }
    return resource
# Helper that converts the generic items of an MCAD AppWrapper into TorchX Role descriptions
def get_role_information(generic_items: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Reconstructs best-effort TorchX ``Role`` descriptions from the generic
    items of an AppWrapper as returned by the Kubernetes API.

    Items are grouped by their ``torchx.pytorch.org/role-name`` label. Items
    without that label (e.g. the Service or PodGroups) are skipped.
    ``num_replicas`` counts the labeled items of each role. The remaining
    fields are copied from the first item seen for each role.

    Note: copied fields keep the raw form found in the pod template. For
    example, resource quantities are strings such as ``"900m"``, and ``env``,
    ``ports`` and ``volumeMounts`` are lists of dicts. They are not converted
    to the types ``Role``/``Resource`` declare.

    Returns:
        A dict mapping role name to ``Role``.
    """
    roles: Dict[str, Role] = {}
    for generic_item in generic_items:
        template = generic_item.get("generictemplate")
        if template is None:
            continue
        # Note: options in meta data : annotations, labels, name, namespace
        labels = template.get("metadata", {}).get("labels", {})
        role_name = labels.get(LABEL_ROLE_NAME)
        if role_name is None:
            continue

        if role_name in roles:
            roles[role_name].num_replicas += 1
            continue

        role = Role(name=role_name, num_replicas=1, image="")
        roles[role_name] = role
        # Only get specs for first instance of TorchX role
        _copy_container_spec(role, template)
    return roles


def _copy_container_spec(role: Role, template: Dict[str, Any]) -> None:
    """
    Copies fields of the first container in ``template`` onto ``role``.

    Fields are copied in the order image, command, env, resources, ports,
    volumeMounts. Copying stops at the first one that is missing.
    """
    # Note: options in spec data: containers, hostname, imagePullSecrets,
    # nodeSelector, restartPolicy, subdomain, volumes
    spec = template.get("spec")
    if spec is None or "containers" not in spec:
        return
    container = spec["containers"][0]

    if "image" not in container:
        return
    role.image = container["image"]
    # "command" holds the entrypoint followed by the args.
    if "command" not in container:
        return
    role.args = container["command"]
    if "env" not in container:
        return
    role.env = container["env"]
    if "resources" not in container:
        return

    # "requests" may be absent from the returned object; treat that like a
    # missing cpu request instead of raising KeyError.
    requests = container["resources"].get("requests", {})
    if "cpu" not in requests:
        return
    resource_req = Resource(cpu=-1, gpu=-1, memMB=-1)
    resource_req.cpu = requests["cpu"]
    # Substring matching to accommodate different gpu resource names
    # (e.g. "nvidia.com/gpu"); the last matching key wins.
    for key, value in requests.items():
        if "gpu" in key:
            resource_req.gpu = value
    if "memory" not in requests:
        return
    resource_req.memMB = requests["memory"]
    role.resource = resource_req

    if "ports" not in container:
        return
    role.port_map = container["ports"]
    if "volumeMounts" not in container:
        return
    role.mounts = container["volumeMounts"]
def get_appwrapper_status(app: Dict[str, Any]) -> AppState:
    """
    Maps an AppWrapper object (as returned by the Kubernetes API) to an AppState.

    An AppWrapper without a ``status``, or without a non-empty
    ``status.state``, is reported as PENDING. This is the created-but-not-yet-
    dispatched case. A state that ``JOB_STATE`` does not list maps to
    ``AppState.UNKNOWN`` instead of raising ``KeyError``.
    """
    state = app.get("status", {}).get("state")
    if not state:
        # Handle case where appwrapper is created but pending dispatch.
        # NOTE(review): an empty state string is treated like a missing one --
        # confirm against the states MCAD reports.
        return JOB_STATE["Pending"]
    return JOB_STATE.get(state, AppState.UNKNOWN)
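# Note: this does not handle the not-yet-dispatched case; when the status carries none of the task-count keys, an empty dict is returned and the caller must handle it.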
def get_tasks_status_description(status: Dict[str, str]) -> Dict[str, int]:
    """
    Extracts the per-state task counts from an AppWrapper ``status`` dict.

    Only the ``running``, ``pending``, ``failed`` and ``Succeeded`` keys are
    copied, in that order; lookups are exact, so capitalization matters.
    Missing keys are omitted, so a status without task counts yields ``{}``.
    """
    wanted = ("running", "pending", "failed", "Succeeded")
    return {key: status[key] for key in wanted if key in status}
@dataclass
class KubernetesMCADJob:
    """
    Dry-run request for the MCAD scheduler: the AppWrapper resource to create,
    plus the images that ``schedule`` pushes before submitting it.
    """

    # passed to push_images(); presumably local image -> (repo, tag) -- confirm
    images_to_push: Dict[str, Tuple[str, str]]
    # AppWrapper body sent to create_namespaced_custom_object
    resource: Dict[str, object]

    def __str__(self) -> str:
        # Render as YAML after converting kubernetes model objects into plain
        # data.
        plain = sanitize_for_serialization(self.resource)
        return yaml.dump(plain)

    def __repr__(self) -> str:
        return self.__str__()
class KubernetesMCADOpts(TypedDict, total=False):
    """
    Typed scheduler config for ``KubernetesMCADScheduler``. The keys mirror
    the options registered in ``run_opts``; all of them are optional
    (``total=False``).
    """

    # Kubernetes namespace to create the AppWrapper in
    namespace: Optional[str]
    # repository to push patched images to (must have push access)
    image_repo: Optional[str]
    # service account name set on the pod specs
    service_account: Optional[str]
    # AppWrapper-level priority; a higher value means higher priority
    priority: Optional[int]
    # pod-level priorityClassName
    priority_class_name: Optional[str]
    # name of the Kubernetes/OpenShift secret used for private images
    image_secret: Optional[str]
    # co-scheduler name; when set, a PodGroup is created per role
    coscheduler_name: Optional[str]
    # name of an additional pod-to-pod network
    network: Optional[str]
class KubernetesMCADScheduler(DockerWorkspaceMixin, Scheduler[KubernetesMCADOpts]):
    """
    KubernetesMCADScheduler is a TorchX scheduling interface to Kubernetes.

    Important: AppWrapper/MCAD is required to be installed on the Kubernetes cluster.
    TorchX requires gang scheduling for multi-replica/multi-role execution.
    Note that AppWrapper/MCAD supports gang scheduling among any app-wrapped jobs on Kubernetes.
    However, for true gang scheduling AppWrapper/MCAD needs to be used with an additional Kubernetes co-scheduler.
    For installation instructions see: https://github.com/project-codeflare/multi-cluster-app-dispatcher/blob/main/doc/deploy/deployment.md

    This has been confirmed to work with MCAD main branch v1.34.1 or higher and OpenShift Kubernetes
    Client Version: 4.10.13
    Server Version: 4.9.18
    Kubernetes Version: v1.22.3+e790d7f

    .. code-block:: bash

        $ torchx run --scheduler kubernetes_mcad --scheduler_args namespace=default,image_repo=<your_image_repo> utils.echo --image alpine:latest --msg hello
        ...

    The TorchX-MCAD scheduler can be used with a secondary scheduler on Kubernetes.
    To enable this, the user must provide the name of the coscheduler.
    With this feature, a PodGroup is defined for each TorchX role and the coscheduler
    handles secondary scheduling on the Kubernetes cluster. For additional resources, see:

    1. PodGroups and Coscheduling: https://github.com/kubernetes-sigs/scheduler-plugins/tree/release-1.24/pkg/coscheduling
    2. Installing Secondary schedulers: https://github.com/kubernetes-sigs/scheduler-plugins/blob/release-1.24/doc/install.md
    3. PodGroup CRD: https://github.com/kubernetes-sigs/scheduler-plugins/blob/release-1.24/config/crd/bases/scheduling.sigs.k8s.io_podgroups.yaml

    The MCAD scheduler supports priorities at the AppWrapper level and optionally at the pod level on clusters with PriorityClass definitions.
    At the AppWrapper level, higher integer values mean higher priorities. Kubernetes clusters may have additional priorityClass
    definitions that can be applied at the pod level. While these different levels of priorities can be set independently,
    it is recommended to check with your Kubernetes cluster admin to see if additional guidance is in place. For more on Kubernetes
    PriorityClass, see: https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/ .

    In order to use the network option, the Kubernetes cluster must have multus installed.
    For multus installation instructions and how to set up a network custom network attachment definition, see:
    https://github.com/k8snetworkplumbingwg/multus-cni/blob/master/docs/how-to-use.md

    **Config Options**

    .. runopts::
        class: torchx.schedulers.kubernetes_mcad_scheduler.KubernetesMCADScheduler

    **Mounts**

    Mounting external filesystems/volumes is via the HostPath and
    PersistentVolumeClaim support.

    * hostPath volumes: ``type=bind,src=<host path>,dst=<container path>[,readonly]``
    * PersistentVolumeClaim: ``type=volume,src=<claim>,dst=<container path>[,readonly]``
    * host devices: ``type=device,src=/dev/foo[,dst=<container path>][,perm=rwm]``
      If you specify a host device the job will run in privileged mode since
      Kubernetes doesn't expose a way to pass `--device` to the underlying
      container runtime. Users should prefer to use device plugins.

    See :py:func:`torchx.specs.parse_mounts` for more info.

    External docs: https://kubernetes.io/docs/concepts/storage/persistent-volumes/

    **Resources / Allocation**

    To select a specific machine type you can add a capability to your resources
    with ``node.kubernetes.io/instance-type`` which will constrain the launched
    jobs to nodes of that instance type.

    >>> from torchx import specs
    >>> specs.Resource(
    ...     cpu=4,
    ...     memMB=16000,
    ...     gpu=2,
    ...     capabilities={
    ...         "node.kubernetes.io/instance-type": "<cloud instance type>",
    ...     },
    ... )
    Resource(...)

    Kubernetes may reserve some memory for the host. TorchX assumes you're
    scheduling on whole hosts and thus will automatically reduce the resource
    request by a small amount to account for the node reserved CPU and memory.
    If you run into scheduling issues you may need to reduce the requested CPU
    and memory from the host values.

    **Compatibility**

    .. compatibility::
        type: scheduler
        features:
            cancel: true
            logs: true
            distributed: true
            describe: true
            workspaces: true
            mounts: true
            elasticity: false
    """

    def __init__(
        self,
        session_name: str,
        client: Optional["ApiClient"] = None,
        docker_client: Optional["DockerClient"] = None,
    ) -> None:
        # NOTE: make sure any new init options are supported in create_scheduler(...)
        super().__init__("kubernetes_mcad", session_name, docker_client=docker_client)

        self._client = client

    def _api_client(self) -> "ApiClient":
        """
        Returns the kubernetes ApiClient. On first use it is created from the
        local kube config and then cached.
        """
        from kubernetes import client, config

        c = self._client
        if c is None:
            configuration = client.Configuration()
            try:
                config.load_kube_config(client_configuration=configuration)
            except config.ConfigException as e:
                warnings.warn(f"failed to load kube config: {e}")

            c = self._client = client.ApiClient(configuration)

        return c

    def _custom_objects_api(self) -> "CustomObjectsApi":
        from kubernetes import client

        api_client = client.CustomObjectsApi(self._api_client())
        return api_client

    def _get_job_name_from_exception(self, e: "ApiException") -> Optional[str]:
        """
        Best-effort extraction of the object name from an ApiException's JSON
        body (``details.name``). Returns None if it cannot be read.
        """
        try:
            return json.loads(e.body)["details"]["name"]
        except Exception:
            # logger.exception records the active exception and its traceback
            # itself; the message has no placeholders, so no extra args.
            logger.exception("Unable to retrieve job name, got exception")
            return None

    def _get_active_context(self) -> Dict[str, Any]:
        from kubernetes import config

        _, active_context = config.list_kube_config_contexts()
        return active_context

    def schedule(self, dryrun_info: AppDryRunInfo[KubernetesMCADJob]) -> str:
        """
        Pushes any patched images and creates the AppWrapper.

        Returns:
            The app id, ``<namespace>:<appwrapper name>``.

        Raises:
            ValueError: if an AppWrapper with the same name already exists.
        """
        from kubernetes.client.rest import ApiException

        cfg = dryrun_info._cfg
        assert cfg is not None, f"{dryrun_info} missing cfg"
        namespace = cfg.get("namespace") or "default"

        images_to_push = dryrun_info.request.images_to_push
        self.push_images(images_to_push)

        resource = dryrun_info.request.resource
        try:
            resp = self._custom_objects_api().create_namespaced_custom_object(
                group="workload.codeflare.dev",
                version="v1beta1",
                namespace=namespace,
                plural="appwrappers",
                body=resource,
            )
        except ApiException as e:
            if e.status == 409 and e.reason == "Conflict":
                job_name = self._get_job_name_from_exception(e)
                raise ValueError(
                    f"Job `{job_name}` already exists. This seems like a transient exception, try resubmitting job"
                ) from e
            else:
                raise

        return f'{namespace}:{resp["metadata"]["name"]}'

    def _submit_dryrun(
        self, app: AppDef, cfg: KubernetesMCADOpts
    ) -> AppDryRunInfo[KubernetesMCADJob]:
        # map any local images to the remote image
        images_to_push = self.dryrun_push_images(app, cast(Mapping[str, CfgVal], cfg))

        service_account = cfg.get("service_account")
        assert service_account is None or isinstance(
            service_account, str
        ), "service_account must be a str"

        priority = cfg.get("priority")
        assert priority is None or isinstance(priority, int), "priority must be a int"

        image_secret = cfg.get("image_secret")
        assert image_secret is None or isinstance(
            image_secret, str
        ), "image_secret must be a str"

        if image_secret is not None and service_account is not None:
            msg = """Service Account and Image Secret names are both provided. Depending on the Service Account configuration, an ImagePullSecret may be defined in your Service Account. If this is the case, check service account and image secret configurations to understand the expected behavior for patched image push access."""
            warnings.warn(msg)

        namespace = cfg.get("namespace")
        assert isinstance(namespace, str), "namespace must be a str"

        coscheduler_name = cfg.get("coscheduler_name")
        assert coscheduler_name is None or isinstance(
            coscheduler_name, str
        ), "coscheduler_name must be a string"

        priority_class_name = cfg.get("priority_class_name")
        assert priority_class_name is None or isinstance(
            priority_class_name, str
        ), "priority_class_name must be a string"

        network = cfg.get("network")
        assert network is None or isinstance(network, str), "network must be a string"

        resource = app_to_resource(
            app=app,
            namespace=namespace,
            service_account=service_account,
            image_secret=image_secret,
            coscheduler_name=coscheduler_name,
            priority_class_name=priority_class_name,
            network=network,
            priority=priority,
        )

        req = KubernetesMCADJob(
            resource=resource,
            images_to_push=images_to_push,
        )
        info = AppDryRunInfo(req, repr)
        info._app = app
        # pyre-fixme
        info._cfg = cfg
        return info

    def _validate(self, app: AppDef, scheduler: str) -> None:
        # Skip validation step
        pass

    def _cancel_existing(self, app_id: str) -> None:
        namespace, name = app_id.split(":")
        self._custom_objects_api().delete_namespaced_custom_object(
            group="workload.codeflare.dev",
            version="v1beta1",
            namespace=namespace,
            plural="appwrappers",
            name=name,
        )

    def run_opts(self) -> runopts:
        opts = runopts()
        opts.add(
            "namespace",
            type_=str,
            help="Kubernetes namespace to schedule job in",
            default="default",
        )
        opts.add(
            "image_repo",
            type_=str,
            help="The image repository to use when pushing patched images, must have push access. Ex: example.com/your/container",
        )
        opts.add(
            "service_account",
            type_=str,
            help="The service account name to set on the pod specs",
        )
        opts.add(
            "priority",
            type_=int,
            help="The priority level to set on the job specs. Higher integer value means higher priority",
        )
        opts.add(
            "priority_class_name",
            type_=str,
            help="Pod specific priority level. Check with your Kubernetes cluster admin if Priority classes are defined on your system",
        )
        opts.add(
            "image_secret",
            type_=str,
            help="The name of the Kubernetes/OpenShift secret set up for private images",
        )
        opts.add(
            "coscheduler_name",
            type_=str,
            help="Option to run TorchX-MCAD with a co-scheduler. User must provide the co-scheduler name.",
        )
        opts.add(
            "network",
            type_=str,
            help="Name of additional pod-to-pod network beyond default Kubernetes network",
        )
        return opts

    def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
        """
        Describes the AppWrapper ``app_id`` (``<namespace>:<name>``).

        MCAD reports only aggregate task counts per state. Replica statuses are
        therefore assigned from those counts in order, and may not correspond
        to the actual replica ids. Replicas not covered by the counts are
        reported as UNKNOWN, as are AppWrapper states that ``JOB_STATE`` does
        not list.

        Raises:
            ValueError: on 404 (Not Found) or 401 (Unauthorized) responses.
        """
        namespace, name = app_id.split(":")
        from kubernetes.client.rest import ApiException

        roles: Dict[str, Any] = {}
        roles_statuses: Dict[str, RoleStatus] = {}

        try:
            api_resp = self._custom_objects_api().get_namespaced_custom_object(
                "workload.codeflare.dev", "v1beta1", namespace, "appwrappers", name
            )
        except ApiException as e:
            if e.status == 404 and e.reason == "Not Found":
                raise ValueError(
                    "Kubernetes client not found. Check access to Kubernetes cluster."
                ) from e
            elif e.status == 401 and e.reason == "Unauthorized":
                raise ValueError("Unauthorized Kubernetes access error.") from e
            else:
                raise

        if "status" not in api_resp:
            app_state = AppState.UNKNOWN
        else:
            status = api_resp["status"]
            app_state = JOB_STATE.get(status.get("state"), AppState.UNKNOWN)

            # Roles. Note MCAD service is not considered a TorchX role.
            generic_items = api_resp["spec"]["resources"]["GenericItems"]
            roles = get_role_information(generic_items)

            tasks_results = get_tasks_status_description(status)
            # Handle case where waiting for dispatch: every replica is pending.
            if not tasks_results:
                tasks_results["Pending dispatch"] = sum(
                    role.num_replicas for role in roles.values()
                )

            # Convert MCAD status to TorchX replica set status format
            # Warning: Status is not necessarily the match for a particular Replica ID
            task_status = [
                key for key, count in tasks_results.items() for _ in range(count)
            ]
            remaining = iter(task_status)

            msg = "Warning - MCAD does not report individual replica statuses, but overall task status. Replica id may not match status"
            for role_name, role in roles.items():
                warnings.warn(msg)
                role_status = RoleStatus(role_name, [])
                for idx in range(role.num_replicas):
                    # The reported counts may add up to fewer than the number of
                    # replicas; report the rest as Unknown.
                    state = TASK_STATE[next(remaining, "Unknown")]
                    role_status.replicas.append(
                        ReplicaStatus(id=idx, role=role_name, state=state, hostname="")
                    )
                roles_statuses[role_name] = role_status

        return DescribeAppResponse(
            app_id=app_id,
            roles=list(roles.values()),
            roles_statuses=list(roles_statuses.values()),
            state=app_state,
        )

    def log_iter(
        self,
        app_id: str,
        role_name: str,
        k: int = 0,
        regex: Optional[str] = None,
        since: Optional[datetime] = None,
        until: Optional[datetime] = None,
        should_tail: bool = False,
        streams: Optional[Stream] = None,
    ) -> Iterable[str]:
        """
        Returns the log lines of pod ``<appwrapper name>-<k>``. ``role_name`` is
        not used to pick the pod.
        """
        assert until is None, "kubernetes API doesn't support until"

        if streams not in (None, Stream.COMBINED):
            raise ValueError(
                "KubernetesMCADScheduler only supports COMBINED log stream"
            )

        import math

        from kubernetes import client, watch

        namespace, name = app_id.split(":")

        pod_name = cleanup_str(f"{name}-{k}")

        args: Dict[str, object] = {
            "name": pod_name,
            "namespace": namespace,
            "timestamps": True,
        }
        if since is not None:
            # sinceSeconds must be a positive whole number of seconds; round up
            # so that logs emitted at ``since`` are still included.
            elapsed = (datetime.now() - since).total_seconds()
            args["since_seconds"] = max(1, math.ceil(elapsed))

        core_api = client.CoreV1Api(self._api_client())
        if should_tail:
            w = watch.Watch()
            iterator = w.stream(core_api.read_namespaced_pod_log, **args)
        else:
            resp = core_api.read_namespaced_pod_log(**args)
            iterator = split_lines(resp)

        if regex:
            return filter_regex(regex, iterator)
        else:
            return iterator

    def list(self) -> List[ListAppResponse]:
        """Lists the AppWrappers in the namespace of the active kube context."""
        active_context = self._get_active_context()
        # A kube context does not have to set a namespace; fall back to
        # "default", the same default the namespace run option uses.
        namespace = active_context["context"].get("namespace", "default")

        resp = self._custom_objects_api().list_namespaced_custom_object(
            group="workload.codeflare.dev",
            version="v1beta1",
            namespace=namespace,
            plural="appwrappers",
            timeout_seconds=30,
        )
        return [
            ListAppResponse(
                app_id=f"{namespace}:{item['metadata']['name']}",
                state=get_appwrapper_status(item),
            )
            for item in resp["items"]
        ]
def create_scheduler(
    session_name: str,
    client: Optional["ApiClient"] = None,
    docker_client: Optional["DockerClient"] = None,
    **kwargs: Any,
) -> KubernetesMCADScheduler:
    """
    Factory for ``KubernetesMCADScheduler``.

    Only ``session_name``, ``client`` and ``docker_client`` are forwarded;
    any extra keyword arguments are accepted and ignored.
    """
    scheduler = KubernetesMCADScheduler(
        session_name=session_name,
        client=client,
        docker_client=docker_client,
    )
    return scheduler
def object_labels(
    app: AppDef,
    app_id: str,
) -> Dict[str, str]:
    """
    Returns the labels shared by every object the scheduler creates: the app
    name, the managing organization (``torchx.pytorch.org``) and the unique
    app id.
    """
    labels: Dict[str, str] = {}
    labels[LABEL_KUBE_APP_NAME] = app.name
    labels[LABEL_ORGANIZATION] = "torchx.pytorch.org"
    labels[LABEL_UNIQUE_NAME] = app_id
    return labels
def pod_labels(
    app: AppDef,
    role_idx: int,
    role: Role,
    replica_id: int,
    coscheduler_name: Optional[str],
    app_id: str,
) -> Dict[str, str]:
    """
    Returns the labels for one replica pod.

    These are the common ``object_labels`` plus the torchx version, app name,
    role index/name and replica id. When a co-scheduler is used, the pod is
    also labeled with its PodGroup name, ``<app_id>-pg<role_idx>``.
    """
    labels = object_labels(app, app_id)
    labels.update(
        {
            LABEL_VERSION: torchx.__version__,
            LABEL_APP_NAME: app.name,
            LABEL_ROLE_INDEX: str(role_idx),
            LABEL_ROLE_NAME: role.name,
            LABEL_REPLICA_ID: str(replica_id),
        }
    )
    if coscheduler_name is not None:
        labels["pod-group.scheduling.sigs.k8s.io"] = f"{app_id}-pg{role_idx}"
    return labels

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources