from __future__ import annotations
import functools
import hashlib
import json
import numbers
import pathlib
from typing import Any, BinaryIO
import uuid
import warnings
import numpy as np
from ..common import h5py
from .cache import HDF5ImageCache, ImageCorrCache, md5sum
from .const import PROTECTED_FEATURES
from .mapped import get_mapped_object, get_mapping_indices
[docs]
class BasinIdentifierMismatchError(BaseException):
"""Used when basin identifiers do not match"""
[docs]
class HDF5Data:
"""HDF5 (.rtdc) input file data instance"""
def __init__(self,
path: pathlib.Path | h5py.File | BinaryIO, # type: ignore
pixel_size: float | None = None,
md5_5m: str | None = None,
meta: dict | None = None,
basins: list[dict[str, list[str] | str]] | None = None,
logs: dict[str, list[str]] | None = None,
tables: dict[str, np.ndarray] | None = None,
image_cache_size: int = 2,
image_chunk_size: int = 1000,
index_mapping: int | slice | list | np.ndarray | None = None,
):
"""
Parameters
----------
path:
path to data file
pixel_size:
pixel size in µm
md5_5m:
MD5 sum of the first 5 MiB; computed if not provided
meta:
metadata dictionary; extracted from HDF5 attributes
if not provided
basins:
list of basin dictionaries; extracted from HDF5 attributes
if not provided
logs:
dictionary of logs; extracted from HDF5 attributes
if not provided
tables:
dictionary of tables; extracted from HDF5 attributes
if not provided
image_cache_size:
size of the image cache to use when accessing image data
image_chunk_size:
maximum number of images in each image cache chunk
index_mapping:
select only a subset of input events, transparently reducing the
size of the dataset, possible data types are
- int `N`: use the first `N` events
- slice: use the events defined by a slice
- list: list of integers specifying the event indices to use
Numpy indexing rules apply. E.g. to only process the first
100 events, set this to `100` or `slice(0, 100)`.
"""
# Init is in __setstate__ so we can pickle this class
# and use it for multiprocessing.
if isinstance(path, h5py.File):
self._h5 = path
path = path.filename
else:
self._h5 = None
# Optimize image chunk size
with h5py.File(path, "r") as h5:
if "events/image" in h5:
h5ds = h5["events/image"]
if isinstance(h5ds, h5py.Dataset) and h5ds.chunks is not None:
# Align the `HDF5ImageCache` chunk size to the chunk size
# of the underlying HDF5 dataset.
# The alignment is not applied to:
# - `h5py.Dataset` data that are stored in contiguous mode
# - `MappedHDF5Dataset` instances
# Determine the chunk size of the dataset.
ds_chunk_size = h5ds.chunks[0]
if ds_chunk_size >= image_chunk_size:
# Adopt the actual chunk size. Nothing else
# makes sense.
image_chunk_size = ds_chunk_size
else:
# Determine the multiples of chunks that comprise
# the new chunk_size.
divider = image_chunk_size // ds_chunk_size
# The new chunk size might be smaller than the
# original one.
image_chunk_size = divider * ds_chunk_size
self.__setstate__({"path": path,
"pixel_size": pixel_size,
"md5_5m": md5_5m,
"meta": meta,
"basins": basins,
"logs": logs,
"tables": tables,
"image_cache_size": image_cache_size,
"image_chunk_size": image_chunk_size,
"index_mapping": index_mapping,
})
[docs]
def __contains__(self, item):
return item in self.keys()
[docs]
def __enter__(self):
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
def __getitem__(self, feat):
if feat in ["image", "image_bg", "mask"]:
data = self.get_image_cache(feat) # already index-mapped
if data is None:
raise KeyError(f"Feature '{feat}' not found in {self}!")
else:
return data
elif feat in self._cache_scalar: # check for scalar cached
return self._cache_scalar[feat]
elif (feat in self.h5["events"]
and len(self.h5["events"][feat].shape) == 1): # cache scalar
if self.index_mapping is None:
# no mapping indices, just slice
dat_sc = self.h5["events"][feat][:]
else:
dat_sc = get_mapped_object(self.h5["events"][feat],
index_mapping=self.index_mapping)[:]
self._cache_scalar[feat] = dat_sc
return self._cache_scalar[feat]
else:
if feat in self.h5["events"]:
# Not cached (possibly slow)
warnings.warn(f"Feature {feat} not cached (possibly slow)")
return get_mapped_object(
obj=self.h5["events"][feat],
index_mapping=self.index_mapping)
else:
# Check the basins
for idx in range(len(self.basins)):
bn_grp, bn_feats, bn_map = self.get_basin_data(idx)
if bn_feats and feat in bn_feats:
mapped_ds = get_mapped_object(obj=bn_grp[feat],
index_mapping=bn_map)
return mapped_ds
# If we got here, then the feature data does not exist.
raise KeyError(f"Feature '{feat}' not found in {self}!")
[docs]
def __getstate__(self):
return {"path": self.path,
"pixel_size": self.pixel_size,
"md5_5m": self.md5_5m,
"meta": self.meta,
"logs": self.logs,
"tables": self.tables,
"basins": self.basins,
"image_cache_size": self.image_cache_size,
"image_chunk_size": self.image_chunk_size,
"index_mapping": self.index_mapping,
"len": self._len,
}
[docs]
def __setstate__(self, state):
# Make sure these properties exist (we rely on __init__, because
# we want this class to be pickable and __init__ is not called by
# `pickle.load`).
# Cached properties
self._feats = None
self._keys = None
self._len = state.get("len", None)
# Image cache
if not hasattr(self, "_image_cache"):
self._image_cache: dict[str, Any] = {}
# Basin data
if not hasattr(self, "_basin_data"):
self._basin_data = {}
# Scalar feature cache
if not hasattr(self, "_cache_scalar"):
self._cache_scalar = {}
if not hasattr(self, "_h5"):
self._h5 = None
path = state["path"]
if isinstance(path, str):
path = pathlib.Path(path)
self.path = path
self.md5_5m = state["md5_5m"]
if self.md5_5m is None:
if isinstance(self.path, pathlib.Path):
# 5MB md5sum of input file
self.md5_5m = md5sum(self.path, blocksize=65536, count=80)
else:
self.md5_5m = str(uuid.uuid4()).replace("-", "")
self.meta = state["meta"]
self.logs = state["logs"]
self.tables = state["tables"]
self.basins = state["basins"]
if (self.meta is None
or self.logs is None
or self.tables is None
or self.basins is None):
self.logs = {}
self.tables = {}
self.basins = []
# get dataset configuration
with h5py.File(self.path,
libver="latest",
) as h5:
# meta
self.meta = dict(h5.attrs)
for key in self.meta:
if isinstance(self.meta[key], bytes):
self.meta[key] = self.meta[key].decode("utf-8")
# logs
for key in sorted(h5.get("logs", {}).keys()):
alog = list(h5["logs"][key])
if alog:
if isinstance(alog[0], bytes):
alog = [ll.decode("utf") for ll in alog]
self.logs[key] = alog
# tables
for tab in sorted(h5.get("tables", {}).keys()):
fields = h5["tables"][tab].dtype.fields
if fields is None:
# No individual curves, but an image array
self.tables[tab] = h5["tables"][tab][:]
else:
# List of curves with predefined dtypes
tabdict = {}
for tkey in fields.keys():
tabdict[tkey] = \
np.asarray(h5["tables"][tab][tkey]).reshape(-1)
self.tables[tab] = tabdict
# basins
basins = self.extract_basin_dicts(h5)
def basin_sort_cmp(a, b):
"""Sort internal basins before any other basins"""
if a["type"] == b["type"]:
an = a["name"]
bn = b["name"]
if an == bn:
return 0
if an < bn:
return -1
else:
return 1
elif a["type"] == "internal":
# internal basins should come first
return -1
else:
return 1
self.basins = sorted(basins,
key=functools.cmp_to_key(basin_sort_cmp)
)
if state["pixel_size"] is not None:
self.pixel_size = state["pixel_size"]
self.image_cache_size = state["image_cache_size"]
self.image_chunk_size = state["image_chunk_size"]
self.index_mapping = state["index_mapping"]
[docs]
def __len__(self):
if self._len is None:
if self.index_mapping is not None:
self._len = get_mapping_indices(self.index_mapping).size
else:
self._len = self.h5.attrs["experiment:event count"]
return self._len
@property
def h5(self):
if self._h5 is None:
self._h5 = h5py.File(self.path, libver="latest")
return self._h5
@property
def image(self) -> HDF5ImageCache | None:
return self.get_image_cache("image")
@property
def image_bg(self) -> HDF5ImageCache | None:
return self.get_image_cache("image_bg")
@property
def image_corr(self) -> ImageCorrCache | None:
if "image_corr" not in self._image_cache:
if self.image is not None and self.image_bg is not None:
image_corr = ImageCorrCache(self.image, self.image_bg)
else:
image_corr = None
self._image_cache["image_corr"] = image_corr
return self._image_cache["image_corr"]
@property
def image_num_chunks(self):
"""Number of image chunks given `self.image_chunk_size`"""
return int(np.ceil(len(self) / self.image_chunk_size))
@property
def mask(self):
return self.get_image_cache("mask")
@property
def meta_nest(self):
"""Return `self.meta` as nested dicitonary
This gets very close to the dclab `config` property of datasets.
"""
md = {}
for key in self.meta:
sec, var = key.split(":")
md.setdefault(sec, {})[var] = self.meta[key]
return md
@property
def pixel_size(self):
return self.meta.get("imaging:pixel size", 0)
@pixel_size.setter
def pixel_size(self, pixel_size: float):
# Reduce pixel_size accuracy to 8 digits after the point to
# enforce pipeline reproducibility (see get_ppid_from_ppkw).
pixel_size = float(f"{pixel_size:.8f}")
self.meta["imaging:pixel size"] = pixel_size
@property
def features_scalar_frame(self):
"""Scalar features that apply to all events in a frame
This is a convenience function for copying scalar features
over to new processed datasets. Return a list of all features
that describe a frame (e.g. temperature or time).
"""
if self._feats is None:
feats = []
for feat in self.keys():
if feat in PROTECTED_FEATURES:
feats.append(feat)
self._feats = feats
return self._feats
[docs]
def close(self):
"""Close the underlying HDF5 file"""
# TODO: There is a memory leak (#50).
for bn_group, _, _ in self._basin_data.values():
if bn_group is not None:
if bn_group.id.valid:
bn_group.file.close()
del bn_group
self._image_cache.clear()
self._basin_data.clear()
self._cache_scalar.clear()
self.basins.clear()
self.logs.clear()
self.tables.clear()
self.basins.clear()
self.meta.clear()
if self._h5 is not None:
self._h5.close()
self._h5 = None
[docs]
def get_ppid(self):
return self.get_ppid_from_ppkw(
{"pixel_size": self.pixel_size,
"index_mapping": self.index_mapping})
[docs]
@classmethod
def get_ppid_code(cls):
return "hdf"
[docs]
@classmethod
def get_ppid_from_ppkw(cls, kwargs):
# Data does not really fit into the PPID scheme we use for the rest
# of the pipeline. This implementation here is custom.
code = cls.get_ppid_code()
# pixel size
ppid_ps = f"{kwargs['pixel_size']:.8f}".rstrip("0")
# index mapping
ppid_im = cls.get_ppid_index_mapping(kwargs.get("index_mapping", None))
kwid = "^".join([f"p={ppid_ps}", f"i={ppid_im}"])
return ":".join([code, kwid])
[docs]
@staticmethod
def get_ppid_index_mapping(index_mapping):
"""Return the pipeline identifier part for index mapping"""
im = index_mapping
if im is None:
dim = "0"
elif isinstance(im, numbers.Integral):
dim = f"{im}"
elif isinstance(im, slice):
dim = (f"{im.start if im.start is not None else 'n'}"
+ f"-{im.stop if im.stop is not None else 'n'}"
+ f"-{im.step if im.step is not None else 'n'}"
)
elif isinstance(im, (list, np.ndarray)):
idhash = hashlib.md5(
np.asarray(im, dtype=np.uint32).tobytes()).hexdigest()
dim = f"h-{idhash[:8]}"
else:
dim = "unknown"
return dim
[docs]
@staticmethod
def get_ppkw_from_ppid(dat_ppid):
# Data does not fit in the PPID scheme we use, but we still
# would like to pass pixel_size to __init__ if we need it.
code, kwargs_str = dat_ppid.split(":")
if code != HDF5Data.get_ppid_code():
raise ValueError(f"Could not find data method '{code}'!")
kwitems = kwargs_str.split("^")
kwargs = {}
for item in kwitems:
var, val = item.split("=")
if var == "p":
kwargs["pixel_size"] = float(val)
elif var == "i":
if val.startswith("h-") or val == "unknown":
raise ValueError(f"Cannot invert index mapping {val}")
elif val == "0":
kwargs["index_mapping"] = None
elif val.count("-"):
start, stop, step = val.split("-")
kwargs["index_mapping"] = slice(
None if start == "n" else int(start),
None if stop == "n" else int(stop),
None if step == "n" else int(step)
)
else:
kwargs["index_mapping"] = int(val)
else:
raise ValueError(f"Invalid parameter '{var}'!")
return kwargs
[docs]
def get_basin_data(self, index: int) -> tuple[
h5py.Group, # type: ignore
list,
int | slice | list | np.ndarray,
]:
"""Return HDF5Data info for a basin index in `self.basins`
Parameters
----------
index: int
index of the basin from which to get data
Returns
-------
group: h5py.Group
HDF5 group containing HDF5 Datasets with the names
listed in `features`
features: list of str
list of features made available by this basin
index_mapping:
a mapping (see `__init__`) that defines mapping from
the basin dataset to the referring dataset
"""
if index not in self._basin_data:
bn_dict = self.basins[index]
# HDF5 group containing the feature data
if bn_dict["type"] == "file":
h5group, features = self._get_basin_data_file(bn_dict)
elif bn_dict["type"] == "internal":
h5group, features = self._get_basin_data_internal(bn_dict)
else:
raise ValueError(f"Invalid basin type '{bn_dict['type']}'")
# index mapping
feat_basinmap = bn_dict.get("mapping", None)
if feat_basinmap is None:
# This is NOT a mapped basin.
index_mapping = self.index_mapping
else:
# This is a mapped basin. Create an indexing list.
if self.index_mapping is None:
# The current dataset is not mapped.
basinmap_idx = slice(None)
else:
# The current dataset is also mapped.
basinmap_idx = get_mapping_indices(self.index_mapping)
basinmap = self.h5[f"events/{feat_basinmap}"]
index_mapping = basinmap[basinmap_idx]
self._basin_data[index] = (h5group, features, index_mapping)
return self._basin_data[index]
[docs]
def _get_basin_data_file(self, bn_dict):
for ff in bn_dict["paths"]:
pp = pathlib.Path(ff)
if pp.is_absolute() and pp.exists():
path = pp
break
else:
# try relative path
prel = pathlib.Path(self.path).parent / pp
if prel.exists():
path = prel
break
else:
path = None
if path is None:
# Cannot get data from this basin / cannot find file
h5group = None
features = []
else:
h5 = h5py.File(path, "r")
# verify that the basin was identified correctly
if ((id_exp := bn_dict.get("identifier")) is not None
and (id_act := get_measurement_identifier(h5)) != id_exp):
raise BasinIdentifierMismatchError(
f"The basin '{path}' with identifier '{id_act}' "
f"does not match the expected identifier '{id_exp}'")
h5group = h5["events"]
# features defined in the basin
features = bn_dict.get("features")
if features is None:
# Only get the features from the actual HDF5 file.
# If this file has basins as well, the basin metadata
# should have been copied over to the parent file. This
# makes things a little cleaner, because basins are not
# nested, but all basins are available in the top file.
# See :func:`write.store_metadata` for copying metadata
# between files.
# The writer can still specify "features" in the basin
# metadata, then these basins are indeed nested, and
# we consider that ok as well.
features = sorted(h5group.keys())
return h5group, features
[docs]
def _get_basin_data_internal(self, bn_dict):
# The group name is normally "basin_events"
group_name = bn_dict["paths"][0]
if group_name != "basin_events":
warnings.warn(
f"Uncommon group name for basin features: {group_name}")
h5group = self.h5[group_name]
features = bn_dict.get("features")
if features is None:
raise ValueError(
f"Encountered invalid internal basin '{bn_dict}': "
f"'features' must be defined")
return h5group, features
[docs]
def get_image_cache(self, feat):
"""Create an HDF5ImageCache object for the current dataset
This method also tries to find image data in `self.basins`.
"""
if feat not in self._image_cache:
if f"events/{feat}" in self.h5:
ds = self.h5[f"events/{feat}"]
idx_map = self.index_mapping
else:
idx_map = None
# search all basins (internal basins are always first)
for idx in range(len(self.basins)):
bn_grp, bn_feats, bn_map = self.get_basin_data(idx)
if bn_feats is not None:
if feat in bn_feats:
# HDF5 dataset
ds = bn_grp[feat]
# Index mapping (taken from the basins which
# already includes the mapping from the current
# instance).
idx_map = bn_map
break
else:
ds = None
if ds is not None:
image = HDF5ImageCache(
h5ds=get_mapped_object(obj=ds, index_mapping=idx_map),
cache_size=self.image_cache_size,
chunk_size=self.image_chunk_size,
boolean=feat == "mask")
else:
image = None
self._image_cache[feat] = image
return self._image_cache[feat]
[docs]
def keys(self):
if self._keys is None:
features = sorted(self.h5["/events"].keys())
# add basin features
for ii in range(len(self.basins)):
_, bn_feats, _ = self.get_basin_data(ii)
if bn_feats:
features += bn_feats
self._keys = sorted(set(features))
return self._keys
[docs]
def concatenated_hdf5_data(*args, **kwargs):
warnings.warn(
"Please use `dcnum.read.hdf5_concat.concatenated_hdf5_data`. "
"Accessing this method via `dcnum.read.hdf5_data` is deprecated.",
DeprecationWarning)
from . import hdf5_concat
return hdf5_concat.concatenated_hdf5_data(*args, **kwargs)
[docs]
def get_measurement_identifier(h5: h5py.Group # type: ignore
) -> str | None:
"""Return the measurement identifier for the given H5File object
The basin identifier is taken from the HDF5 attributes. If the
"experiment:run identifier" attribute is not set, it is
computed from the HDF5 attributes "experiment:time",
"experiment:date", and "setup:identifier".
If the measurement identifier cannot be found or computed,
return None.
"""
# This is the current measurement identifier
mid = h5.attrs.get("experiment:run identifier")
if not mid:
# Compute a measurement identifier from the metadata
m_time = h5.attrs.get("experiment:time", None) or None
m_date = h5.attrs.get("experiment:date", None) or None
m_sid = h5.attrs.get("setup:identifier", None) or None
if None not in [m_time, m_date, m_sid]:
# Only compute an identifier if all of the above
# are defined.
hasher = hashlib.md5(
f"{m_time}_{m_date}_{m_sid}".encode("utf-8"))
mid = str(uuid.UUID(hex=hasher.hexdigest()))
return mid