dcnum.write
===========

.. py:module:: dcnum.write


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/dcnum/write/chunk_writer/index
   /autoapi/dcnum/write/event_stash/index
   /autoapi/dcnum/write/queue_writer_base/index
   /autoapi/dcnum/write/queue_writer_process/index
   /autoapi/dcnum/write/queue_writer_thread/index
   /autoapi/dcnum/write/writer/index


Classes
-------

.. autoapisummary::

   dcnum.write.ChunkWriter
   dcnum.write.EventStash
   dcnum.write.QueueWriterProcess
   dcnum.write.QueueWriterThread
   dcnum.write.HDF5Writer


Functions
---------

.. autoapisummary::

   dcnum.write.copy_basins
   dcnum.write.copy_features
   dcnum.write.copy_metadata
   dcnum.write.create_with_basins
   dcnum.write.set_default_filter_kwargs


Package Contents
----------------

.. py:class:: ChunkWriter(path_out: pathlib.Path | dcnum.common.h5py.File, dq: collections.deque, write_queue_size: multiprocessing.sharedctypes.Synchronized, ds_kwds: dict | None = None, mode: str = 'a', parent_logger: logging.Logger | None = None, *args, **kwargs)

   Bases: :py:obj:`threading.Thread`

   Convenience class for writing data outside the main loop

   Data are numpy arrays collected from a `deque` object

   :param path_out: Path to the output HDF5 file
   :param dq: `collections.deque` object from which data are taken
      using `popleft()`.
   :type dq: collections.deque
   :param write_queue_size: Multiprocessing value to which the size of
      `dq` is written periodically
   :param ds_kwds: keyword arguments for dataset creation, passed to
      :class:`.HDF5Writer`
   :param mode: HDF5 file opening mode, passed to :class:`.HDF5Writer`

   .. py:attribute:: writer

   .. py:attribute:: dq

   .. py:attribute:: may_stop_loop
      :value: False

   .. py:attribute:: must_stop_loop
      :value: False

   .. py:attribute:: write_queue_size

   .. py:method:: abort_loop()

      Force aborting the loop as soon as possible

   .. py:method:: finished_when_queue_empty()

      Stop the loop as soon as `self.dq` is empty

   .. py:method:: run()

      Method representing the thread's activity.

      You may override this method in a subclass. The standard run()
      method invokes the callable object passed to the object's
      constructor as the target argument, if any, with sequential and
      keyword arguments taken from the args and kwargs arguments,
      respectively.


.. py:class:: EventStash(index_offset: int, feat_nevents: list[int])

   Sort events into predefined arrays for bulk access

   :param index_offset: The index offset at which we are working.
      Normally, `feat_nevents` is just a slice of a larger array and
      `index_offset` defines the position at which it is taken.
   :param feat_nevents: List that defines how many events there are for
      each input frame. If summed up, this defines `self.size`.

   .. py:attribute:: events

      Dictionary containing the event arrays

   .. py:attribute:: feat_nevents

      List containing the number of events per input frame

   .. py:attribute:: nev_idx

      Cumulative sum of `feat_nevents` for determining sorting offsets

   .. py:attribute:: size

      Number of events in this stash

   .. py:attribute:: num_frames

      Number of frames in this stash

   .. py:attribute:: index_offset

      Global offset compared to the original data instance.

   .. py:attribute:: indices_for_data

      Array containing the indices in the original data instance.
      These indices correspond to the events in `events`.

   .. py:attribute:: _tracker

      Private array that tracks the progress.

   .. py:method:: is_complete()

      Determine whether the event stash is complete (all events added)

   .. py:method:: add_events(index, events)

      Add events to this stash

      :param index: Global index (from input dataset)
      :type index: int
      :param events: Event dictionary
      :type events: dict

   .. py:method:: require_feature(feat, sample_data)

      Create a new empty feature array in `self.events` and return it

      :param feat: Feature name
      :param sample_data: Sample data for one event of the feature (used
         to determine shape and dtype of the feature array)


.. py:class:: QueueWriterProcess(log_queue: multiprocessing.Queue, *args, **kwargs)

   Bases: :py:obj:`dcnum.write.queue_writer_base.QueueWriterBase`, :py:obj:`mp_spawn`

   Write events from a queue to an .rtdc file

   Events coming from a queue cannot be guaranteed to be in order.
   The :class:`.QueueWriterThread` uses a :class:`.EventStash` to sort
   events into the correct order before sending them to the
   :class:`ChunkWriter` for storage.

   :param event_queue: A queue object to which other processes or
      threads write events as tuples `(frame_index, events_dict)`.
   :param write_queue_size: A `mp.Value` that is populated with the
      number of event chunks waiting to be written to the output file
      by the `ChunkWriter`.
   :param feat_nevents: This 1D array contains the number of events for
      each frame in the input data. This serves two purposes: (1) it
      allows us to determine how many events we are writing when we
      are writing data from `write_threshold` frames, and (2) it allows
      us to keep track of how many frames have actually been processed
      (and for which we can thus expect entries in `event_queue`). If
      an entry in this array is -1, this means that there is no event
      in `event_queue`. See `write_threshold` below.
   :param path_out: Output path for writer
   :param hdf5_dataset_kwargs: Dictionary of keyword arguments (e.g.
      "compression") for HDF5 dataset creation.
   :param write_threshold: This integer defines how many frames should
      be collected at once and put into `writer_dq`. For instance, with
      a value of 500, at least 500 items are taken from the
      `event_queue` (they should match the expected frame index; frame
      indices that do not match are kept in a :class:`.EventStash`).
      Then, for each frame, we may have multiple or no events, so the
      output size could be 513, which is computed via
      `np.sum(feat_nevents[idx:idx+write_threshold])`.

   .. py:attribute:: log_queue

      queue for logging

   .. py:method:: run(**kwargs)

      Start the writing process

      This method is intended to be run in a thread or process.
      `buffer_dq` and `writer_dq` are used for testing.


.. py:class:: QueueWriterThread(*args, **kwargs)

   Bases: :py:obj:`dcnum.write.queue_writer_base.QueueWriterBase`, :py:obj:`threading.Thread`

   Write events from a queue to an .rtdc file

   Events coming from a queue cannot be guaranteed to be in order.
   The :class:`.QueueWriterThread` uses a :class:`.EventStash` to sort
   events into the correct order before sending them to the
   :class:`ChunkWriter` for storage.

   :param event_queue: A queue object to which other processes or
      threads write events as tuples `(frame_index, events_dict)`.
   :param write_queue_size: A `mp.Value` that is populated with the
      number of event chunks waiting to be written to the output file
      by the `ChunkWriter`.
   :param feat_nevents: This 1D array contains the number of events for
      each frame in the input data. This serves two purposes: (1) it
      allows us to determine how many events we are writing when we
      are writing data from `write_threshold` frames, and (2) it allows
      us to keep track of how many frames have actually been processed
      (and for which we can thus expect entries in `event_queue`). If
      an entry in this array is -1, this means that there is no event
      in `event_queue`. See `write_threshold` below.
   :param path_out: Output path for writer
   :param hdf5_dataset_kwargs: Dictionary of keyword arguments (e.g.
      "compression") for HDF5 dataset creation.
   :param write_threshold: This integer defines how many frames should
      be collected at once and put into `writer_dq`. For instance, with
      a value of 500, at least 500 items are taken from the
      `event_queue` (they should match the expected frame index; frame
      indices that do not match are kept in a :class:`.EventStash`).
      Then, for each frame, we may have multiple or no events, so the
      output size could be 513, which is computed via
      `np.sum(feat_nevents[idx:idx+write_threshold])`.


.. py:class:: HDF5Writer(obj: dcnum.common.h5py.File | pathlib.Path | str, mode: str = 'a', ds_kwds: dict | None = None)

   Write deformability cytometry HDF5 data

   :param obj: object to instantiate the writer from; if this is
      already a :class:`h5py.File` object, then it is used, otherwise
      the argument is passed to :class:`h5py.File`
   :type obj: h5py.File | pathlib.Path | str
   :param mode: opening mode when using :class:`h5py.File`
   :type mode: str
   :param ds_kwds: keyword arguments with which to initialize new
      Datasets (e.g. compression)
   :type ds_kwds: dict

   .. py:attribute:: events

   .. py:attribute:: ds_kwds
      :value: None

   .. py:method:: __enter__()

   .. py:method:: __exit__(exc_type, exc_val, exc_tb)

   .. py:method:: close()

   .. py:method:: get_best_nd_chunks(item_shape, feat_dtype=np.float64)
      :staticmethod:

      Return best chunks for HDF5 datasets

      Chunking has performance implications. It is recommended to keep
      the total size of dataset chunks between 10 KiB and 1 MiB. This
      number defines the maximum chunk size as well as half the
      maximum cache size for each dataset.

   .. py:method:: require_feature(feat: str, item_shape: tuple[int], feat_dtype: numpy.dtype, ds_kwds: dict | None = None, group_name: str = 'events')

      Create a new feature in the "events" group

      :param feat: name of the feature
      :type feat: str
      :param item_shape: shape for one event of this feature, e.g. for
         a scalar event, the shape would be `(1,)` and for an image,
         the shape could be `(80, 300)`.
      :type item_shape: tuple[int]
      :param feat_dtype: dtype of the feature
      :type feat_dtype: np.dtype
      :param ds_kwds: HDF5 Dataset keyword arguments (e.g. compression,
         fletcher32)
      :type ds_kwds: dict
      :param group_name: name of the HDF5 group to which the feature
         should be written; defaults to the "events" group, but a
         different group can be specified for storing e.g. internal
         basin features.
      :type group_name: str

   .. py:method:: store_basin(name: str, paths: list[str | pathlib.Path] | None = None, features: list[str] | None = None, description: str | None = None, mapping: numpy.ndarray | None = None, internal_data: dict | None = None, identifier: str | None = None)

      Write an HDF5-based file basin

      :param name: basin name; names do not have to be unique.
      :type name: str
      :param paths: location(s) of the basin; must be None when storing
         internal data, a list of paths otherwise
      :type paths: list of str or pathlib.Path or None
      :param features: list of features provided by `paths`
      :type features: list of str
      :param description: optional string describing the basin
      :type description: str
      :param mapping: integer array with indices that map the basin
         dataset to this dataset
      :type mapping: 1D array
      :param internal_data: internal basin data to store; if this is
         set, then `features` and `paths` must be set to `None`.
      :type internal_data: dict of ndarrays
      :param identifier: the measurement identifier of the basin as
         computed by the
         :func:`~dcnum.read.hdf5_data.get_measurement_identifier`
         function.
      :type identifier: str

   .. py:method:: store_feature_chunk(feat, data, group_name='events')

      Store feature data

      The "chunk" implies that data are always stored in chunks, never
      as single events.

   .. py:method:: store_log(log: str, data: list[str], override: bool = False) -> dcnum.common.h5py.Dataset

      Store log data

      Store the log data under the key `log`. The `data` kwarg must be
      a list of strings. If the log entry already exists, `ValueError`
      is raised unless ``override`` is set to True.


.. py:function:: copy_basins(h5_src: dcnum.common.h5py.File, h5_dst: dcnum.common.h5py.File, internal_basins: bool = True)

   Reassemble basin data in the output file

   This does not just copy the datasets defined in the "basins" group;
   it also loads the "basinmap?" features and stores them as new
   "basinmap?" features in the output file.


.. py:function:: copy_features(h5_src: dcnum.common.h5py.File, h5_dst: dcnum.common.h5py.File, features: list[str], mapping: numpy.ndarray | None = None, ds_kwds: dict | None = None)

   Copy feature data from one HDF5 file to another

   The features must not exist in the destination file.

   :param h5_src: Input HDF5File containing `features` in the "events"
      group
   :type h5_src: h5py.File
   :param h5_dst: Output HDF5File, opened in write mode, that does not
      contain `features`
   :type h5_dst: h5py.File
   :param features: List of features to copy from source to destination
   :type features: list[str]
   :param mapping: If given, contains indices in the input file that
      should be written to the output file. If set to None, all
      features are written.
   :type mapping: 1D array
   :param ds_kwds: keyword arguments with which to initialize new
      Datasets (e.g. compression); only relevant when
      `mapping is not None`


.. py:function:: copy_metadata(h5_src: dcnum.common.h5py.File, h5_dst: dcnum.common.h5py.File)

   Copy attributes, tables, and logs from one H5File to another

   .. rubric:: Notes

   Metadata in `h5_dst` are never overridden; only metadata that are
   not already defined are added.


.. py:function:: create_with_basins(path_out: str | pathlib.Path, basin_paths: list[str | pathlib.Path] | list[list[str | pathlib.Path]])

   Create an .rtdc file with basins

   :param path_out: The output .rtdc file to which basins are written
   :param basin_paths: The paths to the basins written to `path_out`.
      This can be either a list of paths (to different basins) or a
      list of lists of paths (for basins containing the same
      information, commonly used for relative and absolute paths).


.. py:function:: set_default_filter_kwargs(ds_kwds: dict | None = None, compression: bool = True)
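
The class descriptions above rely on two pieces of bookkeeping: the number of events in a block of `write_threshold` frames is the sum over the matching slice of `feat_nevents`, and the cumulative sum of `feat_nevents` tells where each frame's events belong once they arrive out of order. The following is a minimal sketch of that idea only. It uses plain Python lists instead of numpy arrays, and the names `chunk_event_count`, `sorting_offsets`, and `ToyStash` are hypothetical illustrations, not part of the dcnum API; the real :class:`.EventStash` may behave differently.

```python
from itertools import accumulate


def chunk_event_count(feat_nevents, idx, write_threshold):
    """Count the events in `write_threshold` frames starting at frame `idx`.

    Plain-list analogue of `np.sum(feat_nevents[idx:idx+write_threshold])`.
    """
    return sum(feat_nevents[idx:idx + write_threshold])


def sorting_offsets(feat_nevents):
    """Return cumulative event counts; frame `i` starts at `offsets[i]`."""
    return [0] + list(accumulate(feat_nevents))


class ToyStash:
    """Hypothetical stand-in for an event stash (NOT the dcnum class)."""

    def __init__(self, index_offset, feat_nevents):
        self.index_offset = index_offset
        self.feat_nevents = list(feat_nevents)
        self.offsets = sorting_offsets(self.feat_nevents)
        # Total number of events expected in this stash
        self.size = self.offsets[-1]
        self.values = [None] * self.size
        # One flag per frame, set once that frame has been added
        self.seen = [False] * len(self.feat_nevents)

    def add_events(self, index, values):
        """Place the events of global frame `index` at their sorted position."""
        local = index - self.index_offset
        start, stop = self.offsets[local], self.offsets[local + 1]
        if len(values) != stop - start:
            raise ValueError("unexpected number of events for this frame")
        self.values[start:stop] = values
        self.seen[local] = True

    def is_complete(self):
        """Return True once every frame has been added."""
        return all(self.seen)


# Frames 10, 11, and 12 carry 2, 0, and 1 events; they arrive out of order.
stash = ToyStash(index_offset=10, feat_nevents=[2, 0, 1])
stash.add_events(12, ["c0"])
stash.add_events(10, ["a0", "a1"])
complete_before_last_frame = stash.is_complete()
stash.add_events(11, [])
```

In this sketch a frame without events still has to be added before the stash counts as complete; that is a simplification chosen for the toy class and is not taken from the library.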