Shortcuts

Source code for ts.service

"""
CustomService class definitions
"""
import logging
import os
import time
from builtins import str

import ts
from ts.context import Context, RequestProcessor
from ts.protocol.otf_message_handler import create_predict_response
from ts.utils.util import PredictionException, get_yaml_config

PREDICTION_METRIC = "PredictionTime"
logger = logging.getLogger(__name__)


[docs]class Service(object): """ Wrapper for custom entry_point """ def __init__( self, model_name, model_dir, manifest, entry_point, gpu, batch_size, limit_max_image_pixels=True, metrics_cache=None, ): model_yaml_config = {} if manifest is not None and "model" in manifest: model = manifest["model"] if "configFile" in model: model_yaml_config_file = model["configFile"] model_yaml_config = get_yaml_config( os.path.join(model_dir, model_yaml_config_file) ) self._context = Context( model_name, model_dir, manifest, batch_size, gpu, ts.__version__, limit_max_image_pixels, metrics_cache, model_yaml_config, ) self._entry_point = entry_point @property def context(self): return self._context
[docs] @staticmethod def retrieve_data_for_inference(batch): """ REQUEST_INPUT = { "requestId" : "111-222-3333", "parameters" : [ PARAMETER ] } PARAMETER = { "name" : parameter name "contentType": "http-content-types", "value": "val1" } :param batch: :return: """ if batch is None: raise ValueError("Received invalid inputs") req_to_id_map = {} headers = [] input_batch = [] for batch_idx, request_batch in enumerate(batch): req_id = request_batch.get("requestId").decode("utf-8") parameters = request_batch["parameters"] model_in_headers = {} model_in = {} # Parameter level headers are updated here. multipart/form-data can have multiple headers. for parameter in parameters: model_in.update({parameter["name"]: parameter["value"]}) model_in_headers.update( {parameter["name"]: {"content-type": parameter["contentType"]}} ) # Request level headers are populated here if request_batch.get("headers") is not None: for h in request_batch.get("headers"): model_in_headers.update( {h["name"].decode("utf-8"): h["value"].decode("utf-8")} ) headers.append(RequestProcessor(model_in_headers)) input_batch.append(model_in) req_to_id_map[batch_idx] = req_id return headers, input_batch, req_to_id_map
[docs] def set_cl_socket(self, cl_socket): self.cl_socket = cl_socket
[docs] def predict(self, batch): """ PREDICT COMMAND = { "command": "predict", "batch": [ REQUEST_INPUT ] } :param batch: list of request :return: """ headers, input_batch, req_id_map = Service.retrieve_data_for_inference(batch) self.context.request_ids = req_id_map self.context.request_processor = headers metrics = self.context.metrics metrics.request_ids = req_id_map self.context.cl_socket = self.cl_socket start_time = time.time() # noinspection PyBroadException try: ret = self._entry_point(input_batch, self.context) except MemoryError: logger.error("System out of memory", exc_info=True) return create_predict_response(None, req_id_map, "Out of resources", 507) except PredictionException as e: logger.error("Prediction error", exc_info=True) return create_predict_response(None, req_id_map, e.message, e.error_code) except Exception as ex: # pylint: disable=broad-except if "CUDA" in str(ex): # Handles Case A: CUDA error: CUBLAS_STATUS_NOT_INITIALIZED (Close to OOM) & # Case B: CUDA out of memory (OOM) logger.error("CUDA out of memory", exc_info=True) return create_predict_response( None, req_id_map, "Out of resources", 507 ) else: logger.warning("Invoking custom service failed.", exc_info=True) return create_predict_response( None, req_id_map, "Prediction failed", 503 ) if not isinstance(ret, list): logger.warning( "model: %s, Invalid return type: %s.", self.context.model_name, type(ret), ) return create_predict_response( None, req_id_map, "Invalid model predict output", 503 ) if len(ret) != len(input_batch): logger.warning( "model: %s, number of batch response mismatched, expect: %d, got: %d.", self.context.model_name, len(input_batch), len(ret), ) return create_predict_response( None, req_id_map, "number of batch response mismatched", 503 ) duration = round((time.time() - start_time) * 1000, 2) metrics.add_time(PREDICTION_METRIC, duration) return create_predict_response( ret, req_id_map, "Prediction success", 200, context=self.context )
[docs]def emit_metrics(metrics): """ Emit the metrics in the provided Dictionary Parameters ---------- metrics: Dictionary A dictionary of all metrics, when key is metric_name value is a metric object """ if metrics: for met in metrics: logger.info("[METRICS]%s", str(met))

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources