dcnum.write.queue_writer_base
=============================

.. py:module:: dcnum.write.queue_writer_base


Attributes
----------

.. autoapisummary::

   dcnum.write.queue_writer_base.mp_spawn


Classes
-------

.. autoapisummary::

   dcnum.write.queue_writer_base.QueueWriterBase


Module Contents
---------------

.. py:data:: mp_spawn
   :value: None


.. py:class:: QueueWriterBase(event_queue: multiprocessing.Queue, write_queue_size: multiprocessing.Value, feat_nevents: multiprocessing.Array, path_out: pathlib.Path, hdf5_dataset_kwargs: dict = None, write_threshold: int = 500, *args, **kwargs)

   Write events from a queue to an .rtdc file

   Events coming from a queue cannot be guaranteed to be in order.
   The :class:`.QueueWriterThread` uses a :class:`.EventStash`
   to sort events into the correct order before sending them to
   the :class:`ChunkWriter` for storage.

   :param event_queue: A queue object to which other processes or threads
                       write events as tuples `(frame_index, events_dict)`.
   :param write_queue_size: A `mp.Value` that is populated with the number
                            of event chunks waiting to be written to the
                            output file by the `ChunkWriter`.
   :param feat_nevents: This 1D array contains the number of events for each
                        frame in the input data. This serves two purposes:
                        (1) it allows us to determine how many events we are
                        writing when we are writing data from
                        `write_threshold` frames, and
                        (2) it allows us to keep track of how many frames
                        have actually been processed (and for which we can
                        thus expect entries in `event_queue`).
                        If an entry in this array is -1, this means that
                        there is no event in `event_queue`.
                        See `write_threshold` below.
   :param path_out: Output path for writer
   :param hdf5_dataset_kwargs: Dictionary of keyword arguments
                               (e.g. "compression") for HDF5 dataset creation.
   :param write_threshold: This integer defines how many frames should be
                           collected at once and put into `writer_dq`.
                           For instance, with a value of 500, at least 500
                           items are taken from the `event_queue` (they
                           should match the expected frame index; frame
                           indices that do not match are kept in a
                           :class:`.EventStash`). Then, for each frame, we
                           may have multiple or no events, so the output size
                           could be, e.g., 513, which is computed via
                           `np.sum(feat_nevents[idx:idx+write_threshold])`.


   .. py:attribute:: log_level
      :value: 0

      logging level (inherited from 'dcnum' logger)


   .. py:attribute:: event_queue

      Event queue from which to collect event data


   .. py:attribute:: write_queue_size

      Number of event chunks waiting to be written to the output file


   .. py:attribute:: feat_nevents

      Shared array containing the number of events for each frame in `data`.


   .. py:attribute:: path_out

      HDF5 output path


   .. py:attribute:: hdf5_dataset_kwargs
      :value: None

      HDF5 dataset creation keyword arguments


   .. py:attribute:: write_threshold
      :value: 500

      Number of frames to send to `writer_dq` at a time.


   .. py:attribute:: written_events

      Number of events sent to `writer_dq`


   .. py:attribute:: written_frames

      Number of frames from `data` written to `writer_dq`


   .. py:method:: get_logger()


   .. py:method:: run(buffer_dq=None, writer_dq=None, logger=None)

      Start the writing process

      This method is intended to be run in a thread or process.

      `buffer_dq` and `writer_dq` are used for testing.
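
The reordering described for `event_queue`, `feat_nevents`, and `write_threshold` can be illustrated with a minimal, standard-library-only sketch. It is not dcnum's implementation: the helper `collect_in_order` is hypothetical, a plain `dict` stands in for :class:`.EventStash`, a `queue.Queue` and a `list` stand in for the multiprocessing objects, the built-in `sum` replaces `np.sum`, and the `-1` sentinel in `feat_nevents` is ignored (every frame is assumed to arrive eventually).

```python
import queue


def collect_in_order(event_queue, feat_nevents, write_threshold):
    """Yield `(first_frame, events, n_events)` chunks in frame order.

    Hypothetical helper for illustration only; not part of dcnum.
    """
    stash = {}        # out-of-order frames, keyed by frame index
    chunk = []        # in-order `events_dict` objects awaiting output
    next_index = 0    # frame index that must be emitted next
    chunk_start = 0   # first frame index of the current chunk
    num_frames = len(feat_nevents)
    while next_index < num_frames:
        # Keep reading until the expected frame has arrived; anything
        # else is parked in the stash for later.
        while next_index not in stash:
            frame_index, events = event_queue.get(timeout=1)
            stash[frame_index] = events
        chunk.append(stash.pop(next_index))
        next_index += 1
        if len(chunk) == write_threshold or next_index == num_frames:
            # Number of events in this chunk, analogous to
            # np.sum(feat_nevents[idx:idx+write_threshold])
            n_events = sum(feat_nevents[chunk_start:next_index])
            yield chunk_start, chunk, n_events
            chunk = []
            chunk_start = next_index


# Usage: five frames arrive out of order and are emitted in order.
feat_nevents = [1, 0, 2, 1, 3]
q = queue.Queue()
for idx in [2, 0, 1, 4, 3]:
    q.put((idx, {"frame": idx}))
chunks = list(collect_in_order(q, feat_nevents, write_threshold=2))
```

With `write_threshold=2`, the five frames are grouped into chunks starting at frames 0, 2, and 4, and the event count of each chunk follows from the matching slice of `feat_nevents`, which is why a chunk of N frames can hold more or fewer than N events.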