from __future__ import annotations
import hashlib
import json
import pathlib
import warnings
import numpy as np
from ..common import LazyLoader, h5py
from ..read import HDF5Data, get_measurement_identifier
from .._version import version
hdf5plugin = LazyLoader("hdf5plugin")
[docs]
class CreatingFileWithoutBasinWarning(UserWarning):
"""Issued when creating a basin-based dataset without basins"""
pass
[docs]
class IgnoringBasinTypeWarning(UserWarning):
"""Issued when a specific basin type is ignored"""
pass
[docs]
class HDF5Writer:
def __init__(self,
obj: h5py.File | pathlib.Path | str, # type: ignore
mode: str = "a",
ds_kwds: dict | None = None,
):
"""Write deformability cytometry HDF5 data
Parameters
----------
obj: h5py.File | pathlib.Path | str
object to instantiate the writer from; If this is already
a :class:`h5py.File` object, then it is used, otherwise the
argument is passed to :class:`h5py.File`
mode: str
opening mode when using :class:`h5py.File`
ds_kwds: dict
keyword arguments with which to initialize new Datasets
(e.g. compression)
"""
if isinstance(obj, h5py.File):
self.h5 = obj
self.h5_owned = False
else:
self.h5 = h5py.File(
obj,
mode=mode,
libver="latest",
# https://support.hdfgroup.org/documentation/hdf5-docs/advanced_topics/chunking_in_hdf5.html
# https://docs.h5py.org/en/stable/high/file.html#chunk-cache
# Set chunk cache size for each
# dataset to allow partial writes.
rdcc_nbytes=10 * 1024**2,
# If the value is set to 1, the library will
# always evict the least recently used chunk
# which has been fully read or written
rdcc_w0=1,
# The number of chunk slots in the cache for each dataset.
rdcc_nslots=1000,
)
self.h5_owned = True
self.events = self.h5.require_group("events")
ds_kwds = set_default_filter_kwargs(ds_kwds)
self.ds_kwds = ds_kwds
[docs]
def __enter__(self):
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
def close(self):
self.h5.flush()
if self.h5_owned:
self.h5.close()
[docs]
@staticmethod
def get_best_nd_chunks(item_shape, feat_dtype=np.float64):
"""Return best chunks for HDF5 datasets
Chunking has performance implications. It’s recommended to keep the
total size of dataset chunks between 10 KiB and 1 MiB. This number
defines the maximum chunk size as well as half the maximum cache
size for each dataset.
"""
# set image feature chunk size to approximately 1MiB
num_bytes = 1024 ** 2
# Note that `np.prod(()) == 1`
event_size = np.prod(item_shape) * np.dtype(feat_dtype).itemsize
chunk_size = num_bytes / event_size
# Set minimum chunk size to 10 so that we can have at least some
# compression performance.
# Set maximum chunk size to 1000, so that we are not continuously
# rewriting data when there are a lot of events.
chunk_size_int = min(1000, max(10, int(np.floor(chunk_size))))
return tuple([chunk_size_int] + list(item_shape))
[docs]
def require_feature(self,
feat: str,
item_shape: tuple[int],
feat_dtype: np.dtype,
ds_kwds: dict | None = None,
group_name: str = "events"):
"""Create a new feature in the "events" group
Parameters
----------
feat: str
name of the feature
item_shape: tuple[int]
shape for one event of this feature, e.g. for a scalar
event, the shape would be `(1,)` and for an image, the
shape could be `(80, 300)`.
feat_dtype: np.dtype
dtype of the feature
ds_kwds: dict
HDF5 Dataset keyword arguments (e.g. compression, fletcher32)
group_name: str
name of the HDF5 group where the feature should be written to;
defaults to the "events" group, but a different group can be
specified for storing e.g. internal basin features.
"""
if group_name == "events":
egroup = self.events
else:
egroup = self.h5.require_group(group_name)
if feat not in egroup:
if ds_kwds is None:
ds_kwds = {}
for key in self.ds_kwds:
ds_kwds.setdefault(key, self.ds_kwds[key])
dset = egroup.create_dataset(
feat,
shape=tuple([0] + list(item_shape)),
dtype=feat_dtype,
maxshape=tuple([None] + list(item_shape)),
chunks=self.get_best_nd_chunks(item_shape,
feat_dtype=feat_dtype),
**ds_kwds)
if len(item_shape) == 2:
dset.attrs.create('CLASS', np.bytes_('IMAGE'))
dset.attrs.create('IMAGE_VERSION', np.bytes_('1.2'))
dset.attrs.create('IMAGE_SUBCLASS',
np.bytes_('IMAGE_GRAYSCALE'))
offset = 0
else:
dset = egroup[feat]
offset = dset.shape[0]
return dset, offset
[docs]
def store_basin(self,
name: str,
paths: list[str | pathlib.Path] | None = None,
features: list[str] | None = None,
description: str | None = None,
mapping: np.ndarray | None = None,
internal_data: dict | None = None,
identifier: str | None = None,
):
"""Write an HDF5-based file basin
Parameters
----------
name: str
basin name; Names do not have to be unique.
paths: list of str or pathlib.Path or None
location(s) of the basin; must be None when storing internal
data, a list of paths otherwise
features: list of str
list of features provided by `paths`
description: str
optional string describing the basin
mapping: 1D array
integer array with indices that map the basin dataset
to this dataset
internal_data: dict of ndarrays
internal basin data to store; If this is set, then `features`
and `paths` must be set to `None`.
identifier: str
the measurement identifier of the basin as computed by
the :func:`~dcnum.read.hdf5_data.get_measurement_identifier`
function.
"""
bdat = {
"description": description,
"name": name,
}
if internal_data:
if features is not None:
raise ValueError("`features` must be set to None when storing "
"internal basin features")
if paths is not None:
raise ValueError("`paths` must be set to None when storing "
"internal basin features")
if identifier is not None:
warnings.warn(f"Not storing identifier for internal "
f"basin '{name}' (got '{identifier}')")
# store the internal basin information
for feat in internal_data:
if feat in self.h5.require_group("basin_events"):
raise ValueError(f"Feature '{feat}' is already defined "
f"as an internal basin feature")
self.store_feature_chunk(feat=feat,
data=internal_data[feat],
group_name="basin_events")
features = sorted(internal_data.keys())
bdat["format"] = "h5dataset"
bdat["paths"] = ["basin_events"]
bdat["type"] = "internal"
else:
if paths is None:
raise ValueError("`paths` must not be set to None when "
"storing external basin features")
bdat["format"] = "hdf5"
bdat["paths"] = [str(pp) for pp in paths]
bdat["type"] = "file"
# identifier only makes sense here (not for internal basins)
bdat["identifier"] = identifier
# Explicit features stored in basin file
if features is not None and len(features):
bdat["features"] = features
# Mapped basin information
if mapping is not None:
events = self.h5.require_group("events")
# Reserve a mapping feature for this dataset
for ii in range(10): # basinmap0 to basinmap9
bm_cand = f"basinmap{ii}"
if bm_cand in events:
# There is a basin mapping defined in the file. Check
# whether it is identical to ours.
if np.all(events[bm_cand] == mapping):
# Great, we are done here.
feat_basinmap = bm_cand
break
else:
# This mapping belongs to a different basin,
# try the next mapping.
continue
else:
# The mapping is not defined in the dataset, and we may
# write it to a new feature.
feat_basinmap = bm_cand
self.store_feature_chunk(feat=feat_basinmap, data=mapping)
break
else:
raise ValueError(
"You have exhausted the usage of mapped basins for "
"the current dataset. Please revise your analysis "
"pipeline.")
bdat["mapping"] = feat_basinmap
bstring = json.dumps(bdat, indent=2)
# basin key is its hash
key = hashlib.md5(bstring.encode("utf-8",
errors="ignore")).hexdigest()
# write json-encoded basin to "basins" group
basins = self.h5.require_group("basins")
if key not in basins:
blines = bstring.split("\n")
basins.create_dataset(
name=key,
data=blines,
shape=(len(blines),),
# maximum line length
dtype=f"S{max([len(b) for b in blines])}",
chunks=True,
**self.ds_kwds)
[docs]
def store_feature_chunk(self, feat, data, group_name="events"):
"""Store feature data
The "chunk" implies that always chunks of data are stored,
never single events.
"""
if feat == "mask" and data.dtype == bool:
data = 255 * np.asarray(data, dtype=np.uint8)
ds, offset = self.require_feature(feat=feat,
item_shape=data.shape[1:],
feat_dtype=data.dtype,
group_name=group_name)
dsize = data.shape[0]
ds.resize(offset + dsize, axis=0)
ds[offset:offset + dsize] = data
[docs]
def store_log(self,
log: str,
data: list[str],
override: bool = False
) -> h5py.Dataset: # type: ignore
"""Store log data
Store the log data under the key `log`. The `data`
kwarg must be a list of strings. If the log entry
already exists, `ValueError` is raised unless
``override`` is set to True.
"""
logs = self.h5.require_group("logs")
if log in logs:
if override:
del logs[log]
else:
raise ValueError(
f"Log '{log}' already exists in {self.h5.filename}!")
log_ds = logs.create_dataset(
name=log,
data=data,
shape=(len(data),),
# maximum line length
dtype=f"S{max([len(ll) for ll in data])}",
chunks=True,
**self.ds_kwds)
return log_ds
[docs]
def create_with_basins(
path_out: str | pathlib.Path,
basin_paths: list[str | pathlib.Path] | list[list[str | pathlib.Path]]
):
"""Create an .rtdc file with basins
Parameters
----------
path_out:
The output .rtdc file where basins are written to
basin_paths:
The paths to the basins written to `path_out`. This can be
either a list of paths (to different basins) or a list of
lists for paths (for basins containing the same information,
commonly used for relative and absolute paths).
"""
path_out = pathlib.Path(path_out)
if not basin_paths:
warnings.warn(f"Creating basin-based file '{path_out}' without any "
f"basins, since the list `basin_paths' is empty!",
CreatingFileWithoutBasinWarning)
basin_paths = []
with HDF5Writer(path_out, mode="w") as hw:
# Get the metadata from the first available basin path
for bp in basin_paths:
if isinstance(bp, (str, pathlib.Path)):
# We have a single basin file
bps = [bp]
else: # list or tuple
bps = bp
# We need to make sure that we are not resolving a relative
# path to the working directory when we copy over data. Get
# a representative path for metadata extraction.
for pp in bps:
pp = pathlib.Path(pp)
if pp.is_absolute() and pp.exists():
prep = pp
break
else:
# try relative path
prel = pathlib.Path(path_out).parent / pp
if prel.exists():
prep = prel
break
else:
prep = None
# Copy the metadata from the representative path.
if prep is not None:
# copy metadata
with h5py.File(prep, libver="latest") as h5:
copy_metadata(h5_src=h5, h5_dst=hw.h5)
copy_basins(h5_src=h5, h5_dst=hw.h5, internal_basins=False)
# extract features
features = list(h5["events"].keys())
if "basin_events" in h5:
# add features from internal basins
features += list(h5["basin_events"].keys())
features = sorted([f for f in features if
not f.startswith("basinmap")])
basin_identifier = get_measurement_identifier(h5)
name = prep.name
else:
features = None
name = str(bps[0])
basin_identifier = None
# Write the basin data
hw.store_basin(name=name,
paths=bps,
features=features,
description=f"Created by dcnum {version}",
identifier=basin_identifier,
)
[docs]
def copy_basins(h5_src: "h5py.File",
h5_dst: "h5py.File",
internal_basins: bool = True
):
"""Reassemble basin data in the output file
This does not just copy the datasets defined in the "basins"
group, but it also loads the "basinmap?" features and stores
them as new "basinmap?" features in the output file.
"""
basins = HDF5Data.extract_basin_dicts(h5_src, check=False)
hw = HDF5Writer(h5_dst)
for bn_dict in basins:
if bn_dict["type"] == "internal" and internal_basins:
internal_data = {}
for feat in bn_dict["features"]:
internal_data[feat] = h5_src["basin_events"][feat]
hw.store_basin(name=bn_dict["name"],
description=bn_dict["description"],
mapping=h5_src["events"][bn_dict["mapping"]][:],
internal_data=internal_data,
)
elif bn_dict["type"] == "file":
if bn_dict.get("mapping") is not None:
mapping = h5_src["events"][bn_dict["mapping"]][:]
else:
mapping = None
hw.store_basin(name=bn_dict["name"],
description=bn_dict["description"],
paths=bn_dict["paths"],
features=bn_dict["features"],
mapping=mapping,
identifier=bn_dict.get("identifier"),
)
else:
if bn_dict['type'] == "internal" and not internal_basins:
# User explicitly ignored internal basin.
pass
else:
warnings.warn(f"Ignored basin of type '{bn_dict['type']}'",
IgnoringBasinTypeWarning)
[docs]
def copy_features(h5_src: h5py.File, # type: ignore
h5_dst: h5py.File, # type: ignore
features: list[str],
mapping: np.ndarray | None = None,
ds_kwds: dict | None = None,
):
"""Copy feature data from one HDF5 file to another
The feature must not exist in the destination file.
Parameters
----------
h5_src: h5py.File
Input HDF5File containing `features` in the "events" group
h5_dst: h5py.File
Output HDF5File opened in write mode not containing `features`
features: list[str]
List of features to copy from source to destination
mapping: 1D array
If given, contains indices in the input file that should be
written to the output file. If set to None, all features are written.
ds_kwds:
keyword arguments with which to initialize new Datasets
(e.g. compression); only relevant when `mapping is not None`
"""
ei = h5_src["events"]
eo = h5_dst.require_group("events")
hw = HDF5Writer(h5_dst, ds_kwds=ds_kwds)
for feat in features:
if feat in eo:
if ei[feat].shape == eo[feat].shape:
# Feature already exists in output file
if (len(ei[feat].shape) == 1
and np.all(ei[feat][:] == eo[feat][:])):
# Scalar features are identical, nothing to do.
continue
# TODO: Check non-scalar features with loop as well (OOM)?
raise ValueError(f"Output file {h5_dst.filename} already contains "
f"the feature {feat}.")
if not isinstance(ei[feat], h5py.Dataset):
raise NotImplementedError(
f"Only dataset-based features are supported here, not {feat}")
if mapping is None:
# Just copy the data as-is.
h5py.h5o.copy(src_loc=ei.id,
src_name=feat.encode(),
dst_loc=eo.id,
dst_name=feat.encode(),
)
else:
# We have to perform mapping.
# Since h5py is very slow at indexing with arrays,
# we instead read the data in chunks from the input file,
# and perform the mapping afterward using the numpy arrays.
dsi = ei[feat]
chunk_size = hw.get_best_nd_chunks(dsi[0].shape, dsi.dtype)[0]
size_in = dsi.shape[0]
start = 0
while start < size_in:
# Get a big chunk of data
big_chunk = 10 * chunk_size
stop = start + big_chunk
data_in = dsi[start:stop]
# Determine the indices that we need from that chunk.
mapping_idx = (start <= mapping) * (mapping < stop)
mapping_chunk = mapping[mapping_idx] - start
data = data_in[mapping_chunk]
# Note that HDF5 does its own caching, properly handling
# partial chunk writes.
hw.store_feature_chunk(feat, data)
# increment start
start = stop
[docs]
def set_default_filter_kwargs(ds_kwds: dict | None = None,
compression: bool = True):
if ds_kwds is None:
ds_kwds = {}
if compression and "compression" not in ds_kwds:
# Zstandard compression
for key, val in dict(hdf5plugin.Zstd(clevel=5)).items():
ds_kwds.setdefault(key, val)
else:
# No compression
ds_kwds.setdefault("compression", None)
ds_kwds.setdefault("compression_opts", None)
# checksums
ds_kwds.setdefault("fletcher32", True)
return ds_kwds