Source code for torchx.runner.config
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
"""
Status: Beta
You can store the scheduler run cfg (run configs) for your project
by storing them in the ``.torchxconfig`` file. Currently this file is only read
and honored when running the component from the CLI.
CLI Usage
~~~~~~~~~~~
**Scheduler Config**
#. ``cd`` into the directory where you want the ``.torchxconfig`` file to be dropped.
The CLI only picks up ``.torchxconfig`` files from the current-working-directory (CWD)
so chose a directory where you typically run ``torchx`` from. Typically this
is the root of your project directory.
#. Generate the config file by running
.. code-block:: shell-session
$ torchx configure -s <comma,delimited,scheduler,names>
# -- or for all registered schedulers --
$ torchx configure
#. If you specified ``-s local_cwd,kubernetes``, you should see a ``.torchxconfig``
file as shown below:
.. code-block:: shell-session
$ cat .torchxconfig
[local_cwd]
[kubernetes]
queue = #FIXME:(str) Volcano queue to schedule job in
#. ``.torchxconfig`` in in INI format and the section names map to the scheduler names.
Each section contains the run configs for the scheduler as ``$key = $value`` pairs.
You may find that certain schedulers have empty sections, this means that
the scheduler defines sensible defaults for all its run configs hence no run configs
are required at runtime. If you'd like to override the default you can add them.
**TIP:** To see all the run options for a scheduler use ``torchx runopts <scheduler_name>``.
#. The sections with ``FIXME`` placeholders are run configs that are required
by the scheduler. Replace these with the values that apply to you.
#. **IMPORTANT:** If you are happy with the scheduler provided defaults for a particular
run config, you **should not** redundantly specify them in ``.torchxconfig`` with the
same default value. This is because the scheduler may decide to change the default
value at a later date which would leave you with a stale default.
#. Now you can run your component without having to specify the scheduler run configs
each time. Just make sure the directory you are running ``torchx`` cli from actually
has ``.torchxconfig``!
.. code-block:: shell-session
$ ls .torchxconfig
.torchxconfig
$ torchx run -s local_cwd ./my_component.py:train
#. In addition, it is possible to specify a different config other than .torchxconfig to
load at runtime. Requirements are that the config path is specified by enviornment
variable TORCHX_CONFIG. It also disables hierarchy loading configs from multiple
directories as the cases otherwise.
#. User level .torchxconfig
In addition to the project-level .torchxconfig at the root of the project directory,
you can create one in ``$HOME/.torchxconfig`` to override or specify additional default configs.
This config file will get overlaid on top of the one defined at the project root.
#. Config options take the following precedence (high to low):
1. Options specified directly from the CLI
2. If TORCHXCONFIG env variable is set, the options specified in that file
3. If TORCHXCONFIG env variable is not set,
a. Options specified in user level .torchxconfig (``$HOME/.torchxconfig``)
b. Options specified in .torchxconfig
4. Any default values in the code
Note that malformed or unrecognized options are simply skipped and not applied
**Component Config**
You can specify component defaults by adding a section prefixed with
``component:``.
.. code-block:: ini
[component:dist.ddp]
j=2x8
cpu=4
Now when you run the ``dist.ddp`` component those configs are automatically
picked up.
.. code-block:: shell-session
$ torchx run -s local_cwd dist.ddp
... runs with -j 2x8 --cpu 4
**CLI Subcommand Config**
The default arguments for the ``torchx`` subcommands can be overwritten. Any
``--foo FOO`` argument can be set via the correspond ``[cli:<cmd>]`` settings
block.
For the ``run`` command you can additionally set ``component`` to set the
default component to run.
.. code-block:: ini
[cli:run]
component=dist.ddp
scheduler=local_docker
workspace=file://some_workspace
Programmatic Usage
~~~~~~~~~~~~~~~~~~~
Unlike the cli, ``.torchxconfig`` file **is not** picked up automatically
from ``CWD`` if you are programmatically running your component with :py:class:`torchx.runner.Runner`.
You'll have to manually specify the directory containing ``.torchxconfig``.
Below is an example
.. doctest:: [runner_config_example]
from torchx.runner import get_runner
from torchx.runner.config import apply
import torchx.specs as specs
def my_component(a: int) -> specs.AppDef:
# <... component body omitted for brevity ...>
pass
scheduler = "local_cwd"
cfg = {"log_dir": "/these/take/outmost/precedence"}
apply(scheduler, cfg, dirs=["/home/bob"]) # looks for /home/bob/.torchxconfig
get_runner().run(my_component(1), scheduler, cfg)
You may also specify multiple directories (in preceding order) which is useful when
you want to keep personal config overrides on top of a project defined default.
"""
import configparser as configparser
import logging
import os
from pathlib import Path
from typing import Dict, Iterable, List, Optional, TextIO
from torchx.schedulers import get_scheduler_factories, Scheduler
from torchx.specs import CfgVal, get_type_name
from torchx.specs.api import runopt
CONFIG_FILE = ".torchxconfig"
CONFIG_PREFIX_DELIM = ":"
ENV_TORCHXCONFIG = "TORCHXCONFIG"
DEFAULT_CONFIG_DIRS = [str(Path.home()), str(Path.cwd())]
_NONE = "None"
log: logging.Logger = logging.getLogger(__name__)
def _configparser() -> configparser.ConfigParser:
"""
Sets up the configparser and returns it. The same config parser
should be used between dumps() and loads() methods for ser/de compatibility
"""
config = configparser.ConfigParser()
# if optionxform is not overridden, configparser will by default lowercase
# the option keys because it is compatible with Windows INI files
# which are expected to be parsed case insensitive.
# override since torchx's runopts are case-sensitive
# see: https://stackoverflow.com/questions/19359556/configparser-reads-capital-keys-and-make-them-lower-case
# pyre-ignore[8]
config.optionxform = lambda option: option
return config
def _get_scheduler(name: str) -> Scheduler:
schedulers = get_scheduler_factories()
if name not in schedulers:
raise ValueError(
f"`{name}` is not a registered scheduler. Valid scheduler names: {schedulers.keys()}"
)
sched = schedulers[name](session_name="_")
return sched
def _fixme_placeholder(runopt: runopt, max_len: int = 60) -> str:
ph = f"#FIXME:({get_type_name(runopt.opt_type)}) {runopt.help}"
return ph if len(ph) <= max_len else f"{ph[:max_len]}..."
[docs]def dump(
f: TextIO, schedulers: Optional[List[str]] = None, required_only: bool = False
) -> None:
"""
Dumps a default INI-style config template containing the :py:class:torchx.specs.runopts for the
given scheduler names into the file-like object specified by ``f``.
If no ``schedulers`` are specified dumps all known registered schedulers.
Optional runopts are pre-filled with their default values.
Required runopts are set with a ``FIXME: ...`` placeholder.
To only dump required runopts pass ``required_only=True``.
Each scheduler's runopts are written in the section called
``[{scheduler_name}]``.
For example:
::
[kubernetes]
namespace = default
queue = #FIXME (str)Volcano queue to schedule job in
Raises:
ValueError: if given a scheduler name that is not known
"""
if schedulers:
scheds = schedulers
else:
scheds = get_scheduler_factories().keys()
config = _configparser()
for sched_name in scheds:
try:
sched = _get_scheduler(sched_name)
except ModuleNotFoundError:
continue
section = f"{sched_name}"
config.add_section(section)
for opt_name, opt in sched.run_opts():
if opt.is_required:
val = _fixme_placeholder(opt)
else: # not required runopts MUST have a default
if required_only:
continue
# serialize list elements with `;` delimiter (consistent with torchx cli)
if opt.opt_type == List[str]:
# deal with empty or None default lists
if opt.default:
# pyre-ignore[6] opt.default type checked already as List[str]
val = ";".join(opt.default)
else:
val = _NONE
elif opt.opt_type == Dict[str, str]:
# deal with empty or None default lists
if opt.default:
# pyre-ignore[16] opt.default type checked already as Dict[str, str]
val = ";".join([f"{k}:{v}" for k, v in opt.default.items()])
else:
val = _NONE
else:
val = f"{opt.default}"
config.set(section, opt_name, val)
config.write(f, space_around_delimiters=True)
[docs]def apply(
scheduler: str,
cfg: Dict[str, CfgVal],
dirs: Optional[List[str]] = None,
) -> None:
"""
Loads a ``.torchxconfig`` INI file from the specified directories in
preceding order and applies the run configs for the scheduler onto
the given ``cfg``.
If no ``dirs`` is specified, then it looks for ``.torchxconfig`` in the
current working directory. If a specified directory does not have ``.torchxconfig``
then it is ignored.
Note that the configs already present in the given ``cfg`` take precedence
over the ones in the config file and only new configs are added. The same holds
true for the configs loaded in list order.
For instance if ``cfg={"foo":"bar"}`` and the config file is:
::
# dir_1/.torchxconfig
[local_cwd]
foo = baz
hello = world
# dir_2/.torchxconfig
[local_cwd]
hello = bob
Then after the method call, ``cfg={"foo":"bar","hello":"world"}``.
"""
for configfile in find_configs(dirs):
with open(configfile, "r") as f:
load(scheduler, f, cfg)
log.info(f"loaded configs from {configfile}")
[docs]def load_sections(
prefix: str,
dirs: Optional[List[str]] = None,
) -> Dict[str, Dict[str, str]]:
"""
Loads the content of the sections in the given ``.torchxconfig`` file
that start with the specified prefix. Returns a map of maps of the
section name WITHOUT the prefix with the contents of the section loaded
into a map. ``":"`` is used as the prefix delimiter.
Example config format for specifying defaults for the builtin
component ``dist.ddp`` is shown below:
::
[component:dist.ddp]
j = 1x2
image = ghcr.io/foo:1
# calling `load_sections(prefix="component")` returns
# {
# "dist.ddp": {
# "j":"1x2",
# "image":"ghcr.io/foo:1",
# },
# }
The keys in the section must match the parameter name of the component function.
The example below shows how to represent the various types that are allowable
as component parameter types.
::
[component:foo.bar]
int = 1
float = 1.2
bool = True # or False
str = foobar
list = a,b,c
map = A=B,C=D
vararg = -a b --c=d e
# to call the component as:
foo.bar(
"-a", "b", "--c=d", "e",
int=1,
float=1.2,
bool=True,
str="foobar",
list=["a", "b", "c"],
map={"A":"B", "C": "D"})
"""
def strip_prefix(section_name: str) -> Optional[str]:
# returns the section_name with the prefix removed
# or None if the section name does not match the prefix
idx = section_name.find(CONFIG_PREFIX_DELIM)
# index guard (e.g ":foo" and "foo:" are bad, just return None)
if 0 < idx < len(section_name) - 1:
if section_name[0:idx] == prefix:
return section_name[idx + 1 :]
return None
sections: Dict[str, Dict[str, str]] = {}
for configfile in find_configs(dirs):
with open(configfile, "r") as f:
config = _configparser()
config.read_file(f)
for section_name in config.sections():
name = strip_prefix(section_name)
if name:
section = sections.setdefault(name, {})
for key, value in config.items(section_name):
if key not in section:
log.debug(
f"Loaded config: {prefix}.{key}={value} from {configfile}"
)
section[key] = value
return sections
[docs]def get_configs(
prefix: str,
name: str,
dirs: Optional[List[str]] = None,
) -> Dict[str, str]:
"""
Gets all the config values in the section ``["{prefix}:{name}"]``.
Or an empty map if the section does not exist.
Example:
::
# for config file:
# [foo:bar]
# baz = 1
get_configs(prefix="foo", name="bar") # returns {"baz": "1"}
get_config(prefix="foo", name="barr") # returns {}
"""
sections = load_sections(prefix, dirs)
return sections.get(name, {})
[docs]def get_config(
prefix: str,
name: str,
key: str,
dirs: Optional[List[str]] = None,
) -> Optional[str]:
"""
Gets the config value for the ``key`` in the section ``["{prefix}:{name}"]``.
Or ``None`` if no section or key exists
Example:
::
# for config file:
# [foo:bar]
# baz = 1
get_config(prefix="foo", name="bar", key="baz") == 1
get_config(prefix="foo", name="bar", key="bazz") == None
get_config(prefix="foo", name="barr", key="baz") == None
get_config(prefix="fooo", name="bar", key="baz") == None
"""
return get_configs(prefix, name, dirs).get(key, None)
[docs]def find_configs(dirs: Optional[Iterable[str]] = None) -> List[str]:
"""
Finds and returns the filepath to ``.torchxconfig`` files based
on the following logic:
1. If the environment variable ``TORCHXCONFIG`` exists, then its value
is returned in a single-element list and the directories specified through
the ``dirs`` parameter is NOT searched.
2. Otherwise, a ``.torchxconfig`` file is looked for in ``dirs`` and
the filepaths to existing config files are returned. If ``dirs`` is
not specified or is empty then ``dirs`` defaults to ``[$HOME, $CWD]``
where CWD is current working dir.
"""
config = os.getenv(ENV_TORCHXCONFIG)
if config is not None:
configfile = Path(config)
if not configfile.is_file():
raise FileNotFoundError(
f"`{ENV_TORCHXCONFIG}={config}` does not exist or is not a file."
)
return [str(configfile)]
else:
config_files = []
if not dirs:
dirs = DEFAULT_CONFIG_DIRS
for d in dirs:
configfile = Path(d) / CONFIG_FILE
if os.access(configfile, os.R_OK):
config_files.append(str(configfile))
return config_files
[docs]def load(scheduler: str, f: TextIO, cfg: Dict[str, CfgVal]) -> None:
"""
loads the section ``[{scheduler}]`` from the given
configfile ``f`` (in .INI format) into the provided ``runcfg``, only adding
configs that are NOT currently in the given ``runcfg`` (e.g. does not
override existing values in ``runcfg``). If no section is found, does nothing.
"""
config = _configparser()
config.read_file(f)
runopts = _get_scheduler(scheduler).run_opts()
section = f"{scheduler}"
if config.has_section(section):
for name, value in config.items(section):
if name in cfg.keys():
# DO NOT OVERRIDE existing configs
continue
if value == _NONE:
# should map to None (not str 'None')
# this also handles empty or None lists
cfg[name] = None
else:
runopt = runopts.get(name)
if runopt is None:
log.warning(
f"`{name} = {value}` was declared in the [{section}] section "
f" of the config file but is not a runopt of `{scheduler}` scheduler."
f" Remove the entry from the config file to no longer see this warning"
)
else:
if runopt.opt_type is bool:
# need to handle bool specially since str -> bool is based on
# str emptiness not value (e.g. bool("False") == True)
cfg[name] = config.getboolean(section, name)
elif runopt.opt_type is List[str]:
cfg[name] = value.split(";")
elif runopt.opt_type is Dict[str, str]:
cfg[name] = {
s.split(":", 1)[0]: s.split(":", 1)[1]
for s in value.replace(",", ";").split(";")
}
else:
# pyre-ignore[29]
cfg[name] = runopt.opt_type(value)