Source code for dcnum.read.hdf5_concat

import io
import pathlib
import tempfile
import warnings

import numpy as np

from ..common import h5py

from .hdf5_data import HDF5Data


[docs] def concatenated_hdf5_data(paths: list[pathlib.Path], path_out: bool | pathlib.Path | None = True, compute_frame: bool = True, features: list[str] | None = None): """Return a virtual dataset concatenating all the input paths Parameters ---------- paths: Path of the input HDF5 files that will be concatenated along the feature axis. The metadata will be taken from the first file. path_out: If `None`, then the dataset is created in memory. If `True` (default), create a file on disk. If a pathlib.Path is specified, the dataset is written to that file. Note that datasets in memory are likely not pickable (so don't use them for multiprocessing). compute_frame: Whether to compute the "events/frame" feature, taking the frame data from the input files and properly incrementing them along the file index. features: List of features to take from the input files. Notes ----- - If one of the input files does not contain a feature from the first input `paths`, then a `ValueError` is raised. Use the `features` argument to specify which features you need instead. - Basins are not considered. """ h5kwargs = {"mode": "w", "libver": "latest"} if isinstance(path_out, (pathlib.Path, str)): h5kwargs["name"] = path_out elif path_out is True: tf = tempfile.NamedTemporaryFile(prefix="dcnum_vc_", suffix=".hdf5", delete=False) tf.write(b"dummy") h5kwargs["name"] = tf.name tf.close() elif path_out is None: h5kwargs["name"] = io.BytesIO() else: raise ValueError( f"Invalid type for `path_out`: {type(path_out)} ({path_out}") if len(paths) == 0: raise ValueError("Please specify at least one file in `paths`!") elif len(paths) == 1: warnings.warn("Only one file passed to `concatenated_hdf5_data`; this " "is equivalent to using `HDF5Data`, but slower.") frames = [] with h5py.File(**h5kwargs) as hv: # determine the sizes of the input files shapes = {} dtypes = {} size = 0 for ii, pp in enumerate(paths): pp = pathlib.Path(pp).resolve() with h5py.File(pp, libver="latest") as h5: # get all feature keys featsi = sorted(h5["events"].keys()) # get metadata if ii == 0: meta = dict(h5.attrs) if not features: features = featsi # make sure number of features are consistent if not set(features) <= set(featsi): raise ValueError( f"File {pp} contains more features than {paths[0]}!") # populate shapes for all features for feat in features: if not isinstance(h5["events"][feat], h5py.Dataset): warnings.warn( f"Ignoring {feat}; not implemented yet!") continue if feat in ["frame", "time"]: continue shapes.setdefault(feat, []).append( h5["events"][feat].shape) if ii == 0: dtypes[feat] = h5["events"][feat].dtype # increment size size += h5["events"][features[0]].shape[0] # remember the frame feature if requested if compute_frame: frames.append(h5["events/frame"][:]) # write metadata hv.attrs.update(meta) # Create the virtual datasets for feat in shapes: if len(shapes[feat][0]) == 1: # scalar feature shape = (sum([sh[0] for sh in shapes[feat]])) else: # non-scalar feature length = (sum([sh[0] for sh in shapes[feat]])) shape = list(shapes[feat][0]) shape[0] = length shape = tuple(shape) layout = h5py.VirtualLayout(shape=shape, dtype=dtypes[feat]) loc = 0 for jj, pp in enumerate(paths): vsource = h5py.VirtualSource(pp, f"events/{feat}", shape=shapes[feat][jj]) cursize = shapes[feat][jj][0] layout[loc:loc+cursize] = vsource loc += cursize hv.create_virtual_dataset(f"/events/{feat}", layout, fillvalue=0) if compute_frame: # concatenate frames and store in dataset frame_concat = np.zeros(size, dtype=np.uint64) locf = 0 # indexing location prevmax = 0 # maximum frame number stored so far in array for fr in frames: offset = prevmax + 1 - fr[0] frame_concat[locf:locf+fr.size] = fr + offset locf += fr.size prevmax = fr[-1] + offset hv.create_dataset("/events/frame", data=frame_concat) # write metadata hv.attrs["experiment:event count"] = size data = HDF5Data(h5kwargs["name"]) return data