dcnum.write

Submodules

Classes

ChunkWriter

Convenience class for writing data outside the main loop

EventStash

Sort events into predefined arrays for bulk access

QueueWriterProcess

Write events from a queue to an .rtdc file

QueueWriterThread

Write events from a queue to an .rtdc file

HDF5Writer

Write deformability cytometry HDF5 data

Functions

copy_basins(h5_src, h5_dst[, internal_basins])

Reassemble basin data in the output file

copy_features(h5_src, h5_dst, features[, mapping, ds_kwds])

Copy feature data from one HDF5 file to another

copy_metadata(h5_src, h5_dst)

Copy attributes, tables, and logs from one H5File to another

create_with_basins(path_out, basin_paths)

Create an .rtdc file with basins

set_default_filter_kwargs([ds_kwds, compression])

Package Contents

class dcnum.write.ChunkWriter(path_out: pathlib.Path | dcnum.common.h5py.File, dq: collections.deque, write_queue_size: multiprocessing.sharedctypes.Synchronized, ds_kwds: dict | None = None, mode: str = 'a', parent_logger: logging.Logger | None = None, *args, **kwargs)[source]

Bases: threading.Thread

Convenience class for writing data outside the main loop

Data are numpy arrays collected from a collections.deque object.

Parameters:
  • path_out – Path to the output HDF5 file

  • dq (collections.deque) – collections.deque object from which data are taken using popleft().

  • write_queue_size – Multiprocessing value to which the size of dq is written periodically

  • ds_kwds – keyword arguments for dataset creation, passed to HDF5Writer

  • mode – HDF5 file opening mode, passed to HDF5Writer

writer
dq
may_stop_loop = False
must_stop_loop = False
write_queue_size
abort_loop()[source]

Force aborting the loop as soon as possible

finished_when_queue_empty()[source]

Stop the loop as soon as self.dq is empty

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
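The interplay of dq, may_stop_loop, and must_stop_loop can be illustrated with a small stand-in thread. This is only a sketch using the standard library: the class DequeDrainer and the list used as a sink are hypothetical and do not reproduce the actual ChunkWriter implementation, which writes to an HDF5 file.

```python
import collections
import threading
import time


class DequeDrainer(threading.Thread):
    """Hypothetical stand-in for a writer thread (not dcnum's ChunkWriter)."""

    def __init__(self, dq, sink):
        super().__init__()
        self.dq = dq                 # producers append (feature, data) tuples
        self.sink = sink             # plain list standing in for the HDF5 file
        self.may_stop_loop = False   # stop as soon as the deque is empty
        self.must_stop_loop = False  # stop as soon as possible

    def abort_loop(self):
        self.must_stop_loop = True

    def finished_when_queue_empty(self):
        self.may_stop_loop = True

    def run(self):
        while not self.must_stop_loop:
            try:
                item = self.dq.popleft()
            except IndexError:
                # The deque is empty: either finish or wait for more data.
                if self.may_stop_loop:
                    break
                time.sleep(0.01)
            else:
                self.sink.append(item)


dq = collections.deque()
written = []
drainer = DequeDrainer(dq, written)
drainer.start()
for ii in range(5):
    dq.append(("deform", [0.01 * ii]))
# All data have been queued; let the thread drain the deque and exit.
drainer.finished_when_queue_empty()
drainer.join()
```

Setting the "may stop" flag only after the last item has been appended ensures that nothing is left behind in the deque when the thread exits.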

class dcnum.write.EventStash(index_offset: int, feat_nevents: list[int])[source]

Sort events into predefined arrays for bulk access

Parameters:
  • index_offset – The index offset at which this stash starts. Normally, feat_nevents is just a slice of a larger array and index_offset defines the position at which that slice is taken.

  • feat_nevents – List that defines how many events there are for each input frame. If summed up, this defines self.size.

events

Dictionary containing the event arrays

feat_nevents

List containing the number of events per input frame

nev_idx

Cumulative sum of feat_nevents for determining sorting offsets

size

Number of events in this stash

num_frames

Number of frames in this stash

index_offset

Global offset compared to the original data instance.

indices_for_data

Array containing the indices in the original data instance. These indices correspond to the events in events.

_tracker

Private array that tracks the progress.

is_complete()[source]

Determine whether the event stash is complete (all events added)

add_events(index, events)[source]

Add events to this stash

Parameters:
  • index (int) – Global index (from input dataset)

  • events (dict) – Event dictionary

require_feature(feat, sample_data)[source]

Create a new empty feature array in self.events and return it

Parameters:
  • feat – Feature name

  • sample_data – Sample data for one event of the feature (used to determine shape and dtype of the feature array)
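How the cumulative sum of feat_nevents yields the sorting offsets can be sketched with plain Python lists. MiniStash is a hypothetical, simplified illustration and not the EventStash implementation; in particular, using lists instead of numpy arrays and marking frames without events via an empty dictionary are assumptions made for this example.

```python
from itertools import accumulate


class MiniStash:
    """Hypothetical, list-based illustration (not dcnum's EventStash)."""

    def __init__(self, index_offset, feat_nevents):
        self.index_offset = index_offset
        self.feat_nevents = list(feat_nevents)
        # nev_idx[i] is the position right after the last event of frame i
        self.nev_idx = list(accumulate(self.feat_nevents))
        self.size = sum(self.feat_nevents)
        self.num_frames = len(self.feat_nevents)
        self.events = {}
        self._tracker = [False] * self.num_frames

    def is_complete(self):
        return all(self._tracker)

    def add_events(self, index, events):
        local = index - self.index_offset  # frame index inside this stash
        stop = self.nev_idx[local]
        start = stop - self.feat_nevents[local]
        for feat, values in events.items():
            array = self.events.setdefault(feat, [None] * self.size)
            array[start:stop] = values
        self._tracker[local] = True


stash = MiniStash(index_offset=100, feat_nevents=[2, 0, 1])
stash.add_events(102, {"area": [30.0]})        # last frame arrives first
stash.add_events(100, {"area": [10.0, 11.0]})
stash.add_events(101, {})                      # frame without events
```

Although the frames arrive out of order, stash.events["area"] ends up sorted by frame, because each frame writes into its own precomputed slice.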

class dcnum.write.QueueWriterProcess(log_queue: multiprocessing.Queue, *args, **kwargs)[source]

Bases: dcnum.write.queue_writer_base.QueueWriterBase, mp_spawn

Write events from a queue to an .rtdc file

Events coming from a queue cannot be guaranteed to be in order. The queue writer uses an EventStash to sort events into the correct order before sending them to the ChunkWriter for storage.

Parameters:
  • event_queue – A queue object to which other processes or threads write events as tuples (frame_index, events_dict).

  • write_queue_size – A mp.Value that is populated with the number of event chunks waiting to be written to the output file by the ChunkWriter.

  • feat_nevents – This 1D array contains the number of events for each frame in the input data. It serves two purposes: (1) it allows us to determine how many events we are writing when we are writing data from write_threshold frames, and (2) it allows us to keep track of how many frames have actually been processed (and thus for which frames we can expect entries in event_queue). If an entry in this array is -1, there is no event in event_queue for that frame. See write_threshold below.

  • path_out – Output path for writer

  • hdf5_dataset_kwargs – Dictionary of keyword arguments (e.g. “compression”) for HDF5 dataset creation.

  • write_threshold – This integer defines how many frames should be collected at once and put into writer_dq. For instance, with a value of 500, at least 500 items are taken from the event_queue (they should match the expected frame index; frame indices that do not match are kept in an EventStash). Since each frame may contain multiple events or none, the output size could be, e.g., 513, which is computed via np.sum(feat_nevents[idx:idx+write_threshold]).
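The relation between write_threshold and the number of events written can be checked with a few lines of arithmetic. The numbers below are made up, the built-in sum() replaces np.sum to keep the sketch free of dependencies, and treating -1 as "frame not yet processed" is one reading of the description above rather than a statement about the implementation.

```python
# Hypothetical per-frame event counts; -1 marks frames without queue entries.
feat_nevents = [1, 0, 2, 3, 1, -1, -1]
write_threshold = 4
idx = 0

window = feat_nevents[idx:idx + write_threshold]
# A window can only be written once every frame in it is known.
window_ready = all(nev >= 0 for nev in window)
# Number of events (rows) written for this window of frames.
num_events = sum(window)

# The following window still contains unprocessed frames.
next_window = feat_nevents[idx + write_threshold:idx + 2 * write_threshold]
next_ready = all(nev >= 0 for nev in next_window)
```

Here four frames produce six events, which shows why the output size generally differs from write_threshold.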

log_queue

queue for logging

run(**kwargs)[source]

Start the writing process

This method is intended to be run in a thread or process.

buffer_dq and writer_dq are used for testing.

class dcnum.write.QueueWriterThread(*args, **kwargs)[source]

Bases: dcnum.write.queue_writer_base.QueueWriterBase, threading.Thread

Write events from a queue to an .rtdc file

Events coming from a queue cannot be guaranteed to be in order. The QueueWriterThread uses an EventStash to sort events into the correct order before sending them to the ChunkWriter for storage.

Parameters:
  • event_queue – A queue object to which other processes or threads write events as tuples (frame_index, events_dict).

  • write_queue_size – A mp.Value that is populated with the number of event chunks waiting to be written to the output file by the ChunkWriter.

  • feat_nevents – This 1D array contains the number of events for each frame in the input data. It serves two purposes: (1) it allows us to determine how many events we are writing when we are writing data from write_threshold frames, and (2) it allows us to keep track of how many frames have actually been processed (and thus for which frames we can expect entries in event_queue). If an entry in this array is -1, there is no event in event_queue for that frame. See write_threshold below.

  • path_out – Output path for writer

  • hdf5_dataset_kwargs – Dictionary of keyword arguments (e.g. “compression”) for HDF5 dataset creation.

  • write_threshold – This integer defines how many frames should be collected at once and put into writer_dq. For instance, with a value of 500, at least 500 items are taken from the event_queue (they should match the expected frame index; frame indices that do not match are kept in an EventStash). Since each frame may contain multiple events or none, the output size could be, e.g., 513, which is computed via np.sum(feat_nevents[idx:idx+write_threshold]).

class dcnum.write.HDF5Writer(obj: dcnum.common.h5py.File | pathlib.Path | str, mode: str = 'a', ds_kwds: dict | None = None)[source]

Write deformability cytometry HDF5 data

Parameters:
  • obj (h5py.File | pathlib.Path | str) – object to instantiate the writer from; if this is already an h5py.File object, it is used directly, otherwise the argument is passed to h5py.File

  • mode (str) – opening mode when using h5py.File

  • ds_kwds (dict) – keyword arguments with which to initialize new Datasets (e.g. compression)

events
ds_kwds = None
__enter__()[source]
__exit__(exc_type, exc_val, exc_tb)[source]
close()[source]
static get_best_nd_chunks(item_shape, feat_dtype=np.float64)[source]

Return best chunks for HDF5 datasets

Chunking has performance implications. It’s recommended to keep the total size of dataset chunks between 10 KiB and 1 MiB. This number defines the maximum chunk size as well as half the maximum cache size for each dataset.
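To get a feeling for these sizes, the following sketch computes how many events fit into a chunk of at most 1 MiB. The function events_per_chunk and its max_bytes parameter are hypothetical; this is not the algorithm used by get_best_nd_chunks, only the underlying arithmetic.

```python
import math


def events_per_chunk(item_shape, itemsize, max_bytes=1024**2):
    """Return how many events fit into one chunk of at most `max_bytes`."""
    event_bytes = math.prod(item_shape) * itemsize
    return max(1, max_bytes // event_bytes)


# float64 scalar feature: 8 bytes per event
scalar_chunks = (events_per_chunk((1,), 8),)
# uint8 image feature with 80 x 300 pixels: 24000 bytes per event
image_chunks = (events_per_chunk((80, 300), 1), 80, 300)
```

With these assumptions, a scalar float64 feature gets chunks of 131072 events, whereas an 80 x 300 uint8 image feature gets chunks of 43 events.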

require_feature(feat: str, item_shape: tuple[int], feat_dtype: numpy.dtype, ds_kwds: dict | None = None, group_name: str = 'events')[source]

Create a new feature in the “events” group

Parameters:
  • feat (str) – name of the feature

  • item_shape (tuple[int]) – shape for one event of this feature, e.g. for a scalar event, the shape would be (1,) and for an image, the shape could be (80, 300).

  • feat_dtype (np.dtype) – dtype of the feature

  • ds_kwds (dict) – HDF5 Dataset keyword arguments (e.g. compression, fletcher32)

  • group_name (str) – name of the HDF5 group where the feature should be written to; defaults to the “events” group, but a different group can be specified for storing e.g. internal basin features.

store_basin(name: str, paths: list[str | pathlib.Path] | None = None, features: list[str] | None = None, description: str | None = None, mapping: numpy.ndarray | None = None, internal_data: dict | None = None, identifier: str | None = None)[source]

Write an HDF5-based file basin

Parameters:
  • name (str) – basin name; names do not have to be unique.

  • paths (list of str or pathlib.Path or None) – location(s) of the basin; must be None when storing internal data, a list of paths otherwise

  • features (list of str) – list of features provided by paths

  • description (str) – optional string describing the basin

  • mapping (1D array) – integer array with indices that map the basin dataset to this dataset

  • internal_data (dict of ndarrays) – internal basin data to store; if this is set, then features and paths must be set to None.

  • identifier (str) – the measurement identifier of the basin as computed by the get_measurement_identifier() function.

store_feature_chunk(feat, data, group_name='events')[source]

Store feature data

The “chunk” in the method name implies that data are always stored in chunks, never as single events.

store_log(log: str, data: list[str], override: bool = False) → dcnum.common.h5py.Dataset[source]

Store log data

Store the log data under the key log. The data kwarg must be a list of strings. If the log entry already exists, ValueError is raised unless override is set to True.

dcnum.write.copy_basins(h5_src: dcnum.common.h5py.File, h5_dst: dcnum.common.h5py.File, internal_basins: bool = True)[source]

Reassemble basin data in the output file

This does not just copy the datasets defined in the “basins” group, but it also loads the “basinmap?” features and stores them as new “basinmap?” features in the output file.

dcnum.write.copy_features(h5_src: dcnum.common.h5py.File, h5_dst: dcnum.common.h5py.File, features: list[str], mapping: numpy.ndarray | None = None, ds_kwds: dict | None = None)[source]

Copy feature data from one HDF5 file to another

The feature must not exist in the destination file.

Parameters:
  • h5_src (h5py.File) – Input HDF5File containing features in the “events” group

  • h5_dst (h5py.File) – Output HDF5File opened in write mode that does not yet contain the features

  • features (list[str]) – List of features to copy from source to destination

  • mapping (1D array) – If given, contains the event indices in the input file that should be written to the output file. If set to None, all events of each feature are written.

  • ds_kwds – keyword arguments with which to initialize new Datasets (e.g. compression); only relevant when mapping is not None
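The effect of mapping can be pictured as index-based selection on each feature. The helper copy_mapped and the sample values below are hypothetical and operate on plain lists rather than HDF5 datasets; they only illustrate the selection step, not how copy_features is implemented.

```python
# Hypothetical source feature with one value per event.
src_deform = [0.01, 0.02, 0.03, 0.04, 0.05]


def copy_mapped(values, mapping=None):
    """Return all values, or only those selected by `mapping`."""
    if mapping is None:
        return list(values)
    return [values[ii] for ii in mapping]


dst_all = copy_mapped(src_deform)
dst_sub = copy_mapped(src_deform, mapping=[0, 2, 4])
```

Without a mapping the destination holds all five events; with the mapping it holds only the events at indices 0, 2, and 4.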

dcnum.write.copy_metadata(h5_src: dcnum.common.h5py.File, h5_dst: dcnum.common.h5py.File)[source]

Copy attributes, tables, and logs from one H5File to another

Notes

Metadata in h5_dst are never overridden, only metadata that are not defined already are added.
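The "never overridden" rule corresponds to a merge that only fills in missing keys. The sketch below uses plain dictionaries in place of HDF5 attributes; merge_missing and the attribute names are illustrative assumptions, not part of dcnum.

```python
def merge_missing(src_attrs, dst_attrs):
    """Add keys from `src_attrs` to `dst_attrs` without overriding."""
    for key, value in src_attrs.items():
        dst_attrs.setdefault(key, value)
    return dst_attrs


# Hypothetical attribute names and values.
src = {"experiment:date": "2024-01-01", "setup:channel width": 20.0}
dst = {"experiment:date": "2024-02-02"}
merge_missing(src, dst)
```

After the merge, dst keeps its own date and only gains the channel width from src.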

dcnum.write.create_with_basins(path_out: str | pathlib.Path, basin_paths: list[str | pathlib.Path] | list[list[str | pathlib.Path]])[source]

Create an .rtdc file with basins

Parameters:
  • path_out – The output .rtdc file where basins are written to

  • basin_paths – The paths to the basins written to path_out. This can be either a list of paths (to different basins) or a list of lists of paths (for basins containing the same information, commonly used for relative and absolute paths).
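The two accepted shapes of basin_paths can be brought into a common form before processing. The helper normalize_basin_paths and the file names are hypothetical; the sketch only shows how a flat list and a nested list relate, not what create_with_basins does internally.

```python
import pathlib


def normalize_basin_paths(basin_paths):
    """Turn each entry into a list of string paths (one list per basin)."""
    normalized = []
    for entry in basin_paths:
        if isinstance(entry, (str, pathlib.Path)):
            # A single path describes one basin with one location.
            entry = [entry]
        normalized.append([str(pp) for pp in entry])
    return normalized


# Two different basins, one location each.
flat = normalize_basin_paths(["a.rtdc", pathlib.Path("b.rtdc")])
# One basin that is reachable via a relative and an absolute path.
nested = normalize_basin_paths([["data/a.rtdc", "/abs/data/a.rtdc"]])
```

In the flat case each path becomes its own basin, whereas in the nested case both paths describe the same basin.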

dcnum.write.set_default_filter_kwargs(ds_kwds: dict | None = None, compression: bool = True)[source]