Source code for torch.distributed.elastic.metrics.api
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import abc
import time
import warnings
from collections import namedtuple
from functools import wraps
from typing import Dict, Optional

__all__ = ['MetricsConfig', 'MetricHandler', 'ConsoleMetricHandler', 'NullMetricHandler', 'MetricStream', 'configure', 'getStream', 'prof', 'profile', 'put_metric', 'publish_metric', 'get_elapsed_time_ms', 'MetricData']

# A single metric sample: when it was taken, the group and metric name it
# belongs to, and the recorded value.
MetricData = namedtuple("MetricData", ["timestamp", "group_name", "name", "value"])


class MetricsConfig:
    """Container for a ``params`` mapping of string keys to string values."""

    __slots__ = ["params"]

    def __init__(self, params: Optional[Dict[str, str]] = None):
        # A fresh dict is created per instance when no mapping is supplied.
        self.params = {} if params is None else params
class MetricStream:
    """Forwards values recorded for one metric group to a handler."""

    def __init__(self, group_name: str, handler: MetricHandler):
        self.group_name = group_name
        self.handler = handler

    def add_value(self, metric_name: str, metric_value: int):
        """Build a ``MetricData`` stamped with ``time.time()`` and emit it.

        Args:
            metric_name: Name stored in the data point's ``name`` field.
            metric_value: Value stored in the data point's ``value`` field.
        """
        data_point = MetricData(
            timestamp=time.time(),
            group_name=self.group_name,
            name=metric_name,
            value=metric_value,
        )
        self.handler.emit(data_point)


# Handlers registered through ``configure`` for a specific group name.
_metrics_map = {}
# Handler that ``configure`` replaces when it is called without a group.
_default_metrics_handler: MetricHandler = NullMetricHandler()


# pyre-fixme[9]: group has type `str`; used as `None`.
def configure(handler: MetricHandler, group: Optional[str] = None):
    """Install ``handler`` for one group, or as the module-wide default.

    Args:
        handler: Handler to register.
        group: Group name to register the handler under. When ``None``, the
            module-level default handler is replaced instead.
    """
    global _default_metrics_handler

    if group is not None:
        _metrics_map[group] = handler
        return

    # pyre-fixme[9]: _default_metrics_handler has type `NullMetricHandler`; used
    #  as `MetricHandler`.
    _default_metrics_handler = handler
def prof(fn=None, group: str = "torchelastic"):
    r"""
    @prof decorator publishes ``duration.ms`` plus ``success`` or ``failure``
    metrics for the function that it decorates.

    ``<name>.success`` is published when the function returns,
    ``<name>.failure`` when it raises an ``Exception`` (which is re-raised),
    and ``<name>.duration.ms`` in both cases.

    The metric name defaults to the qualified name (``class_name.def_name``)
    of the function. If the function does not belong to a class, it uses the
    leaf module name instead.

    Args:
        fn: The function to decorate when used as a bare ``@prof``. Left as
            ``None`` when the decorator is called with arguments.
        group: Metric group the data points are published to.

    Returns:
        The wrapped function when ``fn`` is given, otherwise a decorator.

    Usage

    ::

     @metrics.prof
     def x():
         pass

     @metrics.prof(group="agent")
     def y():
         pass
    """

    def wrap(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            key = _get_metric_name(f)
            # Bound before the ``try`` so the ``finally`` clause can always
            # read it.
            start = time.time()
            try:
                result = f(*args, **kwargs)
            except Exception:
                put_metric(f"{key}.failure", 1, group)
                raise
            else:
                # Published outside the ``try`` body so that an error raised
                # while reporting success is not also counted as a failure
                # of ``f`` itself.
                put_metric(f"{key}.success", 1, group)
            finally:
                put_metric(f"{key}.duration.ms", get_elapsed_time_ms(start), group)
            return result

        return wrapper

    # Compare with None instead of relying on truthiness: a callable object
    # that evaluates as falsy must still be wrapped.
    if fn is not None:
        return wrap(fn)
    return wrap
def profile(group=None):
    """
    @profile decorator adds latency and success/failure metrics to any given function.

    Deprecated: a ``DeprecationWarning`` is issued each time this decorator
    factory is called; use ``@prof`` instead.

    The wrapped function publishes ``<func name>.success`` when it returns,
    ``<func name>.failure`` when it raises an ``Exception`` (which is
    re-raised), and ``<func name>.duration.ms`` in both cases.

    Args:
        group: Metric group passed to ``publish_metric`` for every data point.

    Returns:
        A decorator that wraps the target function.

    Usage

    ::

     @metrics.profile("my_metric_group")
     def some_function(<arguments>):
    """
    # stacklevel=2 attributes the warning to the code applying the decorator
    # rather than to this module.
    warnings.warn("Deprecated, use @prof instead", DeprecationWarning, stacklevel=2)

    def wrap(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Bound before the ``try`` so the ``finally`` clause can always
            # read it.
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
            except Exception:
                publish_metric(group, f"{func.__name__}.failure", 1)
                raise
            else:
                # Published outside the ``try`` body so that an error raised
                # while reporting success is not also counted as a failure
                # of ``func`` itself.
                publish_metric(group, f"{func.__name__}.success", 1)
            finally:
                publish_metric(
                    group,
                    f"{func.__name__}.duration.ms",
                    get_elapsed_time_ms(start_time),
                )
            return result

        return wrapper

    return wrap
def put_metric(metric_name: str, metric_value: int, metric_group: str = "torchelastic"):
    """
    Publishes a metric data point.

    The value is added to the stream that ``getStream`` returns for
    ``metric_group``.

    Usage

    ::

     put_metric("metric_name", 1)
     put_metric("metric_name", 1, "metric_group_name")
    """
    stream = getStream(metric_group)
    stream.add_value(metric_name, metric_value)
def publish_metric(metric_group: str, metric_name: str, metric_value: int):
    """
    Deprecated way of publishing a metric data point.

    Issues a warning on every call, then adds the value to the stream that
    ``getStream`` returns for ``metric_group``.

    Args:
        metric_group: Group whose stream receives the value.
        metric_name: Name of the metric.
        metric_value: Value to record.
    """
    # The message spells out put_metric's real argument order, and
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(
        "Deprecated, use put_metric(metric_name, metric_value, metric_group) instead",
        stacklevel=2,
    )
    metric_stream = getStream(metric_group)
    metric_stream.add_value(metric_name, metric_value)


def get_elapsed_time_ms(start_time_in_seconds: float):
    """
    Returns the elapsed time in millis from the given start time.

    Args:
        start_time_in_seconds: Start time on the ``time.time()`` clock.

    Returns:
        Milliseconds between the start time and now, truncated to an ``int``.
    """
    end_time = time.time()
    return int((end_time - start_time_in_seconds) * 1000)
Docs
Access comprehensive developer documentation for PyTorch
To analyze traffic and optimize your experience, we serve cookies on this site. By clicking or navigating, you agree to allow our usage of cookies. As the current maintainers of this site, Facebook’s Cookies Policy applies. Learn more, including about available controls: Cookies Policy.