#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
import json
import os
import os.path
import shlex
from typing import Mapping, Optional, Tuple
import yaml
from kfp import components, dsl
# @manual=fbsource//third-party/pypi/kfp:kfp
from kfp.components.structures import ComponentSpec, OutputSpec
from kubernetes.client.models import (
V1ContainerPort,
V1EmptyDirVolumeSource,
V1Volume,
V1VolumeMount,
)
from torchx.schedulers.kubernetes_scheduler import app_to_resource, pod_labels
from torchx.specs import api
from typing_extensions import Protocol
from .version import __version__ as __version__ # noqa F401
def component_spec_from_app(app: api.AppDef) -> Tuple[str, api.Role]:
"""
    component_spec_from_app takes in a TorchX component and generates the YAML
    spec for it. Notably, this doesn't apply resources or port maps since those
    must be applied at runtime, which is why the role spec is returned as well.

>>> from torchx import specs
>>> from torchx.pipelines.kfp.adapter import component_spec_from_app
>>> app_def = specs.AppDef(
... name="trainer",
... roles=[specs.Role("trainer", image="foo:latest")],
... )
>>> component_spec_from_app(app_def)
('description: ...', Role(...))
"""
    assert len(app.roles) == 1, f"KFP adapter only supports one role, got {app.roles}"
role = app.roles[0]
    assert (
        role.num_replicas == 1
    ), f"KFP adapter only supports one replica, got {role.num_replicas}"
command = [role.entrypoint, *role.args]
spec = {
"name": f"{app.name}-{role.name}",
"description": f"KFP wrapper for TorchX component {app.name}, role {role.name}",
"implementation": {
"container": {
"image": role.image,
"command": command,
"env": role.env,
}
},
"outputs": [],
}
return yaml.dump(spec), role
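# A minimal sketch of how the returned spec can be consumed; this mirrors what
# component_from_app does below (``app_def`` here is a hypothetical AppDef):
#
#   spec_yaml, role = component_spec_from_app(app_def)
#   factory = components.load_component_from_text(spec_yaml)
#   op = factory()  # yields a kfp.dsl.ContainerOp when invoked in a pipeline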
class ContainerFactory(Protocol):
"""
    ContainerFactory is a protocol representing a function that, when called,
    produces a kfp.dsl.ContainerOp.
"""
def __call__(self, *args: object, **kwargs: object) -> dsl.ContainerOp: ...
class KFPContainerFactory(ContainerFactory, Protocol):
"""
KFPContainerFactory is a ContainerFactory that also has some KFP metadata
attached to it.
"""
component_spec: ComponentSpec
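# Path where the UI metadata sidecar writes its JSON; it is registered as the
# ``mlpipeline-ui-metadata`` output artifact in factory_wrapper below.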
METADATA_FILE = "/tmp/outputs/mlpipeline-ui-metadata/data.json"
def component_from_app(
app: api.AppDef, ui_metadata: Optional[Mapping[str, object]] = None
) -> ContainerFactory:
"""
component_from_app takes in a TorchX component/AppDef and returns a KFP
ContainerOp factory. This is equivalent to the
`kfp.components.load_component_from_*
<https://kubeflow-pipelines.readthedocs.io/en/1.8.22/source/kfp.components.html#kfp.components.load_component_from_text>`_
methods.
Args:
app: The AppDef to generate a KFP container factory for.
ui_metadata: KFP UI Metadata to output so you can have model results show
up in the UI. See
https://www.kubeflow.org/docs/components/pipelines/legacy-v1/sdk/output-viewer/
for more info on the format.
>>> from torchx import specs
>>> from torchx.pipelines.kfp.adapter import component_from_app
>>> app_def = specs.AppDef(
... name="trainer",
... roles=[specs.Role("trainer", image="foo:latest")],
... )
>>> component_from_app(app_def)
<function component_from_app...>
"""
role_spec: api.Role
spec, role_spec = component_spec_from_app(app)
resources: api.Resource = role_spec.resource
assert (
len(resources.capabilities) == 0
), f"KFP doesn't support capabilities, got {resources.capabilities}"
component_factory: KFPContainerFactory = components.load_component_from_text(spec)
if ui_metadata is not None:
# pyre-fixme[16]: `ComponentSpec` has no attribute `outputs`
component_factory.component_spec.outputs.append(
OutputSpec(
name="mlpipeline-ui-metadata",
type="MLPipeline UI Metadata",
description="ui metadata",
)
)
def factory_wrapper(*args: object, **kwargs: object) -> dsl.ContainerOp:
c = component_factory(*args, **kwargs)
container = c.container
if ui_metadata is not None:
            # The UI metadata is generated by the sidecar, so the container
            # and the sidecar need to share the same /tmp directory for the
            # outputs to appear in the original container.
c.add_volume(V1Volume(name="tmp", empty_dir=V1EmptyDirVolumeSource()))
container.add_volume_mount(
V1VolumeMount(
name="tmp",
mount_path="/tmp/",
)
)
c.output_artifact_paths["mlpipeline-ui-metadata"] = METADATA_FILE
c.add_sidecar(_ui_metadata_sidecar(ui_metadata))
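        # Apply the role's resources at runtime since they aren't baked into
        # the component spec (see component_spec_from_app): cpu is converted
        # to millicores, memMB to the Kubernetes megabyte ("M") suffix, and
        # gpu is set as a hard limit.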
cpu = resources.cpu
if cpu >= 0:
cpu_str = f"{int(cpu*1000)}m"
container.set_cpu_request(cpu_str)
container.set_cpu_limit(cpu_str)
mem = resources.memMB
if mem >= 0:
mem_str = f"{int(mem)}M"
container.set_memory_request(mem_str)
container.set_memory_limit(mem_str)
gpu = resources.gpu
if gpu > 0:
container.set_gpu_limit(str(gpu))
for name, port in role_spec.port_map.items():
container.add_port(
V1ContainerPort(
name=name,
container_port=port,
),
)
c.pod_labels.update(pod_labels(app, 0, role_spec, 0, app.name))
return c
return factory_wrapper
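# A hedged example of attaching UI metadata so results render in the KFP UI;
# the ``outputs`` schema follows the KFP output-viewer docs linked above, and
# ``app_def`` is a hypothetical AppDef:
#
#   factory = component_from_app(
#       app_def,
#       ui_metadata={
#           "outputs": [
#               {"type": "markdown", "storage": "inline", "source": "# Results"}
#           ]
#       },
#   )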
def _ui_metadata_sidecar(
ui_metadata: Mapping[str, object], image: str = "alpine"
) -> dsl.Sidecar:
shell_encoded = shlex.quote(json.dumps(ui_metadata))
dirname = os.path.dirname(METADATA_FILE)
return dsl.Sidecar(
name="ui-metadata-sidecar",
image=image,
command=[
"sh",
"-c",
f"mkdir -p {dirname}; echo {shell_encoded} > {METADATA_FILE}",
],
mirror_volume_mounts=True,
)
def container_from_app(
app: api.AppDef,
*args: object,
ui_metadata: Optional[Mapping[str, object]] = None,
**kwargs: object,
) -> dsl.ContainerOp:
"""
container_from_app transforms the app into a KFP component and returns a
corresponding ContainerOp instance.
    See component_from_app for a description of the arguments. Any unspecified
    arguments are passed through to the KFP container factory method.

>>> import kfp
>>> from torchx import specs
>>> from torchx.pipelines.kfp.adapter import container_from_app
>>> app_def = specs.AppDef(
... name="trainer",
... roles=[specs.Role("trainer", image="foo:latest")],
... )
>>> def pipeline():
... trainer = container_from_app(app_def)
... print(trainer)
>>> kfp.compiler.Compiler().compile(
... pipeline_func=pipeline,
... package_path="/tmp/pipeline.yaml",
... )
{'ContainerOp': {... 'name': 'trainer-trainer', ...}}
"""
factory = component_from_app(app, ui_metadata)
return factory(*args, **kwargs)
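# Since extra arguments are passed through to the factory, ui_metadata works
# here as well, e.g. (hypothetical ``app_def``):
#
#   op = container_from_app(app_def, ui_metadata={"outputs": [...]})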
def resource_from_app(
app: api.AppDef,
queue: str,
service_account: Optional[str] = None,
) -> dsl.ResourceOp:
"""
    resource_from_app generates a KFP ResourceOp from the provided app that
    uses the Volcano job scheduler on Kubernetes to run distributed apps. See
    https://volcano.sh/en/docs/ for more info on Volcano and how to install it.
    Args:
        app: The torchx AppDef to adapt.
        queue: the Volcano queue to schedule the operator in.
        service_account: optional service account to run the job's pods under.

>>> import kfp
>>> from torchx import specs
>>> from torchx.pipelines.kfp.adapter import resource_from_app
>>> app_def = specs.AppDef(
... name="trainer",
... roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)],
... )
>>> def pipeline():
... trainer = resource_from_app(app_def, queue="test")
... print(trainer)
>>> kfp.compiler.Compiler().compile(
... pipeline_func=pipeline,
... package_path="/tmp/pipeline.yaml",
... )
{'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 'name': 'trainer-2', ...}}
"""
return dsl.ResourceOp(
name=app.name,
action="create",
success_condition="status.state.phase = Completed",
failure_condition="status.state.phase = Failed",
k8s_resource=app_to_resource(app, queue, service_account=service_account),
)