Source code for dcnum.write.chunk_writer

from __future__ import annotations

import collections
import logging
from multiprocessing.sharedctypes import Synchronized
import pathlib
import threading
import time
import traceback

from ..common import h5py

from .writer import HDF5Writer


[docs] class ChunkWriter(threading.Thread): def __init__(self, path_out: pathlib.Path | h5py.File, # type: ignore dq: collections.deque, write_queue_size: Synchronized, ds_kwds: dict | None = None, mode: str = "a", parent_logger: logging.Logger | None = None, *args, **kwargs): """Convenience class for writing to data outside the main loop Data are numpy arrays collected from a `dequeue` object Parameters ---------- path_out: Path to the output HDF5 file dq: collections.deque `collections.deque` object from which data are taken using `popleft()`. write_queue_size: Multiprocessing value to which the size of `dq` is written periodically ds_kwds: keyword arguments for dataset creation, passed to :class:`.HDF5Writer` mode: HDF5 file opening mode, passed to :class:`.HDF5Writer` """ super(ChunkWriter, self).__init__(*args, **kwargs) if parent_logger is None: self.logger = logging.getLogger("dcnum.write.ChunkWriter") else: self.logger = parent_logger.getChild("ChunkWriter") if mode == "w": path_out.unlink(missing_ok=True) self.writer = HDF5Writer(path_out, mode=mode, ds_kwds=ds_kwds) self.dq = dq self.may_stop_loop = False self.must_stop_loop = False self.write_queue_size = write_queue_size
[docs] def abort_loop(self): """Force aborting the loop as soon as possible""" self.must_stop_loop = True
[docs] def finished_when_queue_empty(self): """Stop the loop as soon as `self.dq` is empty""" self.may_stop_loop = True
[docs] def run(self): time_tot = 0 try: while True: ldq = len(self.dq) self.write_queue_size.value = ldq if self.must_stop_loop: break elif ldq: t0 = time.perf_counter() for _ in range(ldq): feat, data = self.dq.popleft() self.writer.store_feature_chunk(feat=feat, data=data) self.write_queue_size.value -= 1 time_tot += time.perf_counter() - t0 elif self.may_stop_loop: break else: # wait for the next item to arrive time.sleep(.1) except BaseException: self.logger.error(traceback.format_exc()) self.logger.info(f"Disk time: {time_tot:.1f}s") self.writer.close()