import logging
from functools import lru_cache
from pathlib import Path
from typing import Dict

import astropy.units as u
import numpy as np
import tables
from astropy.utils.decorators import lazyproperty

from ctapipe.atmosphere import AtmosphereDensityProfile
from ctapipe.instrument.optics import FocalLengthKind

from ..containers import (
from ..core import Container, Field
from ..core.traits import UseEnum
from ..instrument import SubarrayDescription
from ..utils import IndexFinder
from .astropy_helpers import read_table
from .datalevels import DataLevel
from .eventsource import EventSource
from .hdf5tableio import HDF5TableReader, get_column_attrs
from .tableloader import DL2_SUBARRAY_GROUP, DL2_TELESCOPE_GROUP

__all__ = ["HDF5EventSource"]

logger = logging.getLogger(__name__)

    "energy": ReconstructedEnergyContainer,
    "geometry": ReconstructedGeometryContainer,
    "classification": ParticleClassificationContainer,
    "impact": TelescopeImpactParameterContainer,


[docs]def get_hdf5_datalevels(h5file): """Get the data levels present in the hdf5 file""" datalevels = [] if "/r1/event/telescope" in h5file.root: datalevels.append(DataLevel.R1) if "/dl1/event/telescope/images" in h5file.root: datalevels.append(DataLevel.DL1_IMAGES) if "/dl1/event/telescope/parameters" in h5file.root: datalevels.append(DataLevel.DL1_PARAMETERS) if "/dl2" in h5file.root: datalevels.append(DataLevel.DL2) return tuple(datalevels)
def read_atmosphere_density_profile( h5file: tables.File, path="/simulation/service/atmosphere_density_profile" ): """return a subclass of AtmosphereDensityProfile by reading a table in a h5 file Parameters ---------- h5file: tables.File file handle of HDF5 file path: str path in the file where the serialized model is stored as an astropy.table.Table Returns ------- AtmosphereDensityProfile: subclass depending on type of table """ if path not in h5file: return None table = read_table(h5file=h5file, path=path) return AtmosphereDensityProfile.from_table(table)
[docs]class HDF5EventSource(EventSource): """ Event source for files in the ctapipe DL1 format. For general information about the concept of event sources, take a look at the parent class To use this event source, create an instance of this class specifying the file to be read. Looping over the EventSource yields events from the _generate_events method. An event equals an ArrayEventContainer instance. See ctapipe.containers.ArrayEventContainer for details. Attributes ---------- input_url: str Path to the input event file. file: tables.File File object obs_ids: list Observation ids of te recorded runs. For unmerged files, this should only contain a single number. subarray: ctapipe.instrument.SubarrayDescription The subarray configuration of the recorded run. datalevels: Tuple One or both of and depending on the information present in the file. is_simulation: Boolean Whether the events are simulated or observed. simulation_configs: Dict Mapping of obs_id to ctapipe.containers.SimulationConfigContainer if the file contains simulated events. has_simulated_dl1: Boolean Whether the file contains simulated camera images and/or image parameters evaluated on these. """ focal_length_choice = UseEnum( FocalLengthKind, default_value=FocalLengthKind.EFFECTIVE, help=( "If both nominal and effective focal lengths are available, " " which one to use for the `CameraFrame` attached" " to the `CameraGeometry` instances in the `SubarrayDescription`" ", which will be used in CameraFrame to TelescopeFrame coordinate" " transforms. The 'nominal' focal length is the one used during " " the simulation, the 'effective' focal length is computed using specialized " " ray-tracing from a point light source" ), ).tag(config=True) def __init__(self, input_url=None, config=None, parent=None, **kwargs): """ EventSource for dl1 files in the standard DL1 data format Parameters: ----------- input_url : str Path of the file to load config : traitlets.loader.Config Configuration specified by config file or cmdline arguments. Used to set traitlet values. Set to None if no configuration to pass. parent: Parent from which the config is used. Mutually exclusive with config kwargs """ super().__init__(input_url=input_url, config=config, parent=parent, **kwargs) self.file_ = tables.open_file(self.input_url) self._full_subarray = SubarrayDescription.from_hdf( self.input_url, focal_length_choice=self.focal_length_choice, ) if self.allowed_tels: self._subarray = self._full_subarray.select_subarray(self.allowed_tels) else: self._subarray = self._full_subarray self._simulation_configs = self._parse_simulation_configs() ( self._scheduling_block, self._observation_block, ) = self._parse_sb_and_ob_configs() version = self.file_.root._v_attrs["CTA PRODUCT DATA MODEL VERSION"] self.datamodel_version = tuple(map(int, version.lstrip("v").split("."))) def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self): self.file_.close()
[docs] @staticmethod def is_compatible(file_path): path = Path(file_path).expanduser() if not path.is_file(): return False with"rb") as f: magic_number = if magic_number != b"\x89HDF\r\n\x1a\n": return False with tables.open_file(path) as f: metadata = f.root._v_attrs if "CTA PRODUCT DATA MODEL VERSION" not in metadata._v_attrnames: return False version = metadata["CTA PRODUCT DATA MODEL VERSION"] if version not in COMPATIBLE_DATA_MODEL_VERSIONS: logger.error( "File is a ctapipe HDF5 file but has unsupported data model" f" version {version}" f", supported versions are {COMPATIBLE_DATA_MODEL_VERSIONS}." " You may need to downgrade ctapipe (if the file version is older)" ", update ctapipe (if the file version is newer) or" " reproduce the file with your current ctapipe version." ) return False if "CTA PRODUCT DATA LEVELS" not in metadata._v_attrnames: return False # we can now read both R1 and DL1 datalevels = set(metadata["CTA PRODUCT DATA LEVELS"].split(",")) if not datalevels.intersection( ("R1", "DL1_IMAGES", "DL1_PARAMETERS", "DL2") ): return False return True
@property def is_simulation(self): """ True for files with a simulation group at the root of the file. """ return "simulation" in self.file_.root @property def has_simulated_dl1(self): """ True for files with telescope-wise event information in the simulation group """ if self.is_simulation: if "telescope" in self.file_.root.simulation.event: return True return False @property def subarray(self): return self._subarray @lazyproperty def datalevels(self): return get_hdf5_datalevels(self.file_) @lazyproperty def atmosphere_density_profile(self) -> AtmosphereDensityProfile: return read_atmosphere_density_profile(self.file_) @lazyproperty def obs_ids(self): return list(np.unique(self.file_.root.dl1.event.subarray.trigger.col("obs_id"))) @property def scheduling_blocks(self) -> Dict[int, SchedulingBlockContainer]: return self._scheduling_block @property def observation_blocks(self) -> Dict[int, ObservationBlockContainer]: return self._observation_block @property def simulation_config(self) -> Dict[int, SimulationConfigContainer]: """ Returns the simulation config(s) as a dict mapping obs_id to the respective config. """ return self._simulation_configs def __len__(self): n_events = len(self.file_.root.dl1.event.subarray.trigger) if self.max_events is not None: return min(n_events, self.max_events) return n_events def _parse_simulation_configs(self): """ Construct a dict of SimulationConfigContainers from the These are used to match the correct header to each event """ # Just returning next(reader) would work as long as there are no merged files # The reader ignores obs_id making the setup somewhat tricky # This is ugly but supports multiple headers so each event can have # the correct mcheader assigned by matching the obs_id # Alternatively this becomes a flat list # and the obs_id matching part needs to be done in _generate_events() class ObsIdContainer(Container): default_prefix = "" obs_id = Field(-1) if "simulation" in self.file_.root.configuration: reader = HDF5TableReader(self.file_).read( "/configuration/simulation/run", containers=(SimulationConfigContainer, ObsIdContainer), ) return {index.obs_id: config for (config, index) in reader} else: return {} def _parse_sb_and_ob_configs(self): """read Observation and Scheduling block configurations""" sb_reader = HDF5TableReader(self.file_).read( "/configuration/observation/scheduling_block", containers=SchedulingBlockContainer, ) scheduling_blocks = {sb.sb_id: sb for sb in sb_reader} ob_reader = HDF5TableReader(self.file_).read( "/configuration/observation/observation_block", containers=ObservationBlockContainer, ) observation_blocks = {ob.obs_id: ob for ob in ob_reader} return scheduling_blocks, observation_blocks def _is_hillas_in_camera_frame(self): parameters_group = self.file_.root.dl1.event.telescope.parameters telescope_tables = parameters_group._v_children.values() # in case of no parameters, it doesn't matter, we just return False if len(telescope_tables) == 0: return False # check the first telescope table one_telescope = parameters_group._v_children.values()[0] return "camera_frame_hillas_intensity" in one_telescope.colnames def _generator(self): """ Yield ArrayEventContainer to iterate through events. """ self.reader = HDF5TableReader(self.file_) if DataLevel.R1 in self.datalevels: waveform_readers = { f"/r1/event/telescope/{}", R1CameraContainer ) for table in self.file_.root.r1.event.telescope } if DataLevel.DL1_IMAGES in self.datalevels: ignore_columns = {"parameters"} # if there are no parameters, there are no image_mask, avoids warnings if DataLevel.DL1_PARAMETERS not in self.datalevels: ignore_columns.add("image_mask") image_readers = { f"/dl1/event/telescope/images/{}", DL1CameraContainer, ignore_columns=ignore_columns, ) for table in self.file_.root.dl1.event.telescope.images } if self.has_simulated_dl1: simulated_image_iterators = { self.file_.root.simulation.event.telescope.images[ ].iterrows() for table in self.file_.root.simulation.event.telescope.images } if DataLevel.DL1_PARAMETERS in self.datalevels: hillas_cls = HillasParametersContainer timing_cls = TimingParametersContainer hillas_prefix = "hillas" timing_prefix = "timing" if self._is_hillas_in_camera_frame(): hillas_cls = CameraHillasParametersContainer timing_cls = CameraTimingParametersContainer hillas_prefix = "camera_frame_hillas" timing_prefix = "camera_frame_timing" param_readers = { f"/dl1/event/telescope/parameters/{}", containers=( hillas_cls, timing_cls, LeakageContainer, ConcentrationContainer, MorphologyContainer, IntensityStatisticsContainer, PeakTimeStatisticsContainer, ), prefixes=[ hillas_prefix, timing_prefix, "leakage", "concentration", "morphology", "intensity", "peak_time", ], ) for table in self.file_.root.dl1.event.telescope.parameters } if self.has_simulated_dl1: simulated_param_readers = { f"/simulation/event/telescope/parameters/{}", containers=[ hillas_cls, LeakageContainer, ConcentrationContainer, MorphologyContainer, IntensityStatisticsContainer, ], prefixes=[ f"true_{hillas_prefix}", "true_leakage", "true_concentration", "true_morphology", "true_intensity", ], ) for table in self.file_.root.dl1.event.telescope.parameters } dl2_readers = {} if DL2_SUBARRAY_GROUP in self.file_.root: dl2_group = self.file_.root[DL2_SUBARRAY_GROUP] for kind, group in dl2_group._v_children.items(): try: container = DL2_CONTAINERS[kind] except KeyError: self.log.warning("Unknown DL2 stereo group %s", kind) continue dl2_readers[kind] = { algorithm: HDF5TableReader(self.file_).read( table._v_pathname, containers=container, prefixes=(algorithm,), ) for algorithm, table in group._v_children.items() } dl2_tel_readers = {} if DL2_TELESCOPE_GROUP in self.file_.root: dl2_group = self.file_.root[DL2_TELESCOPE_GROUP] for kind, group in dl2_group._v_children.items(): try: container = DL2_CONTAINERS[kind] except KeyError: self.log.warning("Unknown DL2 telescope group %s", kind) continue dl2_tel_readers[kind] = {} for algorithm, algorithm_group in group._v_children.items(): dl2_tel_readers[kind][algorithm] = { key: HDF5TableReader(self.file_).read( table._v_pathname, containers=container, ) for key, table in algorithm_group._v_children.items() } true_impact_readers = {} if self.is_simulation: # simulated shower wide information mc_shower_reader = HDF5TableReader(self.file_).read( "/simulation/event/subarray/shower", SimulatedShowerContainer, prefixes="true", ) if "impact" in self.file_.root.simulation.event.telescope: true_impact_readers = { f"/simulation/event/telescope/impact/{}", containers=TelescopeImpactParameterContainer, prefixes=["true_impact"], ) for table in self.file_.root.simulation.event.telescope.impact } # Setup iterators for the array events events = HDF5TableReader(self.file_).read( "/dl1/event/subarray/trigger", [TriggerContainer, EventIndexContainer], ignore_columns={"tel"}, ) telescope_trigger_reader = HDF5TableReader(self.file_).read( "/dl1/event/telescope/trigger", [TelEventIndexContainer, TelescopeTriggerContainer], ignore_columns={"trigger_pixels"}, ) array_pointing_finder = IndexFinder( self.file_.root.dl1.monitoring.subarray.pointing.col("time") ) tel_pointing_finder = { IndexFinder(table.col("time")) for table in self.file_.root.dl1.monitoring.telescope.pointing } counter = 0 for trigger, index in events: data = ArrayEventContainer( trigger=trigger, count=counter, index=index, simulation=SimulatedEventContainer() if self.is_simulation else None, ) # Maybe take some other metadata, but there are still some 'unknown' # written out by the stage1 tool data.meta["origin"] = self.file_.root._v_attrs["CTA PROCESS TYPE"] data.meta["input_url"] = self.input_url data.meta["max_events"] = self.max_events data.trigger.tels_with_trigger = self._full_subarray.tel_mask_to_tel_ids( data.trigger.tels_with_trigger ) full_tels_with_trigger = data.trigger.tels_with_trigger.copy() if self.allowed_tels: data.trigger.tels_with_trigger = np.intersect1d( data.trigger.tels_with_trigger, np.array(list(self.allowed_tels)) ) # the telescope trigger table contains triggers for all telescopes # that participated in the event, so we need to read a row for each # of them, ignoring the ones not in allowed_tels after reading for tel_id in full_tels_with_trigger: tel_index, tel_trigger = next(telescope_trigger_reader) if self.allowed_tels and tel_id not in self.allowed_tels: continue[tel_index.tel_id] = tel_trigger # this needs to stay *after* reading the telescope trigger table if len(data.trigger.tels_with_trigger) == 0: continue self._fill_array_pointing(data, array_pointing_finder) self._fill_telescope_pointing(data, tel_pointing_finder) if self.is_simulation: data.simulation.shower = next(mc_shower_reader) for tel_id in key = f"tel_{tel_id:03d}" if self.allowed_tels and tel_id not in self.allowed_tels: continue if key in true_impact_readers:[tel_id].impact = next(true_impact_readers[key]) if DataLevel.R1 in self.datalevels:[tel_id] = next(waveform_readers[key]) if self.has_simulated_dl1: simulated =[tel_id] if DataLevel.DL1_IMAGES in self.datalevels: if key not in image_readers: logger.debug( f"Triggered telescope {tel_id} is missing " "from the image table." ) continue[tel_id] = next(image_readers[key]) if self.has_simulated_dl1: if key not in simulated_image_iterators: logger.warning( f"Triggered telescope {tel_id} is missing " "from the simulated image table, but was present at the " "reconstructed image table." ) continue simulated_image_row = next(simulated_image_iterators[key]) simulated.true_image = simulated_image_row["true_image"] if DataLevel.DL1_PARAMETERS in self.datalevels: if f"tel_{tel_id:03d}" not in param_readers: logger.debug( f"Triggered telescope {tel_id} is missing " "from the parameters table." ) continue # Is there a smarter way to unpack this? # Best would probbaly be if we could directly read # into the ImageParametersContainer params = next(param_readers[key])[tel_id].parameters = ImageParametersContainer( hillas=params[0], timing=params[1], leakage=params[2], concentration=params[3], morphology=params[4], intensity_statistics=params[5], peak_time_statistics=params[6], ) if self.has_simulated_dl1: if f"tel_{tel_id:03d}" not in simulated_param_readers: logger.debug( f"Triggered telescope {tel_id} is missing " "from the simulated parameters table, but was " "present at the reconstructed parameters table." ) continue simulated_params = next( simulated_param_readers[f"tel_{tel_id:03d}"] ) simulated.true_parameters = ImageParametersContainer( hillas=simulated_params[0], leakage=simulated_params[1], concentration=simulated_params[2], morphology=simulated_params[3], intensity_statistics=simulated_params[4], ) for kind, algorithms in dl2_tel_readers.items(): c = getattr([tel_id], kind) for algorithm, readers in algorithms.items(): c[algorithm] = next(readers[key]) # change prefix to new data model if kind == "impact" and self.datamodel_version == (4, 0, 0): prefix = f"{algorithm}_tel_{c[algorithm].default_prefix}" c[algorithm].prefix = prefix for kind, readers in dl2_readers.items(): c = getattr(data.dl2.stereo, kind) for algorithm, reader in readers.items(): c[algorithm] = next(reader) yield data counter += 1 @lazyproperty def _subarray_pointing_attrs(self): table = self.file_.root.dl1.monitoring.subarray.pointing return get_column_attrs(table) @lru_cache(maxsize=1000) def _telescope_pointing_attrs(self, tel_id): pointing_group = self.file_.root.dl1.monitoring.telescope.pointing return get_column_attrs(pointing_group[f"tel_{tel_id:03d}"]) def _fill_array_pointing(self, data, array_pointing_finder): """ Fill the array pointing information of a given event """ # Only unique pointings are stored, so wont work as easily # Thats why we match the pointings based on trigger time closest_time_index = array_pointing_finder.closest(data.trigger.time.mjd) table = self.file_.root.dl1.monitoring.subarray.pointing array_pointing = table[closest_time_index] data.pointing.array_azimuth = u.Quantity( array_pointing["array_azimuth"], self._subarray_pointing_attrs["array_azimuth"]["UNIT"], ) data.pointing.array_altitude = u.Quantity( array_pointing["array_altitude"], self._subarray_pointing_attrs["array_altitude"]["UNIT"], ) data.pointing.array_ra = u.Quantity( array_pointing["array_ra"], self._subarray_pointing_attrs["array_ra"]["UNIT"], ) data.pointing.array_dec = u.Quantity( array_pointing["array_dec"], self._subarray_pointing_attrs["array_dec"]["UNIT"], ) def _fill_telescope_pointing(self, data, tel_pointing_finder): """ Fill the telescope pointing information of a given event """ # Same comments as to _fill_array_pointing apply pointing_group = self.file_.root.dl1.monitoring.telescope.pointing for tel_id in if self.allowed_tels and tel_id not in self.allowed_tels: continue tel_pointing_table = pointing_group[f"tel_{tel_id:03d}"] closest_time_index = tel_pointing_finder[f"tel_{tel_id:03d}"].closest([tel_id].time.mjd ) pointing_telescope = tel_pointing_table[closest_time_index] attrs = self._telescope_pointing_attrs(tel_id)[tel_id].azimuth = u.Quantity( pointing_telescope["azimuth"], attrs["azimuth"]["UNIT"], )[tel_id].altitude = u.Quantity( pointing_telescope["altitude"], attrs["altitude"]["UNIT"], )