Source code for dcnum.read.hdf5_data

from __future__ import annotations

import functools
import hashlib
import json
import numbers
import pathlib
from typing import Any, BinaryIO
import uuid
import warnings

import numpy as np

from ..common import h5py

from .cache import HDF5ImageCache, ImageCorrCache, md5sum
from .const import PROTECTED_FEATURES
from .mapped import get_mapped_object, get_mapping_indices


[docs] class BasinIdentifierMismatchError(BaseException): """Used when basin identifiers do not match"""
[docs] class HDF5Data: """HDF5 (.rtdc) input file data instance""" def __init__(self, path: pathlib.Path | h5py.File | BinaryIO, # type: ignore pixel_size: float | None = None, md5_5m: str | None = None, meta: dict | None = None, basins: list[dict[str, list[str] | str]] | None = None, logs: dict[str, list[str]] | None = None, tables: dict[str, np.ndarray] | None = None, image_cache_size: int = 2, image_chunk_size: int = 1000, index_mapping: int | slice | list | np.ndarray | None = None, ): """ Parameters ---------- path: path to data file pixel_size: pixel size in µm md5_5m: MD5 sum of the first 5 MiB; computed if not provided meta: metadata dictionary; extracted from HDF5 attributes if not provided basins: list of basin dictionaries; extracted from HDF5 attributes if not provided logs: dictionary of logs; extracted from HDF5 attributes if not provided tables: dictionary of tables; extracted from HDF5 attributes if not provided image_cache_size: size of the image cache to use when accessing image data image_chunk_size: maximum number of images in each image cache chunk index_mapping: select only a subset of input events, transparently reducing the size of the dataset, possible data types are - int `N`: use the first `N` events - slice: use the events defined by a slice - list: list of integers specifying the event indices to use Numpy indexing rules apply. E.g. to only process the first 100 events, set this to `100` or `slice(0, 100)`. """ # Init is in __setstate__ so we can pickle this class # and use it for multiprocessing. if isinstance(path, h5py.File): self._h5 = path path = path.filename else: self._h5 = None # Optimize image chunk size with h5py.File(path, "r") as h5: if "events/image" in h5: h5ds = h5["events/image"] if isinstance(h5ds, h5py.Dataset) and h5ds.chunks is not None: # Align the `HDF5ImageCache` chunk size to the chunk size # of the underlying HDF5 dataset. # The alignment is not applied to: # - `h5py.Dataset` data that are stored in contiguous mode # - `MappedHDF5Dataset` instances # Determine the chunk size of the dataset. ds_chunk_size = h5ds.chunks[0] if ds_chunk_size >= image_chunk_size: # Adopt the actual chunk size. Nothing else # makes sense. image_chunk_size = ds_chunk_size else: # Determine the multiples of chunks that comprise # the new chunk_size. divider = image_chunk_size // ds_chunk_size # The new chunk size might be smaller than the # original one. image_chunk_size = divider * ds_chunk_size self.__setstate__({"path": path, "pixel_size": pixel_size, "md5_5m": md5_5m, "meta": meta, "basins": basins, "logs": logs, "tables": tables, "image_cache_size": image_cache_size, "image_chunk_size": image_chunk_size, "index_mapping": index_mapping, })
[docs] def __contains__(self, item): return item in self.keys()
[docs] def __enter__(self): return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def __getitem__(self, feat): if feat in ["image", "image_bg", "mask"]: data = self.get_image_cache(feat) # already index-mapped if data is None: raise KeyError(f"Feature '{feat}' not found in {self}!") else: return data elif feat in self._cache_scalar: # check for scalar cached return self._cache_scalar[feat] elif (feat in self.h5["events"] and len(self.h5["events"][feat].shape) == 1): # cache scalar if self.index_mapping is None: # no mapping indices, just slice dat_sc = self.h5["events"][feat][:] else: dat_sc = get_mapped_object(self.h5["events"][feat], index_mapping=self.index_mapping)[:] self._cache_scalar[feat] = dat_sc return self._cache_scalar[feat] else: if feat in self.h5["events"]: # Not cached (possibly slow) warnings.warn(f"Feature {feat} not cached (possibly slow)") return get_mapped_object( obj=self.h5["events"][feat], index_mapping=self.index_mapping) else: # Check the basins for idx in range(len(self.basins)): bn_grp, bn_feats, bn_map = self.get_basin_data(idx) if bn_feats and feat in bn_feats: mapped_ds = get_mapped_object(obj=bn_grp[feat], index_mapping=bn_map) return mapped_ds # If we got here, then the feature data does not exist. raise KeyError(f"Feature '{feat}' not found in {self}!")
[docs] def __getstate__(self): return {"path": self.path, "pixel_size": self.pixel_size, "md5_5m": self.md5_5m, "meta": self.meta, "logs": self.logs, "tables": self.tables, "basins": self.basins, "image_cache_size": self.image_cache_size, "image_chunk_size": self.image_chunk_size, "index_mapping": self.index_mapping, "len": self._len, }
[docs] def __setstate__(self, state): # Make sure these properties exist (we rely on __init__, because # we want this class to be pickable and __init__ is not called by # `pickle.load`). # Cached properties self._feats = None self._keys = None self._len = state.get("len", None) # Image cache if not hasattr(self, "_image_cache"): self._image_cache: dict[str, Any] = {} # Basin data if not hasattr(self, "_basin_data"): self._basin_data = {} # Scalar feature cache if not hasattr(self, "_cache_scalar"): self._cache_scalar = {} if not hasattr(self, "_h5"): self._h5 = None path = state["path"] if isinstance(path, str): path = pathlib.Path(path) self.path = path self.md5_5m = state["md5_5m"] if self.md5_5m is None: if isinstance(self.path, pathlib.Path): # 5MB md5sum of input file self.md5_5m = md5sum(self.path, blocksize=65536, count=80) else: self.md5_5m = str(uuid.uuid4()).replace("-", "") self.meta = state["meta"] self.logs = state["logs"] self.tables = state["tables"] self.basins = state["basins"] if (self.meta is None or self.logs is None or self.tables is None or self.basins is None): self.logs = {} self.tables = {} self.basins = [] # get dataset configuration with h5py.File(self.path, libver="latest", ) as h5: # meta self.meta = dict(h5.attrs) for key in self.meta: if isinstance(self.meta[key], bytes): self.meta[key] = self.meta[key].decode("utf-8") # logs for key in sorted(h5.get("logs", {}).keys()): alog = list(h5["logs"][key]) if alog: if isinstance(alog[0], bytes): alog = [ll.decode("utf") for ll in alog] self.logs[key] = alog # tables for tab in sorted(h5.get("tables", {}).keys()): fields = h5["tables"][tab].dtype.fields if fields is None: # No individual curves, but an image array self.tables[tab] = h5["tables"][tab][:] else: # List of curves with predefined dtypes tabdict = {} for tkey in fields.keys(): tabdict[tkey] = \ np.asarray(h5["tables"][tab][tkey]).reshape(-1) self.tables[tab] = tabdict # basins basins = self.extract_basin_dicts(h5) def basin_sort_cmp(a, b): """Sort internal basins before any other basins""" if a["type"] == b["type"]: an = a["name"] bn = b["name"] if an == bn: return 0 if an < bn: return -1 else: return 1 elif a["type"] == "internal": # internal basins should come first return -1 else: return 1 self.basins = sorted(basins, key=functools.cmp_to_key(basin_sort_cmp) ) if state["pixel_size"] is not None: self.pixel_size = state["pixel_size"] self.image_cache_size = state["image_cache_size"] self.image_chunk_size = state["image_chunk_size"] self.index_mapping = state["index_mapping"]
[docs] def __len__(self): if self._len is None: if self.index_mapping is not None: self._len = get_mapping_indices(self.index_mapping).size else: self._len = self.h5.attrs["experiment:event count"] return self._len
@property def h5(self): if self._h5 is None: self._h5 = h5py.File(self.path, libver="latest") return self._h5 @property def image(self) -> HDF5ImageCache | None: return self.get_image_cache("image") @property def image_bg(self) -> HDF5ImageCache | None: return self.get_image_cache("image_bg") @property def image_corr(self) -> ImageCorrCache | None: if "image_corr" not in self._image_cache: if self.image is not None and self.image_bg is not None: image_corr = ImageCorrCache(self.image, self.image_bg) else: image_corr = None self._image_cache["image_corr"] = image_corr return self._image_cache["image_corr"] @property def image_num_chunks(self): """Number of image chunks given `self.image_chunk_size`""" return int(np.ceil(len(self) / self.image_chunk_size)) @property def mask(self): return self.get_image_cache("mask") @property def meta_nest(self): """Return `self.meta` as nested dicitonary This gets very close to the dclab `config` property of datasets. """ md = {} for key in self.meta: sec, var = key.split(":") md.setdefault(sec, {})[var] = self.meta[key] return md @property def pixel_size(self): return self.meta.get("imaging:pixel size", 0) @pixel_size.setter def pixel_size(self, pixel_size: float): # Reduce pixel_size accuracy to 8 digits after the point to # enforce pipeline reproducibility (see get_ppid_from_ppkw). pixel_size = float(f"{pixel_size:.8f}") self.meta["imaging:pixel size"] = pixel_size
[docs] @staticmethod def extract_basin_dicts(h5, check=True): """Return list of basin dictionaries""" # TODO: support iterative mapped basins and catch # circular basin definitions. basins = [] for bnkey in h5.get("basins", {}).keys(): bn_data = "\n".join( [s.decode() for s in h5["basins"][bnkey][:].tolist()]) bn_dict = json.loads(bn_data) if check: if bn_dict["type"] not in ["internal", "file"]: # we only support file-based and internal basins continue basinmap = bn_dict.get("mapping") if basinmap is not None and basinmap not in h5["events"]: # basinmap feature is missing continue # Add the basin basins.append(bn_dict) return basins
@property def features_scalar_frame(self): """Scalar features that apply to all events in a frame This is a convenience function for copying scalar features over to new processed datasets. Return a list of all features that describe a frame (e.g. temperature or time). """ if self._feats is None: feats = [] for feat in self.keys(): if feat in PROTECTED_FEATURES: feats.append(feat) self._feats = feats return self._feats
[docs] def close(self): """Close the underlying HDF5 file""" # TODO: There is a memory leak (#50). for bn_group, _, _ in self._basin_data.values(): if bn_group is not None: if bn_group.id.valid: bn_group.file.close() del bn_group self._image_cache.clear() self._basin_data.clear() self._cache_scalar.clear() self.basins.clear() self.logs.clear() self.tables.clear() self.basins.clear() self.meta.clear() if self._h5 is not None: self._h5.close() self._h5 = None
[docs] def get_ppid(self): return self.get_ppid_from_ppkw( {"pixel_size": self.pixel_size, "index_mapping": self.index_mapping})
[docs] @classmethod def get_ppid_code(cls): return "hdf"
[docs] @classmethod def get_ppid_from_ppkw(cls, kwargs): # Data does not really fit into the PPID scheme we use for the rest # of the pipeline. This implementation here is custom. code = cls.get_ppid_code() # pixel size ppid_ps = f"{kwargs['pixel_size']:.8f}".rstrip("0") # index mapping ppid_im = cls.get_ppid_index_mapping(kwargs.get("index_mapping", None)) kwid = "^".join([f"p={ppid_ps}", f"i={ppid_im}"]) return ":".join([code, kwid])
[docs] @staticmethod def get_ppid_index_mapping(index_mapping): """Return the pipeline identifier part for index mapping""" im = index_mapping if im is None: dim = "0" elif isinstance(im, numbers.Integral): dim = f"{im}" elif isinstance(im, slice): dim = (f"{im.start if im.start is not None else 'n'}" + f"-{im.stop if im.stop is not None else 'n'}" + f"-{im.step if im.step is not None else 'n'}" ) elif isinstance(im, (list, np.ndarray)): idhash = hashlib.md5( np.asarray(im, dtype=np.uint32).tobytes()).hexdigest() dim = f"h-{idhash[:8]}" else: dim = "unknown" return dim
[docs] @staticmethod def get_ppkw_from_ppid(dat_ppid): # Data does not fit in the PPID scheme we use, but we still # would like to pass pixel_size to __init__ if we need it. code, kwargs_str = dat_ppid.split(":") if code != HDF5Data.get_ppid_code(): raise ValueError(f"Could not find data method '{code}'!") kwitems = kwargs_str.split("^") kwargs = {} for item in kwitems: var, val = item.split("=") if var == "p": kwargs["pixel_size"] = float(val) elif var == "i": if val.startswith("h-") or val == "unknown": raise ValueError(f"Cannot invert index mapping {val}") elif val == "0": kwargs["index_mapping"] = None elif val.count("-"): start, stop, step = val.split("-") kwargs["index_mapping"] = slice( None if start == "n" else int(start), None if stop == "n" else int(stop), None if step == "n" else int(step) ) else: kwargs["index_mapping"] = int(val) else: raise ValueError(f"Invalid parameter '{var}'!") return kwargs
[docs] def get_basin_data(self, index: int) -> tuple[ h5py.Group, # type: ignore list, int | slice | list | np.ndarray, ]: """Return HDF5Data info for a basin index in `self.basins` Parameters ---------- index: int index of the basin from which to get data Returns ------- group: h5py.Group HDF5 group containing HDF5 Datasets with the names listed in `features` features: list of str list of features made available by this basin index_mapping: a mapping (see `__init__`) that defines mapping from the basin dataset to the referring dataset """ if index not in self._basin_data: bn_dict = self.basins[index] # HDF5 group containing the feature data if bn_dict["type"] == "file": h5group, features = self._get_basin_data_file(bn_dict) elif bn_dict["type"] == "internal": h5group, features = self._get_basin_data_internal(bn_dict) else: raise ValueError(f"Invalid basin type '{bn_dict['type']}'") # index mapping feat_basinmap = bn_dict.get("mapping", None) if feat_basinmap is None: # This is NOT a mapped basin. index_mapping = self.index_mapping else: # This is a mapped basin. Create an indexing list. if self.index_mapping is None: # The current dataset is not mapped. basinmap_idx = slice(None) else: # The current dataset is also mapped. basinmap_idx = get_mapping_indices(self.index_mapping) basinmap = self.h5[f"events/{feat_basinmap}"] index_mapping = basinmap[basinmap_idx] self._basin_data[index] = (h5group, features, index_mapping) return self._basin_data[index]
[docs] def _get_basin_data_file(self, bn_dict): for ff in bn_dict["paths"]: pp = pathlib.Path(ff) if pp.is_absolute() and pp.exists(): path = pp break else: # try relative path prel = pathlib.Path(self.path).parent / pp if prel.exists(): path = prel break else: path = None if path is None: # Cannot get data from this basin / cannot find file h5group = None features = [] else: h5 = h5py.File(path, "r") # verify that the basin was identified correctly if ((id_exp := bn_dict.get("identifier")) is not None and (id_act := get_measurement_identifier(h5)) != id_exp): raise BasinIdentifierMismatchError( f"The basin '{path}' with identifier '{id_act}' " f"does not match the expected identifier '{id_exp}'") h5group = h5["events"] # features defined in the basin features = bn_dict.get("features") if features is None: # Only get the features from the actual HDF5 file. # If this file has basins as well, the basin metadata # should have been copied over to the parent file. This # makes things a little cleaner, because basins are not # nested, but all basins are available in the top file. # See :func:`write.store_metadata` for copying metadata # between files. # The writer can still specify "features" in the basin # metadata, then these basins are indeed nested, and # we consider that ok as well. features = sorted(h5group.keys()) return h5group, features
[docs] def _get_basin_data_internal(self, bn_dict): # The group name is normally "basin_events" group_name = bn_dict["paths"][0] if group_name != "basin_events": warnings.warn( f"Uncommon group name for basin features: {group_name}") h5group = self.h5[group_name] features = bn_dict.get("features") if features is None: raise ValueError( f"Encountered invalid internal basin '{bn_dict}': " f"'features' must be defined") return h5group, features
[docs] def get_image_cache(self, feat): """Create an HDF5ImageCache object for the current dataset This method also tries to find image data in `self.basins`. """ if feat not in self._image_cache: if f"events/{feat}" in self.h5: ds = self.h5[f"events/{feat}"] idx_map = self.index_mapping else: idx_map = None # search all basins (internal basins are always first) for idx in range(len(self.basins)): bn_grp, bn_feats, bn_map = self.get_basin_data(idx) if bn_feats is not None: if feat in bn_feats: # HDF5 dataset ds = bn_grp[feat] # Index mapping (taken from the basins which # already includes the mapping from the current # instance). idx_map = bn_map break else: ds = None if ds is not None: image = HDF5ImageCache( h5ds=get_mapped_object(obj=ds, index_mapping=idx_map), cache_size=self.image_cache_size, chunk_size=self.image_chunk_size, boolean=feat == "mask") else: image = None self._image_cache[feat] = image return self._image_cache[feat]
[docs] def keys(self): if self._keys is None: features = sorted(self.h5["/events"].keys()) # add basin features for ii in range(len(self.basins)): _, bn_feats, _ = self.get_basin_data(ii) if bn_feats: features += bn_feats self._keys = sorted(set(features)) return self._keys
[docs] def concatenated_hdf5_data(*args, **kwargs): warnings.warn( "Please use `dcnum.read.hdf5_concat.concatenated_hdf5_data`. " "Accessing this method via `dcnum.read.hdf5_data` is deprecated.", DeprecationWarning) from . import hdf5_concat return hdf5_concat.concatenated_hdf5_data(*args, **kwargs)
[docs] def get_measurement_identifier(h5: h5py.Group # type: ignore ) -> str | None: """Return the measurement identifier for the given H5File object The basin identifier is taken from the HDF5 attributes. If the "experiment:run identifier" attribute is not set, it is computed from the HDF5 attributes "experiment:time", "experiment:date", and "setup:identifier". If the measurement identifier cannot be found or computed, return None. """ # This is the current measurement identifier mid = h5.attrs.get("experiment:run identifier") if not mid: # Compute a measurement identifier from the metadata m_time = h5.attrs.get("experiment:time", None) or None m_date = h5.attrs.get("experiment:date", None) or None m_sid = h5.attrs.get("setup:identifier", None) or None if None not in [m_time, m_date, m_sid]: # Only compute an identifier if all of the above # are defined. hasher = hashlib.md5( f"{m_time}_{m_date}_{m_sid}".encode("utf-8")) mid = str(uuid.UUID(hex=hasher.hexdigest())) return mid