Shortcuts

Source code for ignite.engine.events

import numbers
import warnings
import weakref
from collections.abc import Sequence
from enum import Enum
from types import DynamicClassAttribute
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TYPE_CHECKING, Union

from torch.utils.data import DataLoader

from ignite.engine.utils import _check_signature

if TYPE_CHECKING:
    from ignite.engine.engine import Engine

__all__ = ["CallableEventWithFilter", "EventEnum", "Events", "State", "EventsList", "RemovableEventHandle"]


class CallableEventWithFilter:
    """Single event that optionally carries a filter deciding whether handlers run for a given occurrence.

    Args:
        value: The actual enum value. Only needed for internal use. Do not touch!
        event_filter: A callable taking the engine and the current event value and returning a boolean that
            tells whether the event should be executed. ``None`` (the default) means no filter is attached.
        name: The enum-name of the current object. Only needed for internal use. Do not touch!
    """

    def __init__(self, value: str, event_filter: Optional[Callable] = None, name: Optional[str] = None) -> None:
        self.filter = event_filter

        # When this class is mixed into an Enum, ``_value_`` / ``_name_`` may already be present;
        # they are only set here if missing.
        if not hasattr(self, "_value_"):
            self._value_ = value

        if not hasattr(self, "_name_") and name is not None:
            self._name_ = name

    # copied to be compatible to enum
    @DynamicClassAttribute
    def name(self) -> str:
        """The name of the Enum member."""
        return self._name_

    @DynamicClassAttribute
    def value(self) -> str:
        """The value of the Enum member."""
        return self._value_

    def __call__(
        self,
        event_filter: Optional[Callable] = None,
        every: Optional[int] = None,
        once: Optional[Union[int, List]] = None,
        before: Optional[int] = None,
        after: Optional[int] = None,
    ) -> "CallableEventWithFilter":
        """Create a filtered copy of this event.

        Exactly one of the following groups must be given: ``event_filter``, ``once``, or any combination of
        ``every`` / ``before`` / ``after``.

        Args:
            event_filter: a filter function to check if the event should be executed when
                the event type was fired. It must accept ``engine`` and ``event`` and return a boolean.
            every: a value specifying how often the event should be fired
            once: a value or list of values specifying when the event should be fired (if only once)
            before: a value specifying the number of occurrence that event should be fired before
            after: a value specifying the number of occurrence that event should be fired after

        Returns:
            CallableEventWithFilter: A new event having the same value but a different filter function

        Raises:
            ValueError: if the arguments are combined incorrectly, or if ``every``, ``once``, ``before`` or
                ``after`` has an invalid value.
            TypeError: if ``event_filter`` is given but is not callable.
        """
        # ``every`` / ``before`` / ``after`` can be mixed together, so they count as one group.
        groups_given = (
            event_filter is not None,
            once is not None,
            (every is not None or before is not None or after is not None),
        )
        if sum(groups_given) != 1:
            raise ValueError("Only one of the input arguments should be specified, except before, after and every")

        if (event_filter is not None) and not callable(event_filter):
            raise TypeError("Argument event_filter should be a callable")

        if (every is not None) and not (isinstance(every, numbers.Integral) and every > 0):
            raise ValueError("Argument every should be integer and greater than zero")

        if once is not None:
            is_positive_int = isinstance(once, numbers.Integral) and once > 0
            is_positive_int_seq = (
                isinstance(once, Sequence)
                and len(once) > 0
                and all(isinstance(e, numbers.Integral) and e > 0 for e in once)
            )
            if not (is_positive_int or is_positive_int_seq):
                raise ValueError(
                    f"Argument once should either be a positive integer or a list of positive integers, got {once}"
                )

        if (before is not None) and not (isinstance(before, numbers.Integral) and before >= 0):
            raise ValueError("Argument before should be integer and greater or equal to zero")

        if (after is not None) and not (isinstance(after, numbers.Integral) and after >= 0):
            raise ValueError("Argument after should be integer and greater or equal to zero")

        if every is not None:
            # every=1 matches each occurrence, so no filter is needed.
            event_filter = None if every == 1 else self.every_event_filter(every)

        if once is not None:
            # Use the same Integral test as the validation above, so that any accepted scalar
            # (not only a builtin ``int``) is wrapped into a list before membership testing.
            event_filter = self.once_event_filter([once] if isinstance(once, numbers.Integral) else once)

        if before is not None or after is not None:
            if every is not None:
                event_filter = self.every_before_and_after_event_filter(every, before, after)
            else:
                event_filter = self.before_and_after_event_filter(before, after)

        # check signature:
        if event_filter is not None:
            _check_signature(event_filter, "event_filter", "engine", "event")

        return CallableEventWithFilter(self.value, event_filter, self.name)

    @staticmethod
    def every_event_filter(every: int) -> Callable:
        """Return a filter that is true when the event value is a multiple of ``every``."""

        def wrapper(engine: "Engine", event: int) -> bool:
            return bool(event % every == 0)

        return wrapper

    @staticmethod
    def once_event_filter(once: List) -> Callable:
        """Return a filter that is true when the event value is contained in ``once``.

        A bare integer is also accepted and treated as a one-element list.
        """
        if isinstance(once, numbers.Integral):
            once = [once]

        def wrapper(engine: "Engine", event: int) -> bool:
            return event in once

        return wrapper

    @staticmethod
    def before_and_after_event_filter(before: Optional[int] = None, after: Optional[int] = None) -> Callable:
        """Return a filter that is true for event values strictly between ``after`` and ``before``.

        A missing ``before`` means no upper bound; a missing ``after`` is treated as 0.
        """
        before_: Union[int, float] = float("inf") if before is None else before
        after_: int = 0 if after is None else after

        def wrapper(engine: "Engine", event: int) -> bool:
            return bool(after_ < event < before_)

        return wrapper

    @staticmethod
    def every_before_and_after_event_filter(
        every: int, before: Optional[int] = None, after: Optional[int] = None
    ) -> Callable:
        """Return a filter which triggers for every `every` iterations after `after` and before `before`.

        Counting starts at ``after + 1``: the first matching event value is ``after + 1``, then
        ``after + 1 + every``, and so on while the value stays below ``before``.
        """
        before_: Union[int, float] = float("inf") if before is None else before
        after_: int = 0 if after is None else after

        def wrapper(engine: "Engine", event: int) -> bool:
            return bool(after_ < event < before_ and (event - after_ - 1) % every == 0)

        return wrapper

    @staticmethod
    def default_event_filter(engine: "Engine", event: int) -> bool:
        """Default event filter. This method is deprecated and will be removed. Please, use None instead"""
        warnings.warn("Events.default_event_filter is deprecated and will be removed. Please, use None instead")
        return True

    def __repr__(self) -> str:
        out = f"Events.{self.name}"
        if self.filter is not None:
            out += f"(filter={self.filter})"
        return out

    def __eq__(self, other: Any) -> bool:
        # Equality is by name only: the attached filter is ignored, and a plain string is
        # compared against the event name.
        if isinstance(other, CallableEventWithFilter):
            return self.name == other.name
        elif isinstance(other, str):
            return self.name == other
        else:
            return NotImplemented

    def __hash__(self) -> int:
        # Hash by name, matching ``__eq__``.
        return hash(self._name_)

    def __or__(self, other: Any) -> "EventsList":
        """Combine this event with ``other`` into an ``EventsList``."""
        return EventsList() | self | other
class CallableEvents(CallableEventWithFilter):
    """Deprecated subclass of :class:`CallableEventWithFilter`, kept for backward compatibility.

    Construction forwards all arguments to the parent class and then emits a ``DeprecationWarning``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        deprecation_message = (
            "Class ignite.engine.events.CallableEvents is deprecated. It will be removed in 0.5.0. "
            "Please, use ignite.engine.EventEnum instead"
        )
        warnings.warn(deprecation_message, DeprecationWarning)
class EventEnum(CallableEventWithFilter, Enum):
    """Base class for all :class:`~ignite.engine.events.Events`.

    Custom, user-defined events should derive from this class as well.

    Examples:
        Custom events based on the loss calculation and backward pass can be created as follows:

        .. code-block:: python

            from ignite.engine import EventEnum

            class BackpropEvents(EventEnum):
                BACKWARD_STARTED = 'backward_started'
                BACKWARD_COMPLETED = 'backward_completed'
                OPTIM_STEP_COMPLETED = 'optim_step_completed'

            def update(engine, batch):
                # ...
                loss = criterion(y_pred, y)
                engine.fire_event(BackpropEvents.BACKWARD_STARTED)
                loss.backward()
                engine.fire_event(BackpropEvents.BACKWARD_COMPLETED)
                optimizer.step()
                engine.fire_event(BackpropEvents.OPTIM_STEP_COMPLETED)
                # ...

            trainer = Engine(update)
            trainer.register_events(*BackpropEvents)

            @trainer.on(BackpropEvents.BACKWARD_STARTED)
            def function_before_backprop(engine):
                # ...
    """
class Events(EventEnum):
    """Events that are fired by the :class:`~ignite.engine.engine.Engine` during execution. Built-in events:

    - STARTED : triggered when engine's run is started
    - EPOCH_STARTED : triggered when the epoch is started
    - GET_BATCH_STARTED : triggered before next batch is fetched
    - GET_BATCH_COMPLETED : triggered after the batch is fetched
    - ITERATION_STARTED : triggered when an iteration is started
    - ITERATION_COMPLETED : triggered when the iteration is ended

    - DATALOADER_STOP_ITERATION : engine's specific event triggered when dataloader has no more data to provide

    - EXCEPTION_RAISED : triggered when an exception is encountered
    - TERMINATE_SINGLE_EPOCH : triggered when the run is about to end the current epoch,
      after receiving a :meth:`~ignite.engine.engine.Engine.terminate_epoch()` or
      :meth:`~ignite.engine.engine.Engine.terminate()` call.
    - TERMINATE : triggered when the run is about to end completely,
      after receiving :meth:`~ignite.engine.engine.Engine.terminate()` call.
    - INTERRUPT : triggered when the run is interrupted, after receiving interrupt() call.

    - EPOCH_COMPLETED : triggered when the epoch is ended. Note that this is triggered even
      when :meth:`~ignite.engine.engine.Engine.terminate_epoch()` is called.
    - COMPLETED : triggered when engine's run is completed

    The table below illustrates which events are triggered when various termination methods are called.

    .. list-table::
       :widths: 24 25 33 18
       :header-rows: 1

       * - Method
         - EPOCH_COMPLETED
         - TERMINATE_SINGLE_EPOCH
         - TERMINATE
       * - no termination
         - ✔
         - ✗
         - ✗
       * - :meth:`~ignite.engine.engine.Engine.terminate_epoch()`
         - ✔
         - ✔
         - ✗
       * - :meth:`~ignite.engine.engine.Engine.terminate()`
         - ✗
         - ✔
         - ✔

    Since v0.3.0, Events become more flexible and allow to pass an event filter to the Engine:

    .. code-block:: python

        engine = Engine()

        # a) custom event filter
        def custom_event_filter(engine, event):
            if event in [1, 2, 5, 10, 50, 100]:
                return True
            return False

        @engine.on(Events.ITERATION_STARTED(event_filter=custom_event_filter))
        def call_on_special_event(engine):
            # do something on 1, 2, 5, 10, 50, 100 iterations

        # b) "every" event filter
        @engine.on(Events.ITERATION_STARTED(every=10))
        def call_every(engine):
            # do something every 10th iteration

        # c) "once" event filter
        @engine.on(Events.ITERATION_STARTED(once=50))
        def call_once(engine):
            # do something on 50th iteration

        # d) "before" and "after" event filter
        @engine.on(Events.EPOCH_STARTED(before=30, after=10))
        def call_before(engine):
            # do something in 11 to 29 epoch

        # e) Mixing "every" and "before" / "after" event filters
        @engine.on(Events.EPOCH_STARTED(every=5, before=25, after=8))
        def call_every_itr_before_after(engine):
            # do something on 9, 14, 19, 24 epochs

    Event filter function `event_filter` accepts as input `engine` and `event` and should return True/False.
    Argument `event` is the value of iteration or epoch, depending on which type of Events the function is passed.

    Since v0.4.0, user can also combine events with `|`-operator:

    .. code-block:: python

        events = Events.STARTED | Events.COMPLETED | Events.ITERATION_STARTED(every=3)
        engine = ...

        @engine.on(events)
        def call_on_events(engine):
            # do something

    Since v0.4.0, custom events defined by user should inherit from :class:`~ignite.engine.events.EventEnum` :

    .. code-block:: python

        class CustomEvents(EventEnum):
            FOO_EVENT = "foo_event"
            BAR_EVENT = "bar_event"
    """

    EPOCH_STARTED = "epoch_started"
    """triggered when the epoch is started."""
    EPOCH_COMPLETED = "epoch_completed"
    """triggered when the epoch is ended."""

    STARTED = "started"
    """triggered when engine's run is started."""
    COMPLETED = "completed"
    """triggered when engine's run is completed."""

    ITERATION_STARTED = "iteration_started"
    """triggered when an iteration is started."""
    ITERATION_COMPLETED = "iteration_completed"
    """triggered when the iteration is ended."""
    EXCEPTION_RAISED = "exception_raised"
    """triggered when an exception is encountered."""

    GET_BATCH_STARTED = "get_batch_started"
    """triggered before next batch is fetched."""
    GET_BATCH_COMPLETED = "get_batch_completed"
    """triggered after the batch is fetched."""

    DATALOADER_STOP_ITERATION = "dataloader_stop_iteration"
    """engine's specific event triggered when dataloader has no more data to provide."""
    TERMINATE = "terminate"
    """triggered when the run is about to end completely, after receiving terminate() call."""
    TERMINATE_SINGLE_EPOCH = "terminate_single_epoch"
    """triggered when the run is about to end the current epoch, after receiving a terminate_epoch() call."""
    INTERRUPT = "interrupt"
    """triggered when the run is interrupted, after receiving interrupt() call."""

    def __or__(self, other: Any) -> "EventsList":
        """Stack this event and ``other`` into an ``EventsList``."""
        stacked = EventsList() | self
        return stacked | other
class EventsList:
    """Collection of events stacked by operator `__or__`.

    ``a | b`` returns a new list and leaves its operands untouched, while ``events |= b`` extends
    ``events`` in place.

    .. code-block:: python

        events = Events.STARTED | Events.COMPLETED
        events |= Events.ITERATION_STARTED(every=3)

        engine = ...

        @engine.on(events)
        def call_on_events(engine):
            # do something

    or

    .. code-block:: python

        @engine.on(Events.STARTED | Events.COMPLETED | Events.ITERATION_STARTED(every=3))
        def call_on_events(engine):
            # do something
    """

    def __init__(self) -> None:
        self._events: List[Union[Events, CallableEventWithFilter]] = []

    def _append(self, event: Union[Events, CallableEventWithFilter]) -> None:
        """Append ``event`` to the list.

        Raises:
            TypeError: if ``event`` is neither an ``Events`` member nor a ``CallableEventWithFilter``.
        """
        if not isinstance(event, (Events, CallableEventWithFilter)):
            raise TypeError(f"Argument event should be Events or CallableEventWithFilter, got: {type(event)}")
        self._events.append(event)

    def __getitem__(self, item: int) -> Union[Events, CallableEventWithFilter]:
        return self._events[item]

    def __iter__(self) -> Iterator[Union[Events, CallableEventWithFilter]]:
        return iter(self._events)

    def __len__(self) -> int:
        return len(self._events)

    def __or__(self, other: Union[Events, CallableEventWithFilter]) -> "EventsList":
        """Return a new list holding this list's events followed by ``other``.

        The left operand is not modified, so a list can safely be reused as the base of several
        combinations.
        """
        merged = EventsList()
        merged._events = list(self._events)
        merged._append(event=other)
        return merged

    def __ior__(self, other: Union[Events, CallableEventWithFilter]) -> "EventsList":
        """Append ``other`` to this list in place (``events |= other``) and return it."""
        self._append(event=other)
        return self
class State:
    """An object that is used to pass internal and user-defined state between event handlers. By default, state
    contains the following attributes:

    .. code-block:: python

        state.iteration         # 1-based, the first iteration is 1
        state.epoch             # 1-based, the first epoch is 1
        state.seed              # seed to set at each epoch
        state.dataloader        # data passed to engine
        state.epoch_length      # optional length of an epoch
        state.max_epochs        # number of epochs to run
        state.batch             # batch passed to `process_function`
        state.output            # output of `process_function` after a single iteration
        state.metrics           # dictionary with defined metrics if any
        state.times             # dictionary with total and per-epoch times fetched on
                                # keys: Events.EPOCH_COMPLETED.name and Events.COMPLETED.name

    Args:
        kwargs: keyword arguments to be defined as State attributes.
    """

    # Maps an event to the name of the State attribute that holds its counter.
    event_to_attr: Dict[Union[str, "Events", "CallableEventWithFilter"], str] = {
        Events.GET_BATCH_STARTED: "iteration",
        Events.GET_BATCH_COMPLETED: "iteration",
        Events.ITERATION_STARTED: "iteration",
        Events.ITERATION_COMPLETED: "iteration",
        Events.EPOCH_STARTED: "epoch",
        Events.EPOCH_COMPLETED: "epoch",
        Events.STARTED: "epoch",
        Events.COMPLETED: "epoch",
    }

    def __init__(self, **kwargs: Any) -> None:
        self.iteration = 0
        self.epoch = 0
        self.epoch_length: Optional[int] = None
        self.max_epochs: Optional[int] = None
        # The batch and the process function's output are arbitrary user objects, not integers.
        self.output: Any = None
        self.batch: Any = None
        self.metrics: Dict[str, Any] = {}
        self.dataloader: Optional[Union[DataLoader, Iterable[Any]]] = None
        self.seed: Optional[int] = None
        self.times: Dict[str, Optional[float]] = {
            Events.EPOCH_COMPLETED.name: None,
            Events.COMPLETED.name: None,
        }

        # User-supplied attributes override the defaults set above.
        for k, v in kwargs.items():
            setattr(self, k, v)

        self._update_attrs()

    def _update_attrs(self) -> None:
        """Initialise to 0 every attribute named in ``event_to_attr`` that is not set yet."""
        for value in self.event_to_attr.values():
            if not hasattr(self, value):
                setattr(self, value, 0)

    def get_event_attrib_value(self, event_name: Union[str, Events, CallableEventWithFilter]) -> int:
        """Get the value of Event attribute with given `event_name`.

        Raises:
            RuntimeError: if ``event_name`` is not a key of ``State.event_to_attr``.
        """
        if event_name not in State.event_to_attr:
            raise RuntimeError(f"Unknown event name '{event_name}'")
        return getattr(self, State.event_to_attr[event_name])

    def __repr__(self) -> str:
        """Return one line per attribute; values that are not numbers or strings are shown by type only."""
        lines = ["State:\n"]
        for attr, value in self.__dict__.items():
            if not isinstance(value, (numbers.Number, str)):
                value = type(value)
            lines.append(f"\t{attr}: {value}\n")
        return "".join(lines)
class RemovableEventHandle:
    """A weakref handle to remove a registered event.

    The handle can unregister an event handler either through an explicit :meth:`remove` call or by
    being used in a ``with`` statement, in which case the handler is removed on exit.
    Returned from :meth:`~ignite.engine.engine.Engine.add_event_handler`.

    Args:
        event_name: Registered event name.
        handler: Registered event handler, stored as weakref.
        engine: Target engine, stored as weakref.

    Examples:
        .. code-block:: python

            engine = Engine()

            def print_epoch(engine):
                print(f"Epoch: {engine.state.epoch}")

            with engine.add_event_handler(Events.EPOCH_COMPLETED, print_epoch):
                # print_epoch handler registered for a single run
                engine.run(data)

            # print_epoch handler is now unregistered
    """

    def __init__(
        self, event_name: Union[CallableEventWithFilter, Enum, EventsList, Events], handler: Callable, engine: "Engine"
    ) -> None:
        self.event_name = event_name
        # Weak references only: the handle does not keep the handler or the engine alive.
        self.handler = weakref.ref(handler)
        self.engine = weakref.ref(engine)

    def remove(self) -> None:
        """Remove handler from engine.

        Does nothing when the handler or the engine is no longer alive.

        Raises:
            RuntimeError: if the handler exposes ``_parent`` and calling it returns ``None``.
        """
        live_handler = self.handler()
        live_engine = self.engine()

        if live_handler is None or live_engine is None:
            return

        if hasattr(live_handler, "_parent"):
            # The registered object is the one returned by ``_parent()``
            # (presumably a weak reference to a wrapping handler; confirm against the engine code).
            live_handler = live_handler._parent()
            if live_handler is None:
                raise RuntimeError(
                    "Internal error! Please fill an issue on https://github.com/pytorch/ignite/issues "
                    "if encounter this error. Thank you!"
                )

        # Treat a single event as a one-element collection so both cases share one loop.
        if isinstance(self.event_name, EventsList):
            registered_events = self.event_name
        else:
            registered_events = (self.event_name,)

        for event in registered_events:
            if live_engine.has_event_handler(live_handler, event):
                live_engine.remove_event_handler(live_handler, event)

    def __enter__(self) -> "RemovableEventHandle":
        return self

    def __exit__(self, *args: Any, **kwargs: Any) -> None:
        # Leaving the ``with`` block unregisters the handler, whether or not an exception occurred.
        self.remove()

© Copyright 2024, PyTorch-Ignite Contributors. Last updated on 04/17/2024, 8:18:43 PM.

Built with Sphinx using a theme provided by Read the Docs.