Source code for functorch._src.aot_autograd

import torch
import torch.nn as nn
from torch import Tensor
from functorch import make_fx
from torch.fx import immutable_collections
import torch.utils._pytree as pytree
import torch.utils.dlpack
from torch.nn.utils import _stateless
from functorch._C import CompileCache
from .decompositions import register_decomposition
from .partitioners import default_partition
from .named_members_polyfill import _named_parameters, _named_buffers
from typing import Callable, List, Dict, Any, Tuple, Optional

pytree._register_pytree_node(
    immutable_collections.immutable_list,
    lambda x: (list(x), None),
    lambda x, c: immutable_collections.immutable_list(x),
)
pytree._register_pytree_node(
    immutable_collections.immutable_dict,
    lambda x: (list(x.values()), list(x.keys())),
    lambda x, c: immutable_collections.immutable_dict(
        {key: value for key, value in zip(c, x)}
    ),
)

# TODO - move this to PyTorch core. This overrides the pytree implementation for
# dict to maintain parity with Deepmind pytree.
Context = Any


def _dict_flatten(d: Dict[Any, Any]) -> Tuple[List[Any], Context]:
    keys = list(sorted(d.keys()))
    values = [d[key] for key in keys]
    return values, keys


def _dict_unflatten(values: List[Any], context: Context) -> Dict[Any, Any]:
    return {key: value for key, value in zip(context, values)}


pytree._register_pytree_node(dict, _dict_flatten, _dict_unflatten)
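
# Illustrative sketch (not part of the original module): with the dict handler
# registered above, flattening orders leaves by sorted key and stores the keys
# as context, so unflattening reconstructs the same dict.
#
# >>> values, spec = pytree.tree_flatten({"b": 2, "a": 1})
# >>> values
# [1, 2]
# >>> pytree.tree_unflatten(values, spec)
# {'a': 1, 'b': 2}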

aten = torch.ops.aten


def create_joint_forward_backward(fn):
    def joint_forward_backward(
        primals: List[Any], tangents: List[Any]
    ) -> Tuple[List[Any], List[Any]]:
        # Call the forward pass
        outs = fn(*primals)
        # Get the inputs that need gradients
        grad_primals = []
        inputs_needs_grads = []
        for p in primals:
            is_grad_tensor = isinstance(p, Tensor) and p.requires_grad
            inputs_needs_grads.append(is_grad_tensor)
            if is_grad_tensor:
                grad_primals.append(p)

        # Get the outputs that need gradients
        assert len(tangents) == len(outs)
        needed_outs = []
        needed_tangents = []
        for out, tangent in zip(outs, tangents):
            if isinstance(out, Tensor) and out.requires_grad:
                needed_outs.append(out)
                needed_tangents.append(tangent)
        backward_out = []
        # Call the backwards pass
        if grad_primals:
            backward_out = torch.autograd.grad(
                needed_outs,
                grad_primals,
                grad_outputs=needed_tangents,
                allow_unused=True,
            )
        backward_out_iter = iter(backward_out)
        return outs, [
            next(backward_out_iter) if i else None for i in inputs_needs_grads
        ]

    return joint_forward_backward
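
# Illustrative sketch (not part of the original module): for a function that
# returns a list of outputs, the joint function returns those outputs together
# with one gradient (or None) per primal -- here the VJP tangent * cos(x).
#
# >>> joint = create_joint_forward_backward(lambda x: [x.sin()])
# >>> x = torch.randn(3, requires_grad=True)
# >>> outs, grads = joint([x], [torch.ones(3)])
# >>> torch.allclose(grads[0], x.cos())
# True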


def normalize_as_list(x):
    if isinstance(x, tuple):
        return list(x)
    elif isinstance(x, list):
        return x
    return [x]


aot_autograd_decompositions = {}


@register_decomposition(aten.rsub, aot_autograd_decompositions)
def rsub(a, b, alpha=1):
    return -aten.sub(a, b)


@register_decomposition(aten._reshape_alias, aot_autograd_decompositions)
def _reshape_alias(x, shape, strides):
    return aten.view(x, shape)


def create_aot_autograd_function(
    flat_fn, fw_compiler, bw_compiler, partition_fn, decompositions, grad_state
):
    """
    Traces the forward and backward graphs of :attr:`flat_fn` to generate a
    joint graph. The joint graph is an Fx graph with Aten ops. Please refer to
    the tracing mechanism to understand the graph capturing details.

    The joint graph is then passed through :attr:`partition_fn` to isolate the
    forward and backward portions, which are then respectively compiled via the
    provided :attr:`fw_compiler` and :attr:`bw_compiler`.

    The resulting compiled forward and backward graphs are then wrapped up in a
    ``torch.autograd.Function`` object.
    """
    joint_forward_backward = create_joint_forward_backward(flat_fn)

    compiled_fw = None
    compiled_bw = None
    num_outs = None

    class CompiledFunction(torch.autograd.Function):
        @staticmethod
        def forward(ctx, *flat_tensor_args):
            nonlocal compiled_fw, compiled_bw, num_outs
            if compiled_fw is None:
                with torch.set_grad_enabled(grad_state):
                    out = flat_fn(*flat_tensor_args)
                out = pytree.tree_map(
                    lambda x: x.detach() if isinstance(x, Tensor) else x, out
                )

                if isinstance(out, (list, tuple)):
                    num_outs = len(out)
                else:
                    num_outs = 1

                joint_inputs = (flat_tensor_args, out)
                aot_decompositions = {**aot_autograd_decompositions, **decompositions}
                with torch.set_grad_enabled(grad_state):
                    fx_g = make_fx(joint_forward_backward, aot_decompositions)(
                        *joint_inputs
                    )
                fw_module, bw_module = partition_fn(fx_g, joint_inputs)
                # print(fw_module.code, bw_module.code)

                compiled_fw = fw_compiler(fw_module, flat_tensor_args)
                fw_outs = normalize_as_list(compiled_fw(*flat_tensor_args))

                bw_args = fw_outs[num_outs:] + fw_outs[0:num_outs]
                compiled_bw = bw_compiler(bw_module, bw_args)
            else:
                fw_outs = normalize_as_list(compiled_fw(*flat_tensor_args))
            ctx.save_for_backward(*fw_outs[num_outs:])
            return tuple(fw_outs[0:num_outs])

        @staticmethod
        def backward(ctx, *flat_args):
            # hmm... this doesn't feel right. todo
            # contiguous_args = [t.contiguous() for t in flat_args]
            contiguous_args = [t for t in flat_args]
            out = normalize_as_list(compiled_bw(*ctx.saved_tensors, *contiguous_args))
            return tuple(out)

    return CompiledFunction
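
# Illustrative sketch (not part of the original module; the pass-through
# `nop_compiler` helper below is made up for the example): returning the traced
# fx module unchanged is enough to exercise the trace/partition/compile path.
# The returned class is used via .apply, which yields a tuple of num_outs
# elements.
#
# >>> def nop_compiler(fx_module, example_inputs):
# >>>     return fx_module
# >>> CompiledFn = create_aot_autograd_function(
# >>>     lambda x: [x.sin()], nop_compiler, nop_compiler, default_partition,
# >>>     decompositions={}, grad_state=True)
# >>> x = torch.randn(3, requires_grad=True)
# >>> out = CompiledFn.apply(x)
# >>> torch.allclose(out[0], x.sin())
# True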


class _CompileCache(CompileCache):
    pass


# using a C++-based pytree reduces the overhead by about 50%
try:
    import tree

    HAS_TREE = True
except ImportError:
    HAS_TREE = False
compile_cache = None


# Inspired by autodidax (thanks!)
class PytreeThunk:
    spec = None
    # These are some kinda dumb microoptimizations that save about 3-4 us of overhead.
    is_simple = (
        None  # if the output spec is a tuple/list, we won't bother unflattening it.
    )
    is_really_simple = None  # if the output spec is a LeafSpec

    def set(self, spec):
        assert self.spec is None or self.spec == spec
        self.spec = spec
        if type(self.spec) in [tuple, list] and all(
            [isinstance(i, pytree.LeafSpec) for i in spec.children_specs]
        ):
            self.is_simple = True
        if isinstance(self.spec, pytree.LeafSpec):
            self.is_really_simple = True

    def unflatten(self, x):
        if self.is_really_simple:
            return x[0]
        if self.is_simple:
            return x
        return pytree.tree_unflatten(x, self.spec)
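
# Illustrative sketch (not part of the original module): the thunk records the
# output spec the first time flat_fn runs and later reverses the flattening.
#
# >>> thunk = PytreeThunk()
# >>> flat, spec = pytree.tree_flatten({"a": 1, "b": 2})
# >>> thunk.set(spec)
# >>> thunk.unflatten(flat)
# {'a': 1, 'b': 2}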


def filter_tensor_and_static_args(args, static_argnums):
    """
    Separate out the tensor and static args. Also, for the static args, store
    the hash.
    """
    tensor_args = []
    static_args = []
    static_args_hashed = []
    for idx, arg in enumerate(args):
        if idx not in static_argnums:
            tensor_args.append(arg)
        else:
            static_args.append(arg)
            static_args_hashed.append(arg.__hash__())
    return tensor_args, static_args, static_args_hashed


def rearrange(tensor_args, static_args, static_argnums):
    """
    Generate the args as per the original spec. static_argnums is sorted.
    """
    tensor_index = 0
    static_index = 0
    index = 0
    args = []
    assert len(static_args) == len(static_argnums)
    while tensor_index < len(tensor_args) and static_index < len(static_args):
        if index == static_argnums[static_index]:
            args.append(static_args[static_index])
            static_index += 1
        else:
            args.append(tensor_args[tensor_index])
            tensor_index += 1
        index += 1

    while tensor_index < len(tensor_args):
        args.append(tensor_args[tensor_index])
        tensor_index += 1

    while static_index < len(static_args):
        args.append(static_args[static_index])
        static_index += 1

    return args
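
# Illustrative sketch (not part of the original module): splitting the args on
# static_argnums and then rearranging recovers the original ordering.
#
# >>> x, y = torch.randn(2), torch.randn(2)
# >>> tensors, statics, _ = filter_tensor_and_static_args([x, 0.5, y], (1,))
# >>> restored = rearrange(tensors, statics, [1])
# >>> restored[0] is x, restored[1], restored[2] is y
# (True, 0.5, True)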


KNOWN_TYPES = [torch.Tensor, int, str, float, bool]


def aot_function(
    fn: Callable,
    fw_compiler: Callable,
    bw_compiler: Optional[Callable] = None,
    partition_fn: Callable = default_partition,
    decompositions: Dict = {},
    hasher_type: str = "StaticShapeHasher",
    static_argnums: Optional[Tuple[int]] = None,
) -> Callable:
    """
    Traces the forward and backward graph of :attr:`fn` using torch dispatch
    mechanism, and then compiles the generated forward and backward graphs
    through :attr:`fw_compiler` and :attr:`bw_compiler`.

    :func:`aot_function` traces the forward and backward graph ahead of time,
    and generates a joint forward and backward graph. :attr:`partition_fn` is
    then used to separate out forward and backward graphs. The partitioner
    function can be used to perform optimizations such as recomputation. One
    can set `decompositions` dictionary to decompose the operators into a
    sequence of core or simpler operators supported by the backend compilers.

    :func:`aot_function` uses a compilation cache, based on input tensor
    properties, to detect when there is a need for recompilation. By default,
    its behavior is static, i.e., it recompiles if the shape of any input
    tensor changes.

    :attr:`static_argnums` allows the user to mark the arguments of the
    original :attr:`fn` as static. This is useful when an argument is a
    non-tensor, e.g., ``int`` or ``bool``. A change in the actual value of a
    static arg causes recompilation.

    .. warning::
        This API is experimental and likely to change.

    Args:
        fn (Callable): A Python function that takes one or more arguments. Must
            return one or more Tensors.
        fw_compiler (Callable): A Python function that accepts an Fx graph with
            Aten ops and input args, and returns a Callable that semantically is
            equivalent to the input Fx graph.
        bw_compiler (Optional[Callable]): A Python function that accepts an Fx
            graph with Aten ops and input args, and returns a Callable that
            semantically is equivalent to the input Fx graph. Default: None
            (when None, it defaults to the :attr:`fw_compiler`)
        partition_fn (Callable): A Python function that takes a joint forward
            and backward graph, and partitions it into separate forward and
            backward graphs.
        decompositions (Dict): A dictionary to define the decomposition of
            larger Aten ops into simpler or core Aten ops.
        static_argnums (Optional[Tuple[Int]]): An optional tuple of ints to mark
            the arguments of the function as static.

    Returns:
        Returns a ``Callable`` that retains the eager behavior of the original
        :attr:`fn`, but with forward and backward graph compiled via
        :attr:`fw_compiler` and :attr:`bw_compiler`.

    A simple example usage of :func:`aot_function` is as follows. This example
    will print the forward and backward graphs of the function ``fn``

        >>> fn = lambda x : x.sin().cos()
        >>> def print_compile_fn(fx_module, args):
        >>>     print(fx_module)
        >>>     return fx_module
        >>> aot_fn = aot_function(fn, print_compile_fn)
        >>> x = torch.randn(4, 5, requires_grad=True)
        >>> aot_fn(x)

    The static argnums are used to mark the non-tensor arguments as static. An
    example is as follows where the dropout probability is an argument to the
    original function.

        >>> def fn(input, bias, residual, p: float):
        >>>     a = torch.add(input, bias)
        >>>     b = torch.nn.functional.dropout(a, p, training=True)
        >>>     c = b + residual
        >>>     return c
        >>> aot_fn = aot_function(fn, print_compile_fn, static_argnums=(3,))

    """
    global compile_cache
    if compile_cache is None:
        compile_cache = CompileCache()
    if bw_compiler is None:
        bw_compiler = fw_compiler
    cached_res = None

    fn_id = id(fn)
    fw_compiler_id = id(fw_compiler)
    bw_compiler_id = id(bw_compiler)

    if isinstance(static_argnums, int):
        static_argnums = [static_argnums]
    elif static_argnums is not None and len(static_argnums) == 0:
        static_argnums = None
    elif static_argnums is not None:
        static_argnums = list(static_argnums)
        static_argnums.sort()

    def returned_function(*args, **kwargs):
        global compile_cache
        nonlocal cached_res

        # Separate out static args if static_argnums is present
        tensor_args = args
        static_args = []
        # TODO - move the hashing part of static_args to C++.
        static_args_hashed = []
        if static_argnums is not None:
            (
                tensor_args,
                static_args,
                static_args_hashed,
            ) = filter_tensor_and_static_args(args, static_argnums)

        # Now flatten the tensor args
        if HAS_TREE:
            flat_tensor_args = tree.flatten((tensor_args, kwargs))
        else:
            flat_tensor_args, _ = pytree.tree_flatten((tensor_args, kwargs))

        # Check if the fn is already compiled
        num_tensor_args = len(flat_tensor_args)
        flat_args_for_cache = flat_tensor_args + static_args_hashed
        cached_res = compile_cache.at(
            fn_id,
            fw_compiler_id,
            bw_compiler_id,
            num_tensor_args,
            hasher_type,
            *flat_args_for_cache,
        )

        # Compile the function and save it in the cache
        if cached_res is None:
            # Save the args_spec for flat_tensor_args to unflatten while tracing
            _, tensor_args_spec = pytree.tree_flatten((tensor_args, kwargs))
            out_spec = PytreeThunk()

            def flat_fn(*flat_tensor_args):
                # The inputs are flattened tensor args. Prepare the args in the
                # order that the original function expects. Add static args as
                # well. They will appear as tensor constants in the traced graph.
                nonlocal out_spec, static_args
                tensor_args, kwargs = pytree.tree_unflatten(
                    flat_tensor_args, tensor_args_spec
                )
                if static_argnums is None:
                    args = tensor_args
                else:
                    args = rearrange(tensor_args, static_args, static_argnums)
                tree_out = fn(*args, **kwargs)
                flat_out, spec = pytree.tree_flatten(tree_out)
                for i in flat_out:
                    is_known_type = False
                    for j in KNOWN_TYPES:
                        if isinstance(i, j):
                            is_known_type = True
                            break
                    if not is_known_type:
                        raise RuntimeError(
                            f"Found {type(i)} in output, which is not a known type. "
                            "If this type holds tensors, you need to register a pytree for it. "
                            "See https://github.com/pytorch/functorch/issues/475 for a brief "
                            "explanation why. If you don't need to register a pytree, please "
                            "leave a comment explaining your use case and we'll make this more "
                            "ergonomic to deal with"
                        )
                out_spec.set(spec)
                return flat_out

            compiled_fn = create_aot_autograd_function(
                flat_fn,
                fw_compiler,
                bw_compiler,
                partition_fn,
                decompositions,
                grad_state=torch.is_grad_enabled(),
            ).apply
            cached_res = (compiled_fn, out_spec)

            # Save the compiled_fn in the cache
            compile_cache.insert(
                fn_id,
                fw_compiler_id,
                bw_compiler_id,
                num_tensor_args,
                hasher_type,
                cached_res,
                *flat_args_for_cache,
            )

        cached_fn, out_spec = cached_res
        out = cached_fn(*flat_tensor_args)
        return out_spec.unflatten(out)

    return returned_function


def num_of_recompilations():
    """
    Returns the number of recompilations since the last time the cache was
    cleared. This is equivalent to the number of entries in the compilation
    cache.
    """
    global compile_cache
    if compile_cache is None:
        return 0
    return compile_cache.size()


def clear_compile_cache():
    """
    Clears the compilation cache.
    """
    global compile_cache
    if compile_cache is not None:
        compile_cache.clear()
        compile_cache = None
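

# Illustrative sketch (not part of the original module): one compilation adds
# one entry to the compilation cache, and clearing the cache resets the count.
#
# >>> clear_compile_cache()
# >>> aot_fn = aot_function(lambda x: x.sin(), lambda fx_module, args: fx_module)
# >>> _ = aot_fn(torch.randn(3, requires_grad=True))
# >>> num_of_recompilations()
# 1
# >>> clear_compile_cache()
# >>> num_of_recompilations()
# 0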


def aot_module(mod: nn.Module, *args, **kwargs) -> nn.Module:
    """
    Traces the forward and backward graph of :attr:`mod` using torch dispatch
    tracing mechanism. It is a wrapper function that underneath uses
    :func:`aot_function` to perform tracing and compilation.

    :func:`aot_module` lifts the parameters and buffers of ``nn.Module`` as
    inputs to a new callable which is then compiled through
    :func:`aot_function`.

    .. warning::
        This API is experimental and likely to change.

    Args:
        mod (Callable): A ``nn.Module`` module.
        args : args to be passed to :func:`aot_function`
        kwargs : kwargs to be passed to :func:`aot_function`

    Returns:
        Returns a ``nn.Module`` that retains the eager behavior of the original
        :attr:`mod`, but with forward and backward graph compiled.

    """

    def functional_call(named_params, named_buffers, *args, **kwargs):
        params_and_buffers = {**named_params, **named_buffers}
        return _stateless.functional_call(mod, params_and_buffers, args, kwargs)

    compiled_f = aot_function(functional_call, *args, **kwargs)

    class AOTModule(nn.Module):
        def __init__(self):
            super(AOTModule, self).__init__()
            self.orig_module = mod

        def forward(self, *args, **kwargs):
            return compiled_f(
                dict(_named_parameters(mod, remove_duplicate=False)),
                dict(_named_buffers(mod, remove_duplicate=False)),
                *args,
                **kwargs,
            )

    return AOTModule()
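

# Illustrative sketch (not part of the original module; `print_compile_fn` is a
# made-up pass-through compiler): the module's parameters and buffers are
# lifted to inputs of the traced graph, and backward works as usual.
#
# >>> def print_compile_fn(fx_module, args):
# >>>     print(fx_module.code)
# >>>     return fx_module
# >>> aot_mod = aot_module(nn.Linear(4, 4), print_compile_fn)
# >>> out = aot_mod(torch.randn(2, 4, requires_grad=True))
# >>> out.sum().backward()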


compiled_function = aot_function
compiled_module = aot_module