Shortcuts

Source code for torchx.pipelines.kfp.adapter

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import json
import os
import os.path
import shlex
from typing import Mapping, Optional, Tuple

import yaml
from kfp import components, dsl

# @manual=fbsource//third-party/pypi/kfp:kfp
from kfp.components.structures import ComponentSpec, OutputSpec
from kubernetes.client.models import (
    V1ContainerPort,
    V1EmptyDirVolumeSource,
    V1Volume,
    V1VolumeMount,
)
from torchx.schedulers.kubernetes_scheduler import app_to_resource, pod_labels
from torchx.specs import api
from typing_extensions import Protocol

from .version import __version__ as __version__  # noqa F401


def component_spec_from_app(app: api.AppDef) -> Tuple[str, api.Role]:
    """
    component_spec_from_app takes in a TorchX component and generates the yaml
    spec for it. Notably this doesn't apply resources or port_maps since those
    must be applied at runtime which is why it returns the role spec as well.

    Args:
        app: The AppDef to convert. It must contain exactly one role and that
            role must request exactly one replica.

    Returns:
        A ``(spec, role)`` tuple where ``spec`` is the YAML-serialized KFP
        component spec and ``role`` is the single role it was built from.

    Raises:
        AssertionError: if the app does not have exactly one role, or if that
            role does not have exactly one replica.

    >>> from torchx import specs
    >>> from torchx.pipelines.kfp.adapter import component_spec_from_app
    >>> app_def = specs.AppDef(
    ...     name="trainer",
    ...     roles=[specs.Role("trainer", image="foo:latest")],
    ... )
    >>> component_spec_from_app(app_def)
    ('description: ...', Role(...))
    """
    assert len(app.roles) == 1, f"KFP adapter only support one role, got {app.roles}"

    role = app.roles[0]
    # The message must report the same value the condition checks. The replica
    # count is read from the role, so format ``role.num_replicas`` here rather
    # than looking the attribute up on the AppDef.
    assert (
        role.num_replicas == 1
    ), f"KFP adapter only supports one replica, got {role.num_replicas}"

    command = [role.entrypoint, *role.args]

    # Resources and port mappings are intentionally left out of the spec; the
    # caller receives the role back so it can apply them on the ContainerOp.
    spec = {
        "name": f"{app.name}-{role.name}",
        "description": f"KFP wrapper for TorchX component {app.name}, role {role.name}",
        "implementation": {
            "container": {
                "image": role.image,
                "command": command,
                "env": role.env,
            }
        },
        "outputs": [],
    }
    return yaml.dump(spec), role
class ContainerFactory(Protocol):
    """
    Structural type describing any callable that produces a
    kfp.dsl.ContainerOp when invoked.
    """

    def __call__(self, *args: object, **kwargs: object) -> dsl.ContainerOp:
        """Create a ContainerOp from the given positional/keyword arguments."""
        ...
class KFPContainerFactory(ContainerFactory, Protocol):
    """
    A ContainerFactory that additionally exposes the KFP metadata attached to
    it.
    """

    # The KFP component spec carried by the factory.
    component_spec: ComponentSpec


# File the UI metadata sidecar writes to; component_from_app registers this
# same path as the "mlpipeline-ui-metadata" output artifact.
METADATA_FILE = "/tmp/outputs/mlpipeline-ui-metadata/data.json"
def component_from_app(
    app: api.AppDef, ui_metadata: Optional[Mapping[str, object]] = None
) -> ContainerFactory:
    """
    Build a KFP ContainerOp factory out of a TorchX component/AppDef.

    This is equivalent to the
    `kfp.components.load_component_from_*
    <https://kubeflow-pipelines.readthedocs.io/en/1.8.22/source/kfp.components.html#kfp.components.load_component_from_text>`_
    methods.

    Args:
        app: The AppDef to generate a KFP container factory for.
        ui_metadata: KFP UI Metadata to output so you can have model results
            show up in the UI. See
            https://www.kubeflow.org/docs/components/pipelines/sdk/output-viewer/
            for more info on the format.

    Returns:
        A callable that forwards its arguments to the loaded KFP component and
        then applies the role's resources, ports and pod labels to the
        resulting ContainerOp.

    >>> from torchx import specs
    >>> from torchx.pipelines.kfp.adapter import component_from_app
    >>> app_def = specs.AppDef(
    ...     name="trainer",
    ...     roles=[specs.Role("trainer", image="foo:latest")],
    ... )
    >>> component_from_app(app_def)
    <function component_from_app...>
    """
    yaml_spec, role = component_spec_from_app(app)

    resource: api.Resource = role.resource
    assert (
        len(resource.capabilities) == 0
    ), f"KFP doesn't support capabilities, got {resource.capabilities}"

    base_factory: KFPContainerFactory = components.load_component_from_text(yaml_spec)

    if ui_metadata is not None:
        # Declare the extra output once, up front, on the loaded component.
        ui_output = OutputSpec(
            name="mlpipeline-ui-metadata",
            type="MLPipeline UI Metadata",
            description="ui metadata",
        )
        # pyre-fixme[16]: `ComponentSpec` has no attribute `outputs`
        base_factory.component_spec.outputs.append(ui_output)

    def factory_wrapper(*args: object, **kwargs: object) -> dsl.ContainerOp:
        op = base_factory(*args, **kwargs)
        main_container = op.container

        if ui_metadata is not None:
            # The metadata file is produced by a sidecar, so the sidecar and
            # the main container must share one tmp directory for the output
            # to be visible from the main container.
            shared_tmp = V1Volume(name="tmp", empty_dir=V1EmptyDirVolumeSource())
            op.add_volume(shared_tmp)
            tmp_mount = V1VolumeMount(
                name="tmp",
                mount_path="/tmp/",
            )
            main_container.add_volume_mount(tmp_mount)
            op.output_artifact_paths["mlpipeline-ui-metadata"] = METADATA_FILE
            op.add_sidecar(_ui_metadata_sidecar(ui_metadata))

        # Negative cpu/memory values are skipped; request and limit are set to
        # the same value.
        cpu_count = resources_cpu = resource.cpu
        if cpu_count >= 0:
            cpu_quantity = f"{int(resources_cpu * 1000)}m"
            main_container.set_cpu_request(cpu_quantity)
            main_container.set_cpu_limit(cpu_quantity)

        mem_mb = resource.memMB
        if mem_mb >= 0:
            mem_quantity = f"{int(mem_mb)}M"
            main_container.set_memory_request(mem_quantity)
            main_container.set_memory_limit(mem_quantity)

        gpu_count = resource.gpu
        if gpu_count > 0:
            main_container.set_gpu_limit(str(gpu_count))

        for port_name, port_number in role.port_map.items():
            exposed_port = V1ContainerPort(
                name=port_name,
                container_port=port_number,
            )
            main_container.add_port(exposed_port)

        labels = pod_labels(app, 0, role, 0, app.name)
        op.pod_labels.update(labels)

        return op

    return factory_wrapper
def _ui_metadata_sidecar(
    ui_metadata: Mapping[str, object], image: str = "alpine"
) -> dsl.Sidecar:
    """
    Build the sidecar that writes ``ui_metadata`` as JSON to METADATA_FILE.

    Args:
        ui_metadata: The mapping to serialize with ``json.dumps``.
        image: Container image the sidecar runs in.

    Returns:
        A ``dsl.Sidecar`` that mirrors the main container's volume mounts and
        runs a shell command creating the output directory and writing the
        JSON payload into it.
    """
    # Quote the payload so it is passed to ``echo`` as one shell word.
    quoted_payload = shlex.quote(json.dumps(ui_metadata))
    output_dir = os.path.dirname(METADATA_FILE)
    script = f"mkdir -p {output_dir}; echo {quoted_payload} > {METADATA_FILE}"

    return dsl.Sidecar(
        name="ui-metadata-sidecar",
        image=image,
        command=["sh", "-c", script],
        mirror_volume_mounts=True,
    )
def container_from_app(
    app: api.AppDef,
    *args: object,
    ui_metadata: Optional[Mapping[str, object]] = None,
    **kwargs: object,
) -> dsl.ContainerOp:
    """
    Turn the app into a KFP component and immediately instantiate it,
    returning the resulting ContainerOp.

    See component_from_app for description on the arguments. Any unspecified
    arguments are passed through to the KFP container factory method.

    >>> import kfp
    >>> from torchx import specs
    >>> from torchx.pipelines.kfp.adapter import container_from_app
    >>> app_def = specs.AppDef(
    ...     name="trainer",
    ...     roles=[specs.Role("trainer", image="foo:latest")],
    ... )
    >>> def pipeline():
    ...     trainer = container_from_app(app_def)
    ...     print(trainer)
    >>> kfp.compiler.Compiler().compile(
    ...     pipeline_func=pipeline,
    ...     package_path="/tmp/pipeline.yaml",
    ... )
    {'ContainerOp': {... 'name': 'trainer-trainer', ...}}
    """
    # Build the factory, then call it with the pass-through arguments.
    return component_from_app(app, ui_metadata)(*args, **kwargs)
def resource_from_app(
    app: api.AppDef,
    queue: str,
    service_account: Optional[str] = None,
) -> dsl.ResourceOp:
    """
    Generate a KFP ResourceOp from the provided app that uses the Volcano job
    scheduler on Kubernetes to run distributed apps. See
    https://volcano.sh/en/docs/ for more info on Volcano and how to install.

    Args:
        app: The torchx AppDef to adapt.
        queue: the Volcano queue to schedule the operator in.
        service_account: forwarded as-is to ``app_to_resource`` when building
            the Kubernetes resource.

    Returns:
        A ``dsl.ResourceOp`` named after the app that creates the generated
        resource and watches ``status.state.phase`` for completion or failure.

    >>> import kfp
    >>> from torchx import specs
    >>> from torchx.pipelines.kfp.adapter import resource_from_app
    >>> app_def = specs.AppDef(
    ...     name="trainer",
    ...     roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)],
    ... )
    >>> def pipeline():
    ...     trainer = resource_from_app(app_def, queue="test")
    ...     print(trainer)
    >>> kfp.compiler.Compiler().compile(
    ...     pipeline_func=pipeline,
    ...     package_path="/tmp/pipeline.yaml",
    ... )
    {'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 'name': 'trainer-2', ...}}
    """
    k8s_resource = app_to_resource(app, queue, service_account=service_account)
    op_kwargs = {
        "name": app.name,
        "action": "create",
        "success_condition": "status.state.phase = Completed",
        "failure_condition": "status.state.phase = Failed",
        "k8s_resource": k8s_resource,
    }
    return dsl.ResourceOp(**op_kwargs)

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources