#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import abc
import logging
import os
import re
import signal
import subprocess
import sys
import time
from contextlib import nullcontext
from dataclasses import dataclass, field
from enum import IntFlag
from typing import Any, Callable, Dict, Optional, Set, Tuple, Union
import torch.multiprocessing as mp
from torchelastic.multiprocessing.errors import ProcessFailure, record
from torchelastic.multiprocessing.redirects import redirect_stderr, redirect_stdout
from torchelastic.multiprocessing.tail_log import TailLog
log = logging.getLogger(__name__)
def _validate_full_rank(d: Dict[int, Any], nprocs: int, what: str):
actual_keys = set(d.keys())
expected_keys = set(range(nprocs))
if actual_keys != expected_keys:
raise RuntimeError(
f"{what}, local rank mapping mismatch,"
f" expected: {expected_keys}, actual: {actual_keys}"
)
_MAPPING_REGEX = r"^(\d:[0123],)*(\d:[0123])$"
_VALUE_REGEX = r"^[0123]$"
class Std(IntFlag):
NONE = 0
OUT = 1
ERR = 2
ALL = OUT | ERR
@classmethod
def from_str(cls, vm: str) -> Union["Std", Dict[int, "Std"]]:
"""
Example:
::
from_str("0") -> Std.NONE
from_str("1") -> Std.OUT
from_str("0:3,1:0,2:1,3:2") -> {0: Std.ALL, 1: Std.NONE, 2: Std.OUT, 3: Std.ERR}
Any other input raises an exception
"""
def to_std(v):
v = int(v)
for s in Std:
if s == v:
return s
# return None -> should NEVER reach here since we regex check input
if re.match(_VALUE_REGEX, vm): # vm is a number (e.g. 0)
return to_std(vm)
elif re.match(_MAPPING_REGEX, vm): # vm is a mapping (e.g. 0:1,1:2)
d: Dict[int, Std] = {}
for m in vm.split(","):
i, v = m.split(":")
d[int(i)] = to_std(v)
return d
else:
raise ValueError(
f"{vm} does not match: <{_VALUE_REGEX}> or <{_MAPPING_REGEX}>"
)
def to_map(
val_or_map: Union[Std, Dict[int, Std]], local_world_size: int
) -> Dict[int, Std]:
"""
Certain APIs take redirect settings either as a single value (e.g. apply to all
local ranks) or as an explicit user-provided mapping. This method is a convenience
method that converts a value or mapping into a mapping.
Example:
::
to_map(Std.OUT, local_world_size=2) # returns: {0: Std.OUT, 1: Std.OUT}
to_map({1: Std.OUT}, local_world_size=2) # returns: {0: Std.NONE, 1: Std.OUT}
to_map({0: Std.OUT, 1: Std.OUT}, local_world_size=2) # returns: {0: Std.OUT, 1: Std.OUT}
"""
if isinstance(val_or_map, Std):
return {i: val_or_map for i in range(local_world_size)}
else:
map = {}
for i in range(local_world_size):
map[i] = val_or_map.get(i, Std.NONE)
return map
[docs]@dataclass
class RunProcsResult:
"""
Results of a completed run of processes started with ``start_processes()``.
Returned by ``PContext``.
Note the following:
1. All fields are mapped by local rank
2. ``return_values`` - only populated for functions (not the binaries).
3. ``stdouts`` - path to stdout.log (empty string if no redirect)
4. ``stderrs`` - path to stderr.log (empty string if no redirect)
"""
return_values: Dict[int, Any] = field(default_factory=dict)
failures: Dict[int, ProcessFailure] = field(default_factory=dict)
stdouts: Dict[int, str] = field(default_factory=dict)
stderrs: Dict[int, str] = field(default_factory=dict)
def is_failed(self) -> bool:
return len(self.failures) > 0
[docs]class PContext(abc.ABC):
"""
The base class that standardizes operations over a set of processes
that are launched via different mechanisms. The name ``PContext``
is intentional to disambiguate with ``torch.multiprocessing.ProcessContext``.
.. warning:: stdouts and stderrs should ALWAYS be a superset of
tee_stdouts and tee_stderrs (respectively) this is b/c
tee is implemented as a redirect + tail -f <stdout/stderr.log>
"""
def __init__(
self,
name: str,
entrypoint: Union[Callable, str],
args: Dict[int, Tuple],
envs: Dict[int, Dict[str, str]],
stdouts: Dict[int, str],
stderrs: Dict[int, str],
tee_stdouts: Dict[int, str],
tee_stderrs: Dict[int, str],
error_files: Dict[int, str],
):
self.name = name
# validate that all mappings have the same number of keys and
# all local ranks are accounted for
nprocs = len(args)
_validate_full_rank(stdouts, nprocs, "stdouts")
_validate_full_rank(stderrs, nprocs, "stderrs")
self.entrypoint = entrypoint
self.args = args
self.envs = envs
self.stdouts = stdouts
self.stderrs = stderrs
self.error_files = error_files
self.nprocs = nprocs
self._stdout_tail = TailLog(name, tee_stdouts, sys.stdout)
self._stderr_tail = TailLog(name, tee_stderrs, sys.stderr)
def start(self) -> None:
"""
Start processes using parameters defined in the constructor.
"""
self._start()
self._stdout_tail.start()
self._stderr_tail.start()
@abc.abstractmethod
def _start(self) -> None:
"""
Start processes using strategy defined in a particular context.
"""
raise NotImplementedError()
@abc.abstractmethod
def _poll(self) -> Optional[RunProcsResult]:
"""
Polls the run status of the processes running under this context.
This method follows an "all-or-nothing" policy and returns
a ``RunProcessResults`` object if either all processes complete
successfully or any process fails. Returns ``None`` if
all processes are still running.
"""
raise NotImplementedError()
def wait(self, timeout: float = -1, period: float = 1) -> Optional[RunProcsResult]:
"""
Waits for the specified ``timeout`` seconds, polling every ``period`` seconds
for the processes to be done. Returns ``None`` if the processes are still running
on timeout expiry. Negative timeout values are interpreted as "wait-forever".
A timeout value of zero simply queries the status of the processes (e.g. equivalent
to a poll).
"""
if timeout == 0:
return self._poll()
if timeout < 0:
timeout = sys.maxsize
expiry = time.time() + timeout
while time.time() < expiry:
pr = self._poll()
if pr:
return pr
time.sleep(period)
return None
@abc.abstractmethod
def pids(self) -> Dict[int, int]:
"""
Returns pids of processes mapped by their respective local_ranks
"""
raise NotImplementedError()
@abc.abstractmethod
def _close(self) -> None:
r"""
Terminates all processes managed by this context and cleans up any
meta resources (e.g. redirect, error_file files).
"""
raise NotImplementedError()
def close(self) -> None:
self._close()
if self._stdout_tail:
self._stdout_tail.stop()
if self._stderr_tail:
self._stderr_tail.stop()
def _wrap(
local_rank: int,
fn: Callable,
args: Dict[int, Tuple],
envs: Dict[int, Dict[str, str]],
stdout_redirects: Dict[int, str], # redirect file for stdout (to console if None)
stderr_redirects: Dict[int, str], # redirect file for stderr (to console if None)
ret_vals: Dict[int, mp.SimpleQueue],
) -> None:
# get the per-rank params up front so we fail fast if no mapping is found
args_ = args[local_rank]
env_ = envs[local_rank]
ret_val_ = ret_vals[local_rank]
stdout_rd = stdout_redirects[local_rank]
stderr_rd = stderr_redirects[local_rank]
stdout_cm = redirect_stdout(stdout_rd) if stdout_rd else nullcontext()
stderr_cm = redirect_stderr(stderr_rd) if stderr_rd else nullcontext()
for k, v in env_.items():
os.environ[k] = v
with stdout_cm, stderr_cm:
ret = record(fn)(*args_)
ret_val_.put(ret)
[docs]class MultiprocessContext(PContext):
"""
``PContext`` holding worker processes invoked as a function.
"""
def __init__(
self,
name: str,
entrypoint: Callable,
args: Dict[int, Tuple],
envs: Dict[int, Dict[str, str]],
stdouts: Dict[int, str],
stderrs: Dict[int, str],
tee_stdouts: Dict[int, str],
tee_stderrs: Dict[int, str],
error_files: Dict[int, str],
start_method: str,
):
super().__init__(
name,
entrypoint,
args,
envs,
stdouts,
stderrs,
tee_stdouts,
tee_stderrs,
error_files,
)
self.start_method = start_method
# each ret_val queue will always contain a single element.
self._ret_vals = {
local_rank: mp.get_context(self.start_method).SimpleQueue()
for local_rank in range(self.nprocs)
}
# see comments in ``join()`` for what this is
self._return_values: Dict[int, Any] = {}
# pyre-fixme[8]: The attribute is defined in _start method
self._pc: mp.ProcessContext = None
def _start(self):
if self._pc:
raise ValueError(
"The process context already initialized. Most likely the start method got called twice."
)
self._pc: mp.ProcessContext = mp.start_processes(
fn=_wrap,
args=(
self.entrypoint,
self.args,
self.envs,
self.stdouts,
self.stderrs,
self._ret_vals,
),
nprocs=self.nprocs,
join=False,
daemon=False,
start_method=self.start_method,
)
def _poll(self) -> Optional[RunProcsResult]:
try:
# torch.mp.ProcessContext returns True if all the workers have
# successfully finished, False if some/all are still running
# and throws an Exception if some/all of them failed
# timeout < 0 checks worker status and return immediately
done = self._pc.join(-1)
# IMPORTANT: we use multiprocessing.Queue to carry worker return values
# back to the parent, the worker process will wait before terminating
# until all the buffered items are fed by the feeder thread to the underlying
# pipe. Hence to prevent deadlocks on large return values,
# we opportunistically try queue.get on each join call
# See: https://docs.python.org/2/library/multiprocessing.html#all-platforms
for local_rank in range(0, self.nprocs):
return_queue = self._ret_vals[local_rank]
if not return_queue.empty():
# save the return values temporarily into a member var
self._return_values[local_rank] = return_queue.get()
if done:
# we should ALWAYS have ALL the return values when all the processes are done
_validate_full_rank(
self._return_values, self.nprocs, "return_value queue"
)
self.close()
return RunProcsResult(
return_values=self._return_values,
stdouts=self.stdouts,
stderrs=self.stderrs,
)
else:
return None
except (mp.ProcessRaisedException, mp.ProcessExitedException) as e:
failed_local_rank = e.error_index
# pyre-ignore entrypoint fn has a __qualname__.
fn_name = self.entrypoint.__qualname__
failed_proc = self._pc.processes[failed_local_rank]
error_filepath = self.error_files[failed_local_rank]
log.error(
f"failed (exitcode: {failed_proc.exitcode})"
f" local_rank: {failed_local_rank} (pid: {e.pid})"
f" of fn: {fn_name} (start_method: {self.start_method})",
exc_info=True,
)
self.close()
return RunProcsResult(
failures={
failed_local_rank: ProcessFailure(
local_rank=failed_local_rank,
pid=e.pid,
exitcode=failed_proc.exitcode,
error_file=error_filepath,
)
},
stdouts=self.stdouts,
stderrs=self.stderrs,
)
def pids(self) -> Dict[int, int]:
return {local_rank: pid for local_rank, pid in enumerate(self._pc.pids())}
def _close(self) -> None:
if self._pc:
for proc in self._pc.processes:
proc.terminate()
proc.join()
class SubprocessHandler:
"""
Convenience wrapper around python's ``subprocess.Popen``. Keeps track of
meta-objects associated to the process (e.g. stdout and stderr redirect fds).
"""
def __init__(
self,
entrypoint: str,
args: Tuple,
env: Dict[str, str],
preexec_fn: Callable,
stdout: str,
stderr: str,
):
self._stdout = open(stdout, "w") if stdout else None
self._stderr = open(stderr, "w") if stderr else None
args_str = [str(e) for e in args]
# inherit parent environment vars
env_vars = os.environ.copy()
env_vars.update(env)
self.proc: subprocess.Popen = subprocess.Popen(
args=(entrypoint, *args_str),
env=env_vars,
preexec_fn=preexec_fn,
stdout=self._stdout,
stderr=self._stderr,
)
def close(self):
self.proc.terminate()
self.proc.wait()
if self._stdout:
self._stdout.close()
if self._stderr:
self._stderr.close()
[docs]class SubprocessContext(PContext):
"""
``PContext`` holding worker processes invoked as a binary.
"""
def __init__(
self,
name: str,
entrypoint: str,
args: Dict[int, Tuple],
envs: Dict[int, Dict[str, str]],
stdouts: Dict[int, str],
stderrs: Dict[int, str],
tee_stdouts: Dict[int, str],
tee_stderrs: Dict[int, str],
error_files: Dict[int, str],
):
super().__init__(
name,
entrypoint,
args,
envs,
stdouts,
stderrs,
tee_stdouts,
tee_stderrs,
error_files,
)
# state vector; _vdone[local_rank] -> is local_rank finished or not
self._running_local_ranks: Set[int] = set(range(self.nprocs))
self._failures: Dict[int, ProcessFailure] = {}
# pyre-fixme[8]: The attribute is defined in _start method
self.subprocess_handlers: Dict[int, SubprocessHandler] = None
def _start(self):
if self.subprocess_handlers:
raise ValueError(
"The subprocess handlers already initialized. Most likely the start method got called twice."
)
self.subprocess_handlers: Dict[int, SubprocessHandler] = {
local_rank: SubprocessHandler(
entrypoint=self.entrypoint,
args=self.args[local_rank],
env=self.envs[local_rank],
preexec_fn=mp._prctl_pr_set_pdeathsig(signal.SIGTERM),
stdout=self.stdouts[local_rank],
stderr=self.stderrs[local_rank],
)
for local_rank in range(self.nprocs)
}
def _poll(self) -> Optional[RunProcsResult]:
done_local_ranks = set()
for local_rank in self._running_local_ranks:
handler = self.subprocess_handlers[local_rank]
exitcode = handler.proc.poll()
if exitcode is not None:
done_local_ranks.add(local_rank)
if exitcode != 0: # failed or signaled
self._failures[local_rank] = ProcessFailure(
local_rank=local_rank,
pid=handler.proc.pid,
exitcode=exitcode,
error_file=self.error_files[local_rank],
)
# else: --> succeeded; nothing to do
self._running_local_ranks.difference_update(done_local_ranks)
# if ALL procs are finished or ANY have failed
if not self._running_local_ranks or self._failures:
self.close() # terminate all running procs
result = RunProcsResult(
failures=self._failures,
stdouts=self.stdouts,
stderrs=self.stderrs,
)
if result.is_failed():
first_failure = min(result.failures.values(), key=lambda f: f.timestamp)
log.error(
f"failed (exitcode: {first_failure.exitcode})"
f" local_rank: {first_failure.local_rank} (pid: {first_failure.pid})"
f" of binary: {self.entrypoint}"
)
else:
# Populate return with dummy values. This provides consistency with MultiprocessingHandler
result.return_values = {
local_rank: None for local_rank in range(self.nprocs)
}
return result
else: # there are no failures and procs still running
return None
def pids(self) -> Dict[int, int]:
return {
local_rank: sh.proc.pid
for local_rank, sh in self.subprocess_handlers.items()
}
def _close(self) -> None:
if self.subprocess_handlers:
for handler in self.subprocess_handlers.values():
handler.close()