# Shortcuts
#
# Source code for sdk.inspector.inspector

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import dataclasses
import logging
from collections import defaultdict, OrderedDict
from dataclasses import dataclass
from enum import Enum
from typing import (
    Dict,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    TypeAlias,
    TypedDict,
    Union,
)

import numpy as np
import pandas as pd
import torch
from executorch.exir import ExportedProgram

from executorch.sdk.debug_format.et_schema import OperatorNode
from executorch.sdk.etdump.schema_flatcc import ETDumpFlatCC, ProfileEvent
from executorch.sdk.etrecord import parse_etrecord
from executorch.sdk.inspector._inspector_utils import (
    create_debug_handle_to_op_node_mapping,
    EDGE_DIALECT_GRAPH_KEY,
    gen_etdump_object,
    gen_graphs_from_etrecord,
)

from tabulate import tabulate

FORWARD = "forward"
RESERVED_FRAMEWORK_EVENT_NAMES = [
    "Method::init",
    "Program::load_method",
    "Method::execute",
]
EXCLUDED_COLUMNS_WHEN_PRINTING = [
    "raw",
    "delegate_debug_identifier",
    "stack_traces",
    "module_hierarchy",
    "debug_data",
]
EXCLUDED_EVENTS_WHEN_PRINTING = {"OPERATOR_CALL"}


log: logging.Logger = logging.getLogger(__name__)

# Signature of a ProfileEvent
@dataclass(frozen=True, order=True)
class ProfileEventSignature:
    name: str
    instruction_id: Optional[int]
    delegate_id: Optional[int] = None
    delegate_id_str: Optional[str] = None

    @staticmethod
    def _gen_from_event(event: ProfileEvent) -> "ProfileEventSignature":
        """
        Given a ProfileEvent, extract the fields into a signature

        ProfileEvents from ETDump default to "" and -1 when the field is not populated
        The Signature will convert these back to the intended None value
        """
        return ProfileEventSignature(
            event.name or "",
            event.instruction_id if event.instruction_id != -1 else None,
            event.delegate_debug_id_int if event.delegate_debug_id_int != -1 else None,
            event.delegate_debug_id_str if event.delegate_debug_id_str != "" else None,
        )


# Signature of a RunData as defined by its ProfileEvents
RunSignature: TypeAlias = Tuple[ProfileEventSignature]


# Typing for mapping Event.delegate_debug_identifiers to debug_handle(s)
DelegateIdentifierDebugHandleMap: TypeAlias = Union[
    Mapping[int, Tuple[int, ...]], Mapping[str, Tuple[int, ...]]
]

# Typing for Dict containig delegate metadata
DelegateMetadata = TypedDict(
    "DelegateMetadata",
    {"name": str, "delegate_map": DelegateIdentifierDebugHandleMap},
)


class TimeScale(Enum):
    NS = "ns"
    US = "us"
    MS = "ms"
    S = "s"
    CYCLES = "cycles"


time_scale_dict = {
    TimeScale.NS: 1000000000,
    TimeScale.US: 1000000,
    TimeScale.MS: 1000,
    TimeScale.S: 1,
    TimeScale.CYCLES: 1,
}


@dataclass
class PerfData:
    def __init__(self, raw: List[float]):
        self.raw: List[float] = raw

    @property
    def p50(self) -> float:
        return np.percentile(self.raw, 50)

    @property
    def p90(self) -> float:
        return np.percentile(self.raw, 90)

    @property
    def avg(self) -> float:
        return np.mean(self.raw)

    @property
    def min(self) -> float:
        return min(self.raw)

    @property
    def max(self) -> float:
        return max(self.raw)


# TODO: detailed documentation
[docs]@dataclass class Event: """ An Event corresponds to an operator instance with perf data retrieved from the runtime and other metadata from `ETRecord`. Args: name: Name of the profiling/debugging `Event`. perf_data: Performance data associated with the event retrived from the runtime (available attributes: p50, p90, avg, min and max). op_type: List of op types corresponding to the event. delegate_debug_identifier: Supplemental identifier used in combination with instruction id. debug_handles: Debug handles in the model graph to which this event is correlated. stack_trace: A dictionary mapping the name of each associated op to its stack trace. module_hierarchy: A dictionary mapping the name of each associated op to its module hierarchy. is_delegated_op: Whether or not the event was delegated. delegate_backend_name: Name of the backend this event was delegated to. debug_data: Intermediate data collected during runtime. """ name: str perf_data: PerfData op_types: List[str] = dataclasses.field(default_factory=list) delegate_debug_identifier: Optional[Union[int, str]] = None debug_handles: Optional[Union[int, Sequence[int]]] = None stack_traces: Dict[str, str] = dataclasses.field(default_factory=dict) module_hierarchy: Dict[str, Dict] = dataclasses.field(default_factory=dict) is_delegated_op: Optional[bool] = None delegate_backend_name: Optional[str] = None debug_data: List[torch.Tensor] = dataclasses.field(default_factory=list) _instruction_id: Optional[int] = None @staticmethod def _gen_from_profile_events( signature: ProfileEventSignature, events: List[ProfileEvent], scale_factor: float = 1.0, ) -> "Event": """ Given a ProfileEventSignature and a list of ProfileEvents with that signature, return an Event object matching the ProfileEventSignature, with perf_data populated from the list of ProfileEvents An optional inverse scale factor can be provided to adjust the event timestamps """ if signature.delegate_id is not None: # 0 is a valid value 
delegate_debug_identifier = signature.delegate_id else: delegate_debug_identifier = signature.delegate_id_str or None # Use the delegate identifier as the event name if delegated is_delegated_op = delegate_debug_identifier is not None name = signature.name if not is_delegated_op else str(delegate_debug_identifier) perf_data = PerfData( [ float(event.end_time - event.start_time) / scale_factor for event in events ] ) return Event( name=name, perf_data=perf_data, delegate_debug_identifier=delegate_debug_identifier, is_delegated_op=is_delegated_op, _instruction_id=signature.instruction_id, ) def _associate_with_op_graph_nodes( self, debug_handle_to_op_node_map: Dict[int, OperatorNode] ) -> None: """ Helper function to populate the stack_traces, module_hierarchy and op_types attributes based on the debug handles of this event """ # Framework events aren't logically associated with any nodes if self.name in RESERVED_FRAMEWORK_EVENT_NAMES: return if (debug_handles := self.debug_handles) is None: return if isinstance(debug_handles, int): debug_handles = [debug_handles] for handle in debug_handles: node = debug_handle_to_op_node_map.get(handle) if node is not None and (metadata := node.metadata) is not None: self.stack_traces[node.name] = metadata.get("stack_trace") self.module_hierarchy[node.name] = metadata.get("nn_module_stack") if node.op: # TODO: consider having this as a dict from node.name -> node.op self.op_types += [node.op]
[docs]@dataclass class EventBlock: r""" An `EventBlock` contains a collection of events associated with a particular profiling/debugging block retrieved from the runtime. Each `EventBlock` represents a pattern of execution. For example, model initiation and loading lives in a single `EventBlock`. If there's a control flow, each branch will be represented by a separate `EventBlock`. Args: name: Name of the profiling/debugging block. events: List of `Event`\ s associated with the profiling/debugging block. """ name: str events: List[Event] = dataclasses.field(default_factory=list) source_time_scale: TimeScale = TimeScale.NS target_time_scale: TimeScale = TimeScale.MS def to_dataframe(self, include_units: bool = False) -> pd.DataFrame: """ Converts the EventBlock into a DataFrame with each row being an event instance Note: Rows that have an event_name = OPERATOR_CALL correspond to the perf of the previous operator + framework tax of making said operator call. Args: include_units: Whether headers should include units (default false) Returns: A Pandas DataFrame containing the data of each Event instance in this EventBlock. 
""" units = " (" + self.target_time_scale.value + ")" if include_units else "" # TODO: push row generation down to Event data = { "event_block_name": [self.name] * len(self.events), "event_name": [event.name for event in self.events], "raw": [event.perf_data.raw for event in self.events], "p50" + units: [event.perf_data.p50 for event in self.events], "p90" + units: [event.perf_data.p90 for event in self.events], "avg" + units: [event.perf_data.avg for event in self.events], "min" + units: [event.perf_data.min for event in self.events], "max" + units: [event.perf_data.max for event in self.events], "op_types": [event.op_types for event in self.events], "delegate_debug_identifier": [ event.delegate_debug_identifier for event in self.events ], "stack_traces": [event.stack_traces for event in self.events], "module_hierarchy": [event.module_hierarchy for event in self.events], "is_delegated_op": [event.is_delegated_op for event in self.events], "delegate_backend_name": [ event.delegate_backend_name for event in self.events ], "debug_data": [event.debug_data for event in self.events], } df = pd.DataFrame(data) return df @staticmethod def _gen_from_etdump( etdump: ETDumpFlatCC, source_time_scale: TimeScale = TimeScale.NS, target_time_scale: TimeScale = TimeScale.MS, ) -> List["EventBlock"]: """ Given an etdump, generate a list of EventBlocks corresponding to the contents. 
An optional (inverse) scale factor can be provided to adjust the etdump timestamps associated with each EventBlocks """ # Group all the RunData by the set of profile events profile_run_groups: Mapping[ RunSignature, OrderedDict[ProfileEventSignature, List[ProfileEvent]], ] = defaultdict(OrderedDict) for run in etdump.run_data: if (run_events := run.events) is None: continue # Identify all the ProfileEventSignatures profile_events: OrderedDict[ ProfileEventSignature, ProfileEvent ] = OrderedDict() for event in run_events: if (profile_event := event.profile_event) is not None: signature = ProfileEventSignature._gen_from_event(profile_event) profile_events[signature] = profile_event # Create a RunSignature from the ProfileEventSignature found run_signature: RunSignature = tuple(profile_events.keys()) # Update the Profile Run Groups, indexed on the RunSignature run_signature_events: OrderedDict[ ProfileEventSignature, List[ProfileEvent] ] = profile_run_groups[run_signature] for event_signature, event in profile_events.items(): run_signature_events.setdefault(event_signature, []).append(event) scale_factor = ( time_scale_dict[source_time_scale] / time_scale_dict[target_time_scale] ) # Create EventBlocks from the Profile Run Groups return [ EventBlock( name=str(index), events=[ Event._gen_from_profile_events(signature, event, scale_factor) for signature, event in profile_events.items() ], source_time_scale=source_time_scale, target_time_scale=target_time_scale, ) for index, profile_events in enumerate(profile_run_groups.values()) ] # TODO: Considering changing ETRecord deserialization logic to cast the ints in string format to actual ints def _gen_resolve_debug_handles( self, handle_map: Dict[str, List[int]], delegate_map: Optional[Dict[str, DelegateMetadata]] = None, ): """ Given mappings from instruction id to debug handles, populate the debug_handles field of all underlying events If the event is delegated, index with the instruction_id and delegate_debug_identifier 
to obtain the debug_handle via the delegate map """ for event in self.events: # Check if instruction_id is present in the event if event._instruction_id is None: continue # Check for the instruction_id in handle map if (instruction_id := str(event._instruction_id)) not in handle_map: continue # For non-delegated event, handles are found in handle_map if (delegate_debug_id := event.delegate_debug_identifier) is None: event.debug_handles = handle_map[instruction_id] continue # Check that the delegated event has a corresponding mapping if ( delegate_map is None or (delegate_metadata := delegate_map.get(instruction_id)) is None ): event.debug_handles = handle_map[instruction_id] log.warning( f" No delegate mapping found for delegate with instruction id {event._instruction_id}" ) continue # For delegated events, handles are found via delegateMetadata event.delegate_backend_name = delegate_metadata.get("name", "") delegate_metadata_delegate_map = delegate_metadata.get("delegate_map", {}) # delegate_debug_id can be either int based or string based, therefore we need to check both debug_handles = delegate_metadata_delegate_map.get( delegate_debug_id # pyre-ignore ) if debug_handles is not None: event.debug_handles = debug_handles else: event.debug_handles = delegate_metadata_delegate_map.get( str(delegate_debug_id) # pyre-ignore )
class Inspector: """ APIs for examining model architecture and performance stats. Public Attributes: event_blocks: List["EventBlocks"]. Structured data from ETDump (correlated with ETRecord if provided). Private Attributes: _etrecord: Optional[ETRecord]. File under etrecord_path deserialized into an object. """ def __init__( self, etdump_path: Optional[str] = None, etrecord_path: Optional[str] = None, source_time_scale: TimeScale = TimeScale.NS, target_time_scale: TimeScale = TimeScale.MS, ) -> None: r""" Initialize an `Inspector` instance with the underlying `EventBlock`\ s populated with data from the provided ETDump path and optional ETRecord path. Args: etdump_path: Path to the ETDump file. etrecord_path: Optional path to the ETRecord file. source_time_scale: The time scale of the performance data retrieved from the runtime. The default time hook implentation in the runtime returns NS. target_time_scale: The target time scale to which the users want their performance data converted to. Defaults to MS. Returns: None """ self._etrecord = ( parse_etrecord(etrecord_path=etrecord_path) if etrecord_path is not None else None ) etdump = gen_etdump_object(etdump_path=etdump_path) if (source_time_scale == TimeScale.CYCLES) ^ ( target_time_scale == TimeScale.CYCLES ): raise RuntimeError( "For TimeScale in cycles both the source and target time scale have to be in cycles." 
) self._source_time_scale = source_time_scale self._target_time_scale = target_time_scale self.event_blocks = EventBlock._gen_from_etdump( etdump, self._source_time_scale, self._target_time_scale ) # No additional data association can be done without ETRecord, so return early if self._etrecord is None: return # Use the delegate map from etrecord, associate debug handles with each event for event_block in self.event_blocks: event_block._gen_resolve_debug_handles( self._etrecord._debug_handle_map[FORWARD], self._etrecord._delegate_map[FORWARD] if self._etrecord._delegate_map is not None else None, ) # Traverse the edge dialect op graph to create mapping from debug_handle to op node op_graph_dict = gen_graphs_from_etrecord(etrecord=self._etrecord) debug_handle_to_op_node_map = {} create_debug_handle_to_op_node_mapping( op_graph_dict[EDGE_DIALECT_GRAPH_KEY], debug_handle_to_op_node_map, ) for event_block in self.event_blocks: for event in event_block.events: event._associate_with_op_graph_nodes(debug_handle_to_op_node_map) def print_data_tabular(self, include_units: bool = True) -> None: """ Displays the underlying EventBlocks in a structured tabular format, with each row representing an Event. 
Args: include_units: Whether headers should include units (default true) Returns: None """ def style_text_size(val, size=12): return f"font-size: {size}px" df_list = [ event_block.to_dataframe(include_units=include_units) for event_block in self.event_blocks ] combined_df = pd.concat(df_list, ignore_index=True) # Filter out some columns and rows for better readability when printing filtered_column_df = combined_df.drop(columns=EXCLUDED_COLUMNS_WHEN_PRINTING) filtered_df = filtered_column_df[ ~filtered_column_df["event_name"].isin(EXCLUDED_EVENTS_WHEN_PRINTING) ] try: from IPython import get_ipython from IPython.display import display if get_ipython() is not None: styled_df = filtered_df.style.applymap(style_text_size) display(styled_df) else: raise Exception( "Environment unable to support IPython. Fall back to print()." ) except: print(tabulate(filtered_df, headers="keys", tablefmt="fancy_grid")) # TODO: write unit test def find_total_for_module(self, module_name: str) -> float: """ Returns the total average compute time of all operators within the specified module. Args: module_name: Name of the module to be aggregated against. Returns: Sum of the average compute time (in seconds) of all operators within the module with "module_name". 
""" total = 0.0 for block in self.event_blocks: for event in block.events: module_hierarchy = event.module_hierarchy.values() for hierarchy in module_hierarchy: if not hierarchy: continue found = any(module_name in key for key in hierarchy.keys()) if found: total += event.perf_data.avg break return total def get_op_list( self, event_block: str, show_delegated_ops: Optional[bool] = True ) -> Dict[str, List[Event]]: """ Return a map of op_types to Events of that op_type """ # TODO: implement return {} def write_tensorboard_artifact(self, path: str) -> None: """ Write to the provided path, the artifacts required for visualization in TensorBoard """ # TODO: implement pass def get_exported_program( self, graph: Optional[str] = None ) -> Optional[ExportedProgram]: """ Access helper for ETRecord, defaults to returning the Edge Dialect program. Args: graph: Optional name of the graph to access. If None, returns the Edge Dialect program. Returns: The ExportedProgram object of "graph". """ if self._etrecord is None: log.warning( "Exported program is only available when a valid etrecord_path was provided at the time of Inspector construction" ) return None return ( self._etrecord.edge_dialect_program if graph is None else self._etrecord.graph_map.get(graph) )

# Docs
#
# Access comprehensive developer documentation for PyTorch
#
# View Docs
#
# Tutorials
#
# Get in-depth tutorials for beginners and advanced developers
#
# View Tutorials
#
# Resources
#
# Find development resources and get your questions answered
#
# View Resources