Source code for ctapipe.reco.hillas_reconstructor

Line-intersection-based fitting for reconstruction of direction
and core position of a shower.

import warnings
from itertools import combinations

import numpy as np
from astropy import units as u
from astropy.coordinates import AltAz, Longitude, SkyCoord, cartesian_to_spherical

from ..containers import CameraHillasParametersContainer, ReconstructedGeometryContainer
from ..coordinates import (
from .reconstructor import (

__all__ = ["HillasReconstructor"]

INVALID = ReconstructedGeometryContainer(

def angle(v1, v2):
    """computes the angle between two vectors
        assuming cartesian coordinates

    v1 : numpy array
    v2 : numpy array

    the angle between v1 and v2 as a dimensioned astropy quantity
    norm = np.linalg.norm(v1, axis=-1) * np.linalg.norm(v2, axis=-1)
    return np.arccos(np.clip(np.sum(v1 * v2, axis=-1) / norm, -1.0, 1.0))

def normalise(vec):
    """Sets the length of the vector to 1
        without changing its direction

    vec : numpy array

    numpy array with the same direction but length of 1
    norm = np.linalg.norm(vec, axis=-1)
    if vec.ndim == 1:
            return vec / norm
        except ZeroDivisionError:
            return vec

    result = np.zeros(vec.shape)
    mask = norm > 0
    result[mask] = vec[mask] / norm[mask, np.newaxis]
    return result

def line_line_intersection_3d(uvw_vectors, origins):
    Intersection of many lines in 3d.
    C = []
    S = []
    for n, pos in zip(uvw_vectors, origins):
        n = n.reshape((3, 1))
        norm_matrix = (n @ n.T) - np.eye(3)
        C.append(norm_matrix @ pos)

    S = np.array(S).sum(axis=0)
    C = np.array(C).sum(axis=0)
    return np.linalg.inv(S) @ C

[docs]class HillasReconstructor(GeometryReconstructor): """ class that reconstructs the direction of an atmospheric shower using a simple hillas parametrisation of the camera images it provides a direction estimate in two steps and an estimate for the shower's impact position on the ground. so far, it does neither provide an energy estimator nor an uncertainty on the reconstructed parameters """ def __init__(self, subarray, **kwargs): super().__init__(subarray=subarray, **kwargs) _cam_radius_m = { cam: cam.geometry.guess_radius().to_value(u.m) for cam in subarray.camera_types } telframe = TelescopeFrame() _cam_radius_deg = {} for cam, radius_m in _cam_radius_m.items(): point_cam = SkyCoord(0, radius_m, unit=u.m, frame=cam.geometry.frame) point_tel = point_cam.transform_to(telframe) _cam_radius_deg[cam] = point_tel.fov_lon.to_value(u.deg) # store for each tel_id to avoid costly hash of camera self._cam_radius_m = { tel_id: _cam_radius_m[] for tel_id, t in } self._cam_radius_deg = { tel_id: _cam_radius_deg[] for tel_id, t in }
[docs] def __call__(self, event): """ Perform the full shower geometry reconstruction. Parameters ---------- event : `~ctapipe.containers.ArrayEventContainer` The event, needs to have dl1 parameters. Will be filled with the corresponding dl2 containers, reconstructed stereo geometry and telescope-wise impact position. """ warnings.filterwarnings(action="ignore", category=MissingFrameAttributeWarning) try: hillas_dict = self._create_hillas_dict(event) except (TooFewTelescopesException, InvalidWidthException): event.dl2.stereo.geometry[self.__class__.__name__] = INVALID self._store_impact_parameter(event) return # Here we perform some basic quality checks BEFORE applying reconstruction # This should be substituted by a DL1 QualityQuery specific to this # reconstructor ( tel_ids, cog_cartesian, p2_cartesian, corrected_psi, weights, telescope_positions, array_pointing, ) = self.initialize_arrays(event, hillas_dict) norm = np.cross(cog_cartesian, p2_cartesian) # store core corrected psi values for tel_id, psi in zip(tel_ids, corrected_psi):[tel_id].parameters.core.psi = u.Quantity( np.rad2deg(psi), u.deg ) # algebraic direction estimate direction, err_est_dir = self.estimate_direction(norm, weights) # array pointing is needed to define the tilted frame core_pos_ground, core_pos_tilted = self.estimate_core_position( array_pointing, corrected_psi, telescope_positions ) # container class for reconstructed showers _, lat, lon = cartesian_to_spherical(*direction) # estimate max height of shower h_max = self.estimate_h_max(cog_cartesian, telescope_positions) # az is clockwise, lon counter-clockwise, make sure it stays in [0, 2pi) az = Longitude(-lon) event.dl2.stereo.geometry[ self.__class__.__name__ ] = ReconstructedGeometryContainer( alt=lat, az=az, core_x=core_pos_ground.x, core_y=core_pos_ground.y, core_tilted_x=core_pos_tilted.x, core_tilted_y=core_pos_tilted.y, telescopes=tel_ids.tolist(), average_intensity=np.mean([h.intensity for h in hillas_dict.values()]), is_valid=True, alt_uncert=err_est_dir, az_uncert=err_est_dir, h_max=h_max, prefix=self.__class__.__name__, ) self._store_impact_parameter(event)
[docs] def initialize_arrays(self, event, hillas_dict): """ Creates flat arrays of needed quantities from the event structure Parameters ---------- hillas_dict : dictionary dictionary of hillas moments subarray : ctapipe.instrument.SubarrayDescription subarray information telescopes_pointings: dictionary dictionary of pointing direction per each telescope array_pointing: SkyCoord[AltAz] pointing direction of the array Notes ----- The part of the algorithm taking into account divergent pointing mode and the correction to the psi angle is explained in [gasparetto]_ section 7.1.4. """ # get one telescope id to check what frame to use altaz = AltAz() # Due to tracking the pointing of the array will never be a constant array_pointing = SkyCoord( az=event.pointing.array_azimuth, alt=event.pointing.array_altitude, frame=altaz, ) # create arrays / quantities of all needed things for all telescopes # so we can do transformations vectorized cog1 = np.empty(len(hillas_dict)) cog2 = np.empty(len(hillas_dict)) cam_radius = np.empty(len(hillas_dict)) psi = np.empty(len(hillas_dict)) weights = np.empty(len(hillas_dict)) focal_length = np.empty(len(hillas_dict)) alt = np.empty(len(hillas_dict)) az = np.empty(len(hillas_dict)) tel_ids = np.empty(len(hillas_dict), dtype=int) hillas_in_camera_frame = False for i, (tel_id, hillas) in enumerate(hillas_dict.items()): tel_ids[i] = tel_id pointing =[tel_id] alt[i] = pointing.altitude.to_value(u.rad) az[i] = pointing.azimuth.to_value(u.rad) if isinstance(hillas, CameraHillasParametersContainer): hillas_in_camera_frame = True cog1[i] = hillas.x.to_value(u.m) cog2[i] = hillas.y.to_value(u.m) cam_radius[i] = self._cam_radius_m[tel_id] else: cog1[i] = hillas.fov_lon.to_value(u.deg) cog2[i] = hillas.fov_lat.to_value(u.deg) cam_radius[i] = self._cam_radius_deg[tel_id] psi[i] = hillas.psi.to_value(u.rad) weights[i] = hillas.intensity * hillas.length.value / hillas.width.value camera =[tel_id].camera focal_length[i] = camera.geometry.frame.focal_length.to_value(u.m) indices = self.subarray.tel_index_array[tel_ids] telescope_positions = self.subarray.tel_coords[indices] telescope_pointings = SkyCoord(alt=alt, az=az, unit=u.rad, frame=altaz) focal_length = u.Quantity(focal_length, u.m, copy=False) camera_frame = CameraFrame( telescope_pointing=telescope_pointings, focal_length=focal_length ) telescope_frame = TelescopeFrame(telescope_pointing=telescope_pointings) p2_1 = cog1 + 0.1 * cam_radius * np.cos(psi) p2_2 = cog2 + 0.1 * cam_radius * np.sin(psi) if hillas_in_camera_frame: cog_coord = SkyCoord(x=cog1, y=cog2, unit=u.m, frame=camera_frame) p2_coord = SkyCoord(x=p2_1, y=p2_2, unit=u.m, frame=camera_frame) else: p2_coord = SkyCoord( fov_lon=p2_1, fov_lat=p2_2, unit=u.deg, frame=telescope_frame ) cog_coord = SkyCoord( fov_lon=cog1, fov_lat=cog2, unit=u.deg, frame=telescope_frame ) cog_coord = cog_coord.transform_to(altaz) p2_coord = p2_coord.transform_to(altaz) cog_cart = altaz_to_righthanded_cartesian(alt=cog_coord.alt, p2_cart = altaz_to_righthanded_cartesian(alt=p2_coord.alt, pseudo_nominal = CameraFrame( telescope_pointing=array_pointing, focal_length=focal_length ) cog_cam = cog_coord.transform_to(pseudo_nominal) p2_cam = p2_coord.transform_to(pseudo_nominal) corrected_psi = np.arctan( (cog_cam.y.to_value(u.m) - p2_cam.y.to_value(u.m)) / (cog_cam.x.to_value(u.m) - p2_cam.x.to_value(u.m)) ) return ( tel_ids, cog_cart, p2_cart, corrected_psi, weights, telescope_positions, array_pointing, )
[docs] @staticmethod def estimate_direction(norm, weight): """calculates the origin of the gamma as the weighted average direction of the intersections of all hillas planes Returns ------- gamma : shape (3) numpy array direction of origin of the reconstructed shower as a 3D vector crossings : shape (n,3) list an error estimate """ index_a, index_b = np.array(list(combinations(range(len(norm)), 2))).T crossings = np.cross(norm[index_a], norm[index_b]) mask = crossings[:, 2] < 0 crossings[mask] = -crossings[mask] result = np.average( crossings, weights=weight[index_a] * weight[index_b], axis=0 ) result = normalise(result) off_angles = angle(result, crossings) err_est_dir = np.mean(off_angles) return result, u.Quantity(np.rad2deg(err_est_dir), u.deg)
[docs] @staticmethod def estimate_core_position(array_pointing, psi, positions): """ Estimate the core position by intersection the major ellipse lines of each telescope. Parameters ---------- hillas_dict: dict[HillasContainer] dictionary of hillas moments array_pointing: SkyCoord[HorizonFrame] Pointing direction of the array Returns ------- core_x: u.Quantity estimated x position of impact core_y: u.Quantity estimated y position of impact Notes ----- The part of the algorithm taking into account divergent pointing mode and the usage of a corrected psi angle is explained in [gasparetto]_ section 7.1.4. """ # Since psi has been recalculated in the fake CameraFrame # it doesn't need any further corrections because it is now independent # of both pointing and cleaning/parametrization frame. # This angle will be used to visualize the telescope-wise directions of # the shower core the ground. # Estimate the position of the shower's core # from the TiltedFram to the GroundFrame z = np.zeros(len(psi)) uvw_vectors = np.column_stack([np.cos(psi), np.sin(psi), z]) tilted_frame = TiltedGroundFrame(pointing_direction=array_pointing) positions_tilted = positions.transform_to(tilted_frame) core_position = line_line_intersection_3d( uvw_vectors, positions_tilted.to_value(u.m), ) core_position = u.Quantity(core_position, u.m, copy=False) core_pos_tilted = SkyCoord( x=core_position[0], y=core_position[1], z=u.Quantity(0.0, u.m), frame=tilted_frame, ) core_pos_ground = project_to_ground(core_pos_tilted) return core_pos_ground, core_pos_tilted
[docs] @staticmethod def estimate_h_max(cog_vectors, positions): """ Estimate the max height by intersecting the lines of the cog directions of each telescope. Returns ------- astropy.unit.Quantity the estimated max height """ positions = # not sure if its better to return the length of the vector of the z component return u.Quantity( np.linalg.norm(line_line_intersection_3d(cog_vectors, positions)), u.m, )