Shortcuts

Source code for ts.torch_handler.request_envelope.kservev2

"""
The KServe Envelope is used to handle the KServe
Input Request inside Torchserve.
"""
import json
import logging

import numpy as np

from .base import BaseEnvelope

logger = logging.getLogger(__name__)

_DatatypeToNumpy = {
    "BOOL": "bool",
    "UINT8": "uint8",
    "UINT16": "uint16",
    "UINT32": "uint32",
    "UINT64": "uint64",
    "INT8": "int8",
    "INT16": "int16",
    "INT32": "int32",
    "INT64": "int64",
    "FP16": "float16",
    "FP32": "float32",
    "FP64": "float64",
    "BYTES": "byte",
}

_NumpyToDatatype = {value: key for key, value in _DatatypeToNumpy.items()}

# NOTE: numpy has more types than v2 protocol
_NumpyToDatatype["object"] = "BYTES"

# Adding support for unicode string
# Ref: https://numpy.org/doc/stable/reference/arrays.dtypes.html
_NumpyToDatatype["U"] = "BYTES"


def _to_dtype(datatype: str) -> "np.dtype":
    dtype = _DatatypeToNumpy[datatype]
    return np.dtype(dtype)


def _to_datatype(dtype: np.dtype) -> str:
    as_str = str(dtype)
    if as_str not in _NumpyToDatatype:
        as_str = getattr(dtype, "kind")
    datatype = _NumpyToDatatype[as_str]

    return datatype


[docs]class KServev2Envelope(BaseEnvelope): """Implementation. Captures batches in KServe v2 protocol format, returns also in FServing v2 protocol format. """
[docs] def parse_input(self, data): """Translates KServe request input to list of data expected by Torchserve. Parameters: data (json): KServe v2 request input json. { "inputs": [{ "name": "input-0", "shape": [37], "datatype": "INT64", "data": [66, 108, 111, 111, 109] }] } Returns: list of data objects. [{ 'name': 'input-0', 'shape': [5], 'datatype': 'INT64', 'data': [66, 108, 111, 111, 109] }] """ logger.debug("Parsing input in KServe v2 format %s", data) inputs = self._batch_from_json(data) logger.debug("KServev2 parsed inputs %s", inputs) return inputs
def _batch_from_json(self, rows): """ Joins the instances of a batch of JSON objects """ logger.debug("Parse input data %s", rows) body_list = [ body_list.get("data") or body_list.get("body") for body_list in rows ] data_list = self._from_json(body_list) return data_list def _from_json(self, body_list): """ Extracts the data from the JSON object """ if isinstance(body_list[0], (bytes, bytearray)): body_list = [json.loads(body.decode("utf8")) for body in body_list] logger.debug("Bytes array is %s", body_list) input_names = [] for index, input in enumerate(body_list[0]["inputs"]): if input["datatype"] == "BYTES": body_list[0]["inputs"][index]["data"] = input["data"][0] else: body_list[0]["inputs"][index]["data"] = ( np.array(input["data"]).reshape(tuple(input["shape"])).tolist() ) input_names.append(input["name"]) setattr(self.context, "input_names", input_names) logger.debug("Bytes array is %s", body_list) id = body_list[0].get("id") if id and id.strip(): setattr(self.context, "input_request_id", body_list[0]["id"]) # TODO: Add parameters support # parameters = body_list[0].get("parameters") # if parameters: # setattr(self.context, "input_parameters", body_list[0]["parameters"]) data_list = [inputs_list.get("inputs") for inputs_list in body_list][0] return data_list
[docs] def format_output(self, data): """Translates Torchserve output KServe v2 response format. Parameters: data (list): Torchserve response for handler. Returns: KServe v2 response json. { "id": "f0222600-353f-47df-8d9d-c96d96fa894e", "model_name": "bert", "model_version": "1", "outputs": [{ "name": "input-0", "shape": [1], "datatype": "INT64", "data": [2] }] } """ logger.debug("The Response of KServe v2 format %s", data) response = {} if hasattr(self.context, "input_request_id"): response["id"] = getattr(self.context, "input_request_id") delattr(self.context, "input_request_id") else: response["id"] = self.context.get_request_id(0) # TODO: Add parameters support # if hasattr(self.context, "input_parameters"): # response["parameters"] = getattr(self.context, "input_parameters") # delattr(self.context, "input_parameters") response["model_name"] = self.context.manifest.get("model").get("modelName") response["model_version"] = self.context.manifest.get("model").get( "modelVersion" ) response["outputs"] = self._batch_to_json(data) return [response]
def _batch_to_json(self, data): """ Splits batch output to json objects """ output = [] input_names = getattr(self.context, "input_names") delattr(self.context, "input_names") for index, item in enumerate(data): output.append(self._to_json(item, input_names[index])) return output def _to_json(self, data, input_name): """ Constructs JSON object from data """ output_data = {} data_ndarray = np.array(data).flatten() output_data["name"] = input_name output_data["datatype"] = _to_datatype(data_ndarray.dtype) output_data["data"] = data_ndarray.tolist() output_data["shape"] = data_ndarray.flatten().shape return output_data

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources