# Source code for ctapipe.utils.unstructured_interpolator

```
"""
This class is largely created as a more versatile alternative to the Linear ND
interpolator, allowing the user to pass the interpolator an array of classes and then
perform interpolation between the results of a member function of that class. Currently
the name of the target interpolation function is passed in a string at initialisation.
TODO:
- Figure out what to do when out of bounds of interpolation range
- Create function to append points to the interpolator
"""
import numpy as np
import numpy.ma as ma
from scipy.ndimage import map_coordinates
from scipy.spatial import Delaunay
[docs]class UnstructuredInterpolator:
"""
This class performs linear interpolation between an unstructured set of data
points. However the class is expanded such that it can interpolate between the
values returned from class member functions. The primary use case of this being to
interpolate between the predictions of a set of machine learning algorithms or
regular grid interpolators.
In the case that a numpy array is passed as the interpolation values this class will
behave exactly the same as the scipy LinearNDInterpolator
"""
def __init__(
self,
interpolation_points,
function_name=None,
remember_last=False,
bounds=None,
dtype=None,
):
"""
Parameters
----------
interpolation_points: dict
Dictionary of interpolation points (stored as key) and values
function_name: str
Name of class member function to call in the case we are interpolating
between class predictions, for numpy arrays leave blank
"""
self.keys = np.array(list(interpolation_points.keys()))
if dtype:
self.values = np.array(list(interpolation_points.values()), dtype=dtype)
else:
self.values = np.array(list(interpolation_points.values()))
self._n_dimensions = len(self.keys[0])
# create an object with triangulation
self._tri = Delaunay(self.keys)
self._function_name = function_name
# OK this code is horrid and will need fixing
self._numpy_input = (
isinstance(self.values[0], np.ndarray)
or issubclass(type(self.values[0]), np.floating)
or issubclass(type(self.values[0]), np.integer)
)
if self._numpy_input is False and function_name is None:
self._function_name = "__call__"
self._remember = remember_last
self.reset()
self._bounds = bounds
[docs] def reset(self):
"""
Function used to reset some class values stored after previous event
"""
self._previous_v = None
self._previous_m = None
self._previous_shape = None
[docs] def __call__(self, points, eval_points=None):
# Convert to a numpy array here incase we get a list
points = np.array(points)
if len(points.shape) == 1:
points = np.array([points])
# First find simplexes that contain interpolated points
# In
if self._remember and self._previous_v is not None:
previous_keys = self.keys[self._previous_v.ravel()]
hull = Delaunay(previous_keys)
if np.all(eval_points is not None):
shape_check = eval_points.shape == self._previous_shape
else:
shape_check = True
if np.all(hull.find_simplex(points) >= 0) and shape_check:
v = self._previous_v
m = self._previous_m
else:
s = self._tri.find_simplex(points)
v = self._tri.simplices[s]
m = self._tri.transform[s]
self._previous_v = v
self._previous_m = m
if np.all(eval_points is not None):
self._previous_shape = eval_points.shape
else:
s = self._tri.find_simplex(points)
# get the vertices for each simplex
v = self._tri.simplices[s]
# get transform matrices for each simplex
m = self._tri.transform[s]
self._previous_v = v
self._previous_m = m
if np.all(eval_points is not None):
self._previous_shape = eval_points.shape
# Here comes some serious numpy magic, it could be done with a loop but would
# be pretty inefficient I had to rip this from stack overflow - RDP
# For each interpolated point, take the the transform matrix and multiply it by
# the vector p-r, where r=m[:,n,:] is one of the simplex vertices to which
# the matrix m is related to
b = np.einsum(
"ijk,ik->ij",
m[:, : self._n_dimensions, : self._n_dimensions],
points - m[:, self._n_dimensions, :],
)
# Use the above array to get the weights for the vertices; `b` contains an
# n-dimensional vector with weights for all but the last vertices of the simplex
# (note that for n-D grid, each simplex consists of n+1 vertices);
# the remaining weight for the last vertex can be copmuted from
# the condition that sum of weights must be equal to 1
w = np.c_[b, 1 - b.sum(axis=1)]
if self._numpy_input:
if eval_points is None:
selected_points = self.values[v]
else:
selected_points = self._numpy_interpolation(v, eval_points)
else:
selected_points = self._call_class_function(v, eval_points)
# Multiply point values by weight
p_values = np.einsum("ij...,ij...->i...", selected_points, w)
# print(time.time() - t)
return p_values
def _call_class_function(self, point_num, eval_points):
"""
Function to loop over class function and return array of outputs
Parameters
----------
point_num: int
Index of class position in values list
eval_points: ndarray
Inputs used to evaluate class member function
Returns
-------
ndarray: output from member function
"""
outputs = list()
shape = point_num.shape
three_dim = False
if len(eval_points.shape) > 2:
first_index = np.arange(point_num.shape[0])[..., np.newaxis] * np.ones_like(
point_num
)
first_index = first_index.ravel()
three_dim = True
num = 0
for pt in point_num.ravel():
cls = self.values[pt]
cls_function = getattr(cls, self._function_name)
pt = eval_points
if three_dim:
pt = eval_points[first_index[num]]
outputs.append(cls_function(pt))
num += 1
outputs = np.array(outputs)
new_shape = (*shape, *outputs.shape[1:])
outputs = outputs.reshape(new_shape)
return outputs
def _numpy_interpolation(self, point_num, eval_points):
"""
Parameters
----------
point_num: int
Index of class position in values list
eval_points: ndarray
Inputs used to evaluate class member function
Returns
-------
ndarray: output from member function
"""
is_masked = ma.is_masked(eval_points)
shape = point_num.shape
ev_shape = eval_points.shape
vals = self.values[point_num.ravel()]
eval_points = np.repeat(eval_points, shape[1], axis=0)
it = np.arange(eval_points.shape[0])
it = np.repeat(it, eval_points.shape[1], axis=0)
eval_points = eval_points.reshape(
eval_points.shape[0] * eval_points.shape[1], eval_points.shape[-1]
)
scaled_points = eval_points.T
if is_masked:
mask = np.invert(ma.getmask(scaled_points[0]))
else:
mask = np.ones_like(scaled_points[0], dtype=bool)
it = ma.masked_array(it, mask)
scaled_points[0] = (
(scaled_points[0] - (self._bounds[0][0]))
/ (self._bounds[0][1] - self._bounds[0][0])
) * (vals.shape[-2] - 1)
scaled_points[1] += (
(scaled_points[1] - (self._bounds[1][0]))
/ (self._bounds[1][1] - self._bounds[1][0])
) * (vals.shape[-1] - 1)
scaled_points = np.vstack((it, scaled_points))
output = np.zeros(scaled_points.T.shape[:-1])
output[mask] = map_coordinates(vals, scaled_points.T[mask].T, order=1)
new_shape = (*shape, ev_shape[-2])
output = output.reshape(new_shape)
return ma.masked_array(output, mask=mask)
```