Shortcuts

Source code for ignite.contrib.handlers.trains_logger

import numbers
import os
import tempfile
import warnings
from collections import defaultdict
from datetime import datetime
from enum import Enum
from typing import Any, List, Mapping, Optional, Type

import torch

import ignite.distributed as idist
from ignite.contrib.handlers.base_logger import (
    BaseLogger,
    BaseOptimizerParamsHandler,
    BaseOutputHandler,
    BaseWeightsHistHandler,
    BaseWeightsScalarHandler,
)
from ignite.handlers import global_step_from_engine
from ignite.handlers.checkpoint import DiskSaver

__all__ = [
    "TrainsLogger",
    "TrainsSaver",
    "OptimizerParamsHandler",
    "OutputHandler",
    "WeightsScalarHandler",
    "WeightsHistHandler",
    "GradsScalarHandler",
    "GradsHistHandler",
    "global_step_from_engine",
]


[docs]class OutputHandler(BaseOutputHandler): """Helper handler to log engine's output and/or metrics Examples: .. code-block:: python from ignite.contrib.handlers.trains_logger import * # Create a logger trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) # Attach the logger to the evaluator on the validation dataset and log NLL, Accuracy metrics after # each epoch. We setup `global_step_transform=global_step_from_engine(trainer)` to take the epoch # of the `trainer`: trains_logger.attach( evaluator, log_handler=OutputHandler( tag="validation", metric_names=["nll", "accuracy"], global_step_transform=global_step_from_engine(trainer) ), event_name=Events.EPOCH_COMPLETED ) # or equivalently trains_logger.attach_output_handler( evaluator, event_name=Events.EPOCH_COMPLETED, tag="validation", metric_names=["nll", "accuracy"], global_step_transform=global_step_from_engine(trainer) ) Another example, where model is evaluated every 500 iterations: .. code-block:: python from ignite.contrib.handlers.trains_logger import * @trainer.on(Events.ITERATION_COMPLETED(every=500)) def evaluate(engine): evaluator.run(validation_set, max_epochs=1) # Create a logger trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) def global_step_transform(*args, **kwargs): return trainer.state.iteration # Attach the logger to the evaluator on the validation dataset and log NLL, Accuracy metrics after # every 500 iterations. Since evaluator engine does not have access to the training iteration, we # provide a global_step_transform to return the trainer.state.iteration for the global_step, each time # evaluator metrics are plotted on Trains. trains_logger.attach_output_handler( evaluator, event_name=Events.EPOCH_COMPLETED, tag="validation", metrics=["nll", "accuracy"], global_step_transform=global_step_transform ) Args: tag (str): common title for all produced plots. For example, "training" metric_names (list of str, optional): list of metric names to plot or a string "all" to plot all available metrics. output_transform (callable, optional): output transform function to prepare `engine.state.output` as a number. For example, `output_transform = lambda output: output` This function can also return a dictionary, e.g `{"loss": loss1, "another_loss": loss2}` to label the plot with corresponding keys. global_step_transform (callable, optional): global step transform function to output a desired global step. Input of the function is `(engine, event_name)`. Output of function should be an integer. Default is None, global_step based on attached engine. If provided, uses function output as global_step. To setup global step from another engine, please use :meth:`~ignite.contrib.handlers.trains_logger.global_step_from_engine`. Note: Example of `global_step_transform`: .. code-block:: python def global_step_transform(engine, event_name): return engine.state.get_event_attrib_value(event_name) """ def __init__(self, tag, metric_names=None, output_transform=None, global_step_transform=None): super(OutputHandler, self).__init__(tag, metric_names, output_transform, global_step_transform) def __call__(self, engine, logger, event_name): if not isinstance(logger, TrainsLogger): raise RuntimeError("Handler OutputHandler works only with TrainsLogger") metrics = self._setup_output_metrics(engine) global_step = self.global_step_transform(engine, event_name) if not isinstance(global_step, int): raise TypeError( "global_step must be int, got {}." " Please check the output of global_step_transform.".format(type(global_step)) ) for key, value in metrics.items(): if isinstance(value, numbers.Number) or isinstance(value, torch.Tensor) and value.ndimension() == 0: logger.trains_logger.report_scalar(title=self.tag, series=key, iteration=global_step, value=value) elif isinstance(value, torch.Tensor) and value.ndimension() == 1: for i, v in enumerate(value): logger.trains_logger.report_scalar( title="{}/{}".format(self.tag, key), series=str(i), iteration=global_step, value=v.item() ) else: warnings.warn("TrainsLogger output_handler can not log metrics value type {}".format(type(value)))
[docs]class OptimizerParamsHandler(BaseOptimizerParamsHandler): """Helper handler to log optimizer parameters Examples: .. code-block:: python from ignite.contrib.handlers.trains_logger import * # Create a logger trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) # Attach the logger to the trainer to log optimizer's parameters, e.g. learning rate at each iteration trains_logger.attach( trainer, log_handler=OptimizerParamsHandler(optimizer), event_name=Events.ITERATION_STARTED ) # or equivalently trains_logger.attach_opt_params_handler( trainer, event_name=Events.ITERATION_STARTED, optimizer=optimizer ) Args: optimizer (torch.optim.Optimizer or object): torch optimizer or any object with attribute ``param_groups`` as a sequence. param_name (str): parameter name tag (str, optional): common title for all produced plots. For example, "generator" """ def __init__(self, optimizer, param_name="lr", tag=None): super(OptimizerParamsHandler, self).__init__(optimizer, param_name, tag) def __call__(self, engine, logger, event_name): if not isinstance(logger, TrainsLogger): raise RuntimeError("Handler OptimizerParamsHandler works only with TrainsLogger") global_step = engine.state.get_event_attrib_value(event_name) tag_prefix = "{}/".format(self.tag) if self.tag else "" params = { str(i): float(param_group[self.param_name]) for i, param_group in enumerate(self.optimizer.param_groups) } for k, v in params.items(): logger.trains_logger.report_scalar( title="{}{}".format(tag_prefix, self.param_name), series=k, value=v, iteration=global_step )
[docs]class WeightsScalarHandler(BaseWeightsScalarHandler): """Helper handler to log model's weights as scalars. Handler iterates over named parameters of the model, applies reduction function to each parameter produce a scalar and then logs the scalar. Examples: .. code-block:: python from ignite.contrib.handlers.trains_logger import * # Create a logger trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) # Attach the logger to the trainer to log model's weights norm after each iteration trains_logger.attach( trainer, event_name=Events.ITERATION_COMPLETED, log_handler=WeightsScalarHandler(model, reduction=torch.norm) ) Args: model (torch.nn.Module): model to log weights reduction (callable): function to reduce parameters into scalar tag (str, optional): common title for all produced plots. For example, "generator" """ def __init__(self, model, reduction=torch.norm, tag=None): super(WeightsScalarHandler, self).__init__(model, reduction, tag=tag) def __call__(self, engine, logger, event_name): if not isinstance(logger, TrainsLogger): raise RuntimeError("Handler WeightsScalarHandler works only with TrainsLogger") global_step = engine.state.get_event_attrib_value(event_name) tag_prefix = "{}/".format(self.tag) if self.tag else "" for name, p in self.model.named_parameters(): if p.grad is None: continue title_name, _, series_name = name.partition(".") logger.trains_logger.report_scalar( title="{}weights_{}/{}".format(tag_prefix, self.reduction.__name__, title_name), series=series_name, value=self.reduction(p.data), iteration=global_step, )
[docs]class WeightsHistHandler(BaseWeightsHistHandler): """Helper handler to log model's weights as histograms. Examples: .. code-block:: python from ignite.contrib.handlers.trains_logger import * # Create a logger trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) # Attach the logger to the trainer to log model's weights norm after each iteration trains_logger.attach( trainer, event_name=Events.ITERATION_COMPLETED, log_handler=WeightsHistHandler(model) ) Args: model (torch.nn.Module): model to log weights tag (str, optional): common title for all produced plots. For example, 'generator' """ def __init__(self, model, tag=None): super(WeightsHistHandler, self).__init__(model, tag=tag) def __call__(self, engine, logger, event_name): if not isinstance(logger, TrainsLogger): raise RuntimeError("Handler 'WeightsHistHandler' works only with TrainsLogger") global_step = engine.state.get_event_attrib_value(event_name) tag_prefix = "{}/".format(self.tag) if self.tag else "" for name, p in self.model.named_parameters(): if p.grad is None: continue title_name, _, series_name = name.partition(".") logger.grad_helper.add_histogram( title="{}weights_{}".format(tag_prefix, title_name), series=series_name, step=global_step, hist_data=p.grad.detach().cpu().numpy(), )
[docs]class GradsScalarHandler(BaseWeightsScalarHandler): """Helper handler to log model's gradients as scalars. Handler iterates over the gradients of named parameters of the model, applies reduction function to each parameter produce a scalar and then logs the scalar. Examples: .. code-block:: python from ignite.contrib.handlers.trains_logger import * # Create a logger trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) # Attach the logger to the trainer to log model's weights norm after each iteration trains_logger.attach( trainer, event_name=Events.ITERATION_COMPLETED, log_handler=GradsScalarHandler(model, reduction=torch.norm) ) Args: model (torch.nn.Module): model to log weights reduction (callable): function to reduce parameters into scalar tag (str, optional): common title for all produced plots. For example, "generator" """ def __init__(self, model, reduction=torch.norm, tag=None): super(GradsScalarHandler, self).__init__(model, reduction, tag=tag) def __call__(self, engine, logger, event_name): if not isinstance(logger, TrainsLogger): raise RuntimeError("Handler GradsScalarHandler works only with TrainsLogger") global_step = engine.state.get_event_attrib_value(event_name) tag_prefix = "{}/".format(self.tag) if self.tag else "" for name, p in self.model.named_parameters(): if p.grad is None: continue title_name, _, series_name = name.partition(".") logger.trains_logger.report_scalar( title="{}grads_{}/{}".format(tag_prefix, self.reduction.__name__, title_name), series=series_name, value=self.reduction(p.data), iteration=global_step, )
[docs]class GradsHistHandler(BaseWeightsHistHandler): """Helper handler to log model's gradients as histograms. Examples: .. code-block:: python from ignite.contrib.handlers.trains_logger import * # Create a logger trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) # Attach the logger to the trainer to log model's weights norm after each iteration trains_logger.attach( trainer, event_name=Events.ITERATION_COMPLETED, log_handler=GradsHistHandler(model) ) Args: model (torch.nn.Module): model to log weights tag (str, optional): common title for all produced plots. For example, 'generator' """ def __init__(self, model, tag=None): super(GradsHistHandler, self).__init__(model, tag=tag) def __call__(self, engine, logger, event_name): if not isinstance(logger, TrainsLogger): raise RuntimeError("Handler 'GradsHistHandler' works only with TrainsLogger") global_step = engine.state.get_event_attrib_value(event_name) tag_prefix = "{}/".format(self.tag) if self.tag else "" for name, p in self.model.named_parameters(): if p.grad is None: continue title_name, _, series_name = name.partition(".") logger.grad_helper.add_histogram( title="{}grads_{}".format(tag_prefix, title_name), series=series_name, step=global_step, hist_data=p.grad.detach().cpu().numpy(), )
[docs]class TrainsLogger(BaseLogger): """ `Trains <https://github.com/allegroai/trains>`_ handler to log metrics, text, model/optimizer parameters, plots during training and validation. Also supports model checkpoints logging and upload to the storage solution of your choice (i.e. Trains File server, S3 bucket etc.) .. code-block:: bash pip install trains trains-init Args: project_name (str): The name of the project in which the experiment will be created. If the project does not exist, it is created. If ``project_name`` is ``None``, the repository name is used. (Optional) task_name (str): The name of Task (experiment). If ``task_name`` is ``None``, the Python experiment script's file name is used. (Optional) task_type (str): Optional. The task type. Valid values are: - ``TaskTypes.training`` (Default) - ``TaskTypes.train`` - ``TaskTypes.testing`` - ``TaskTypes.inference`` Examples: .. code-block:: python from ignite.contrib.handlers.trains_logger import * # Create a logger trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) # Attach the logger to the trainer to log training loss at each iteration trains_logger.attach_output_handler( trainer, event_name=Events.ITERATION_COMPLETED, tag="training", output_transform=lambda loss: {"loss": loss} ) # Attach the logger to the evaluator on the training dataset and log NLL, Accuracy metrics after each epoch # We setup `global_step_transform=global_step_from_engine(trainer)` to take the epoch # of the `trainer` instead of `train_evaluator`. trains_logger.attach_output_handler( train_evaluator, event_name=Events.EPOCH_COMPLETED, tag="training", metric_names=["nll", "accuracy"], global_step_transform=global_step_from_engine(trainer), ) # Attach the logger to the evaluator on the validation dataset and log NLL, Accuracy metrics after # each epoch. We setup `global_step_transform=global_step_from_engine(trainer)` to take the epoch of the # `trainer` instead of `evaluator`. trains_logger.attach_output_handler( evaluator, event_name=Events.EPOCH_COMPLETED, tag="validation", metric_names=["nll", "accuracy"], global_step_transform=global_step_from_engine(trainer)), ) # Attach the logger to the trainer to log optimizer's parameters, e.g. learning rate at each iteration trains_logger.attach_opt_params_handler( trainer, event_name=Events.ITERATION_STARTED, optimizer=optimizer, param_name='lr' # optional ) # Attach the logger to the trainer to log model's weights norm after each iteration trains_logger.attach( trainer, event_name=Events.ITERATION_COMPLETED, log_handler=WeightsScalarHandler(model) ) """ def __init__(self, *_, **kwargs): try: from trains import Task from trains.binding.frameworks.tensorflow_bind import WeightsGradientHistHelper except ImportError: raise RuntimeError( "This contrib module requires trains to be installed. " "You may install trains using: \n pip install trains \n" ) experiment_kwargs = {k: v for k, v in kwargs.items() if k not in ("project_name", "task_name", "task_type",)} if self.bypass_mode(): warnings.warn("TrainsSaver: running in bypass mode") class _Stub(object): def __call__(self, *_, **__): return self def __getattr__(self, attr): if attr in ("name", "id"): return "" return self def __setattr__(self, attr, val): pass self._task = _Stub() else: self._task = Task.init( project_name=kwargs.get("project_name"), task_name=kwargs.get("task_name"), task_type=kwargs.get("task_type", Task.TaskTypes.training), **experiment_kwargs, ) self.trains_logger = self._task.get_logger() self.grad_helper = WeightsGradientHistHelper(logger=self.trains_logger,)
[docs] @classmethod def set_bypass_mode(cls, bypass: bool) -> None: """ Will bypass all outside communication, and will drop all logs. Should only be used in "standalone mode", when there is no access to the *trains-server*. Args: bypass: If ``True``, all outside communication is skipped. """ setattr(cls, "_bypass", bypass)
[docs] @classmethod def bypass_mode(cls) -> bool: """ Returns the bypass mode state. Note: `GITHUB_ACTIONS` env will automatically set bypass_mode to ``True`` unless overridden specifically with ``TrainsLogger.set_bypass_mode(False)``. Return: If True, all outside communication is skipped. """ return getattr(cls, "_bypass", bool(os.environ.get("CI")))
def close(self): self.trains_logger.flush() def _create_output_handler(self, *args, **kwargs): return OutputHandler(*args, **kwargs) def _create_opt_params_handler(self, *args, **kwargs): return OptimizerParamsHandler(*args, **kwargs)
[docs]class TrainsSaver(DiskSaver): """ Handler that saves input checkpoint as Trains artifacts Args: logger (TrainsLogger, optional): An instance of :class:`~ignite.contrib.handlers.TrainsLogger`, ensuring a valid Trains ``Task`` has been initialized. If not provided, and a Trains Task has not been manually initialized, a runtime error will be raised. output_uri (str, optional): The default location for output models and other artifacts uploaded by Trains. For more information, see ``trains.Task.init``. dirname (str, optional): Directory path where the checkpoint will be saved. If not provided, a temporary directory will be created. Examples: .. code-block:: python from ignite.contrib.handlers.trains_logger import * from ignite.handlers import Checkpoint trains_logger = TrainsLogger( project_name="pytorch-ignite-integration", task_name="cnn-mnist" ) to_save = {"model": model} handler = Checkpoint( to_save, TrainsSaver(), n_saved=1, score_function=lambda e: 123, score_name="acc", filename_prefix="best", global_step_transform=global_step_from_engine(trainer) ) validation_evaluator.add_event_handler(Events.EVENT_COMPLETED, handler) """ def __init__(self, logger: TrainsLogger = None, output_uri: str = None, dirname: str = None, *args, **kwargs): self._setup_check_trains(logger, output_uri) if not dirname: dirname = "" if idist.get_rank() == 0: dirname = tempfile.mkdtemp( prefix="ignite_checkpoints_{}".format(datetime.now().strftime("%Y_%m_%d_%H_%M_%S_")) ) if idist.get_world_size() > 1: dirname = idist.all_gather(dirname)[0] warnings.warn("TrainsSaver created a temporary checkpoints directory: {}".format(dirname)) idist.barrier() # Let's set non-atomic tmp dir saving behaviour if "atomic" not in kwargs: kwargs["atomic"] = False self._checkpoint_slots = defaultdict(list) super(TrainsSaver, self).__init__(dirname=dirname, *args, **kwargs) @idist.one_rank_only() def _setup_check_trains(self, logger, output_uri): try: from trains import Task except ImportError: raise RuntimeError( "This contrib module requires trains to be installed. " "You may install trains using: \n pip install trains \n" ) if logger and not isinstance(logger, TrainsLogger): raise TypeError("logger must be an instance of TrainsLogger") self._task = Task.current_task() if not self._task: raise RuntimeError( "TrainsSaver requires a Trains Task to be initialized." "Please use the `logger` argument or call `trains.Task.init()`." ) if output_uri: self._task.output_uri = output_uri class _CallbacksContext: def __init__( self, callback_type: Type[Enum], slots: List, checkpoint_key: str, filename: str, basename: str, metadata: Optional[Mapping] = None, ): self._callback_type = callback_type self._slots = slots self._checkpoint_key = str(checkpoint_key) self._filename = filename self._basename = basename self._metadata = metadata def pre_callback(self, action: str, model_info: Any): if action != self._callback_type.save: return model_info try: slot = self._slots.index(None) self._slots[slot] = model_info.upload_filename except ValueError: self._slots.append(model_info.upload_filename) slot = len(self._slots) - 1 model_info.upload_filename = "{}_{}{}".format(self._basename, slot, os.path.splitext(self._filename)[1]) model_info.local_model_id = "{}:{}".format(self._checkpoint_key, model_info.upload_filename) return model_info def post_callback(self, action: str, model_info: Any): if action != self._callback_type.save: return model_info model_info.model.name = "{}: {}".format(model_info.task.name, self._filename) prefix = "Checkpoint Metadata: " metadata = "{}{}".format( prefix, ", ".join("{}={}".format(k, v) for k, v in self._metadata.items()) if self._metadata else "none", ) comment = "\n".join( metadata if line.startswith(prefix) else line for line in (model_info.model.comment or "").split("\n") ) if prefix not in comment: comment += "\n" + metadata model_info.model.comment = comment return model_info def __call__(self, checkpoint: Mapping, filename: str, metadata: Optional[Mapping] = None) -> None: try: from trains import Model from trains.binding.frameworks import WeightsFileHandler except ImportError: raise RuntimeError( "This contrib module requires trains to be installed. " "You may install trains using: \n pip install trains \n" ) try: basename = metadata["basename"] except (TypeError, KeyError): warnings.warn("Checkpoint metadata missing or basename cannot be found") basename = "checkpoint" checkpoint_key = (self.dirname, basename) cb_context = self._CallbacksContext( callback_type=WeightsFileHandler.CallbackType, slots=self._checkpoint_slots[checkpoint_key], checkpoint_key=str(checkpoint_key), filename=filename, basename=basename, metadata=metadata, ) pre_cb_id = WeightsFileHandler.add_pre_callback(cb_context.pre_callback) post_cb_id = WeightsFileHandler.add_post_callback(cb_context.post_callback) try: super(TrainsSaver, self).__call__(checkpoint, filename, metadata) finally: WeightsFileHandler.remove_pre_callback(pre_cb_id) WeightsFileHandler.remove_post_callback(post_cb_id)
[docs] @idist.one_rank_only() def get_local_copy(self, filename: str) -> Optional[str]: """Get artifact local copy. .. warning:: In distributed configuration this method should be called on rank 0 process. Args: filename (str): artifact name. Returns: a local path to a downloaded copy of the artifact """ artifact = self._task.artifacts.get(filename) if artifact: return artifact.get_local_copy() self._task.get_logger().report_text("Can not find artifact {}".format(filename))
[docs] @idist.one_rank_only() def remove(self, filename: str) -> None: super(TrainsSaver, self).remove(filename) for slots in self._checkpoint_slots.values(): try: slots[slots.index(filename)] = None except ValueError: pass else: break

© Copyright 2025, PyTorch-Ignite Contributors. Last updated on 01/04/2025, 10:24:02 AM.

Built with Sphinx using a theme provided by Read the Docs.