import abc
from enum import Enum
import numpy as np
import pymoose as pm
from pymoose.predictors import predictor
from pymoose.predictors import predictor_utils
[docs]class PostTransform(Enum):
"""Variants of output processing for linear classification."""
NONE = 1
SIGMOID = 2
SOFTMAX = 3
[docs]class LinearPredictor(predictor.Predictor, metaclass=abc.ABCMeta):
def __init__(self, coeffs, intercepts=None):
super().__init__()
self.coeffs, self.intercepts = _validate_model_args(coeffs, intercepts)
[docs] @classmethod
@abc.abstractmethod
def from_onnx(cls, model_proto):
pass
[docs] @abc.abstractmethod
def post_transform(self, y):
pass
[docs] @classmethod
def bias_trick(cls, x, plc, dtype):
"""Construct a vector of 1s broadcastable to an input matrix ``x``."""
bias_shape = pm.shape(x, placement=plc)[0:1]
bias = pm.ones(bias_shape, dtype=pm.float64, placement=plc)
reshaped_bias = pm.expand_dims(bias, 1, placement=plc)
return pm.cast(reshaped_bias, dtype=dtype, placement=plc)
[docs] def predictor_fn(self, x, fixedpoint_dtype):
"""Compute the core logic of a linear predictor function."""
if self.intercepts is not None:
w = self.fixedpoint_constant(
np.concatenate([self.intercepts.T, self.coeffs], axis=1).T,
plc=self.mirrored,
dtype=fixedpoint_dtype,
)
bias = self.bias_trick(x, plc=self.bob, dtype=fixedpoint_dtype)
else:
w = self.fixedpoint_constant(
self.coeffs.T, plc=self.mirrored, dtype=fixedpoint_dtype
)
if self.intercepts is not None:
x = pm.concatenate([bias, x], axis=1)
y = pm.dot(x, w)
return y
def __call__(self, x, fixedpoint_dtype=predictor_utils.DEFAULT_FIXED_DTYPE):
"""Compute a single pass of the linear model."""
y = self.predictor_fn(x, fixedpoint_dtype)
return self.post_transform(y)
[docs]class LinearRegressor(LinearPredictor):
"""Linear regression predictor interface.
Args:
coeffs: Array-like convertible to a (n_outputs, n_weights)-shaped ndarray.
intercepts: Optional array-like convertible to a vector.
"""
[docs] def post_transform(self, y):
"""Applies no-op to linear predictor function output."""
return y
[docs] @classmethod
def from_onnx(cls, model_proto):
"""Construct LinearRegressor from a parsed ONNX model.
Args:
model_proto: An ONNX ModelProto containing a LinearRegressor operator node.
Returns:
A LinearRegressor with weighhts and bias terms loaded from the ONNX model.
Raises:
ValueError if ONNX graph is missing expected nodes.
"""
lr_node = predictor_utils.find_node_in_model_proto(
model_proto, "LinearRegressor", enforce=False
)
if lr_node is None:
raise ValueError(
"Incompatible ONNX graph provided: graph must contain a "
"LinearRegressor operator."
)
coeffs_attr = predictor_utils.find_attribute_in_node(lr_node, "coefficients")
if coeffs_attr.type != 6: # FLOATS
raise ValueError(
"LinearRegressor coefficients must be of type FLOATS, found other."
)
coeffs = np.asarray(coeffs_attr.floats)
# extract intercept if it's there, otherwise pass it as None
intercepts_attr = predictor_utils.find_attribute_in_node(
lr_node, "intercepts", enforce=False
)
if intercepts_attr is None:
intercepts = None
elif intercepts_attr.type != 6: # FLOATS
raise ValueError(
"LinearRegressor intercept must be of type FLOATS, found other."
)
else:
intercepts = intercepts_attr.floats
# if n_targets is not None reshape into (n_targets, n_features) matrix
n_targets_ints = predictor_utils.find_attribute_in_node(
lr_node, "targets", enforce=False
)
if n_targets_ints is not None:
n_targets = n_targets_ints.i
coeffs = coeffs.reshape(n_targets, -1)
coeffs_rank = len(coeffs.shape)
if coeffs_rank == 1:
n_coeffs = coeffs.shape[0]
else:
n_coeffs = coeffs.shape[-1]
# `n_features` arg
model_input = model_proto.graph.input[0]
input_shape = predictor_utils.find_input_shape(model_input)
assert len(input_shape) == 2
n_features = input_shape[1].dim_value
if n_features != n_coeffs:
raise ValueError(
f"In the ONNX file, the input shape has {n_features} "
f"features and there are {n_coeffs} coefficients. Validate "
"you set correctly the `initial_types` when converting "
"your model to ONNX."
)
return cls(coeffs=coeffs, intercepts=intercepts)
[docs]class LinearClassifier(LinearPredictor):
"""Linear classifier predictor interface.
Args:
coeffs: Array-like convertible to a (n_outputs, n_weights)-shaped ndarray.
intercepts: Optional array-like convertible to a vector.
post_transform: a PostTransform enum variant describing how to convert the raw
linear model scores into probabilistic classification outputs.
"""
def __init__(self, coeffs, intercepts=None, post_transform=None):
super().__init__(coeffs, intercepts)
n_classes = self.coeffs.shape[0]
# infer post_transform
if post_transform == post_transform.NONE:
self._post_transform = lambda x: x
elif post_transform == post_transform.SIGMOID and n_classes == 2:
self._post_transform = lambda x: pm.sigmoid(x)
elif post_transform == post_transform.SIGMOID and n_classes > 2:
self._post_transform = lambda x: self._normalized_sigmoid(x, axis=1)
elif post_transform == post_transform.SOFTMAX:
self._post_transform = lambda x: pm.softmax(
x, axis=1, upmost_index=n_classes
)
else:
raise ValueError("Could not infer post-transform in LinearClassifier")
[docs] @classmethod
def from_onnx(cls, model_proto):
"""Construct LinearClassifier from a parsed ONNX model.
Args:
model_proto: An ONNX ModelProto containing a LinearClassifier operator node.
Returns:
A LinearClassifier with parameters and model configuration loaded from the
ONNX model.
Raises:
ValueError if ONNX graph is missing expected nodes.
RuntimeError if ONNX LinearClassifier node has an unsupported post-transform
function attribute.
"""
lc_node = predictor_utils.find_node_in_model_proto(
model_proto, "LinearClassifier", enforce=False
)
if lc_node is None:
raise ValueError(
"Incompatible ONNX graph provided: graph must contain a "
"LinearClassifier operator."
)
# parse classifier coefficients
coeffs_attr = predictor_utils.find_attribute_in_node(
lc_node, "coefficients", enforce=False
)
assert coeffs_attr is not None
if coeffs_attr.type != 6: # FLOATS
raise ValueError(
"LinearClassifier coefficients must be of type FLOATS, found other."
)
coeffs = np.asarray(coeffs_attr.floats)
# reshape into (n_classes, n_features) matrix
classlabels_ints = predictor_utils.find_attribute_in_node(
lc_node, "classlabels_ints", enforce=False
)
classlabels_strings = predictor_utils.find_attribute_in_node(
lc_node, "classlabels_strings", enforce=False
)
assert classlabels_ints is not None or classlabels_strings is not None
if classlabels_ints is not None:
classlabels = classlabels_ints.ints
elif classlabels_strings is not None:
classlabels = classlabels_strings.strings
n_classes = len(classlabels)
coeffs = coeffs.reshape(n_classes, -1)
n_coeffs = coeffs.shape[1]
# `n_features` arg
model_input = model_proto.graph.input[0]
input_shape = predictor_utils.find_input_shape(model_input)
assert len(input_shape) == 2
n_features = input_shape[1].dim_value
if n_features != n_coeffs:
raise ValueError(
f"In the ONNX file, the input shape has {n_features} "
f"features and there are {n_coeffs} coefficients. Validate "
"you set correctly the `initial_types` when converting "
"your model to ONNX."
)
# parse classifier intercepts
intercepts_attr = predictor_utils.find_attribute_in_node(
lc_node, "intercepts", enforce=False
)
if intercepts_attr is None:
intercepts = None
elif intercepts_attr.type != 6: # FLOATS
raise ValueError(
"LinearClassifier intercept must be of type FLOATS, found other."
)
else:
intercepts = np.asarray(intercepts_attr.floats).reshape(1, n_classes)
# derive transform_output
post_transform = predictor_utils.find_attribute_in_node(
lc_node, "post_transform"
)
post_transform_str = post_transform.s.decode()
if post_transform_str == "NONE":
post_transform = PostTransform.NONE
elif post_transform_str == "LOGISTIC":
post_transform = PostTransform.SIGMOID
elif post_transform_str == "SOFTMAX":
post_transform = PostTransform.SOFTMAX
else:
raise RuntimeError(
f"{post_transform_str} post_transform is unsupported for "
"LinearClassifier."
)
return cls(
coeffs=coeffs,
intercepts=intercepts,
post_transform=post_transform,
)
[docs] def post_transform(self, y):
"""Applies the classifier's post transform function to a tensor.
Actual function used depends on value of ``PostTransform`` and number of classes
provided at instantiation.
Args:
y: A tensor
Returns:
A tensor.
"""
return self._post_transform(y)
def _normalized_sigmoid(self, x, axis):
"""
For some sklearn linear classifiers, this is used to get values that sum to one
instead of the usual softmax.
"""
y = pm.sigmoid(x)
y_sum = pm.expand_dims(pm.sum(y, axis), axis)
return pm.div(y, y_sum)
def _validate_model_args(coeffs, intercepts):
coeffs = _interpret_coeffs(coeffs)
intercepts = _interpret_intercepts(intercepts)
if intercepts is not None and coeffs.shape[0] != intercepts.shape[-1]:
raise ValueError(
"Shape mismatch between model coefficients and intercepts: "
f"Intercepts size of {coeffs.shape[0]} inferred from coefficients, "
f"found {intercepts.shape[-1]}."
)
return coeffs, intercepts
def _interpret_coeffs(coeffs):
coeffs = np.asarray(coeffs, dtype=np.float64)
coeffs_shape = coeffs.shape
if len(coeffs_shape) == 1:
return np.expand_dims(coeffs, 0)
elif len(coeffs_shape) == 2:
return coeffs
raise ValueError(
f"Coeffs must be convertible to a rank-2 tensor, found shape of {coeffs_shape}."
)
def _interpret_intercepts(intercepts):
if intercepts is None:
return intercepts
intercepts = np.asarray(intercepts, dtype=np.float64)
intercepts_shape = intercepts.shape
if len(intercepts_shape) == 1:
return np.expand_dims(intercepts, 0)
elif len(intercepts_shape) == 2:
if intercepts_shape[0] != 1:
pass
else:
return intercepts
raise ValueError(
f"Intercept must be convertible to a vector, found shape of {intercepts_shape}."
)