Source code for dcnum.logic.ctrl

import datetime
import importlib
import json
import logging
from logging.handlers import QueueListener
import multiprocessing as mp
import os
import pathlib
import platform
import socket
import threading
import time
import traceback
import uuid

import numpy as np

from ..common import h5py, join_worker, start_workers_threaded
from ..feat.feat_background.base import get_available_background_methods
from ..segm import SegmenterManagerThread, get_segmenters
from ..meta import ppid
from ..read import HDF5Data, get_measurement_identifier, get_mapping_indices
from .._version import version, version_tuple
from ..write import (
    HDF5Writer, QueueWriterProcess, QueueWriterThread, copy_features,
    copy_metadata, create_with_basins,
)

from .job import DCNumPipelineJob
from .json_encoder import ExtendedJSONEncoder
from .slot_register import SlotRegister
from .universal_worker import UniversalWorkerProcess, UniversalWorkerThread


# Force using "spawn" method for multiprocessing, because we are using
# queues and threads and would end up with race conditions otherwise.
mp_spawn = mp.get_context("spawn")

valid_states = [
    "created",
    "init",
    "setup",
    "background",
    "segmentation",
    "plumbing",
    "cleanup",
    "done",
    "error",
]
"""Valid states for a `DCNumJobRunner`.
The states must be in logical order, not in alphabetical order.
"""


[docs] class DCNumJobRunner(threading.Thread): def __init__(self, job: DCNumPipelineJob, tmp_suffix: str | None = None, *args, **kwargs): """Run a pipeline as defined by a :class:`.job.DCNumPipelineJob` Parameters ---------- job: .job.DCNumPipelineJob pipeline job to run tmp_suffix: str optional unique string for creating temporary files (defaults to hostname) """ super(DCNumJobRunner, self).__init__(*args, **kwargs) self.error_tb = None self.job = job if tmp_suffix is None: tmp_suffix = f"{socket.gethostname()}_{str(uuid.uuid4())[:5]}" self.tmp_suffix = tmp_suffix self.ppid, self.pphash, self.ppdict = job.get_ppid(ret_hash=True, ret_dict=True) self.event_count = 0 self._data_raw = None self._data_temp_in = None # current job state self._state = "init" # overall progress [0, 1] self._progress_bg = None # background self._progress_ex = None # segmentation self._progress_bn = None # creating basins # segmentation frame rate self._segm_rate = 0 # Set up logging # General logger for this job self.main_logger = logging.getLogger("dcnum") self.main_logger.setLevel(job["log_level"]) # Log file output in target directory self.path_log = job["path_out"].with_suffix(".log") self.path_log.parent.mkdir(exist_ok=True, parents=True) self.path_log.unlink(missing_ok=True) self._log_file_handler = logging.FileHandler( filename=self.path_log, encoding="utf-8", delay=True, errors="ignore", ) # Set the log file handler level to DEBUG, so it logs everything # presented to it. self._log_file_handler.setLevel(logging.DEBUG) fmt = logging.Formatter( fmt="%(asctime)s %(levelname)s %(name)s: %(message)s", datefmt='%H:%M:%S' ) self._log_file_handler.setFormatter(fmt) self.main_logger.addHandler(self._log_file_handler) handlers = list(self.main_logger.handlers) # Queue for subprocesses to log to self.log_queue = mp_spawn.Queue() self._qlisten = QueueListener(self.log_queue, *handlers, respect_handler_level=True) self._qlisten.start() if job["debug"]: self.main_logger.info("Note that in debugging mode, duplicate " "log entries may appear (logs that are " "recorded via queues)") self.logger = logging.getLogger(f"dcnum.Runner-{self.pphash[:2]}")
[docs] def __enter__(self): return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): # If an error occurred, don't delete the log and basin files. delete_temporary_files = exc_type is None self.close(delete_temporary_files=delete_temporary_files)
@property def draw(self) -> HDF5Data: """Raw input data""" if self._data_raw is None: # Initialize with the proper kwargs (pixel_size) self._data_raw = HDF5Data(self.job["path_in"], **self.job["data_kwargs"]) return self._data_raw @property def dtin(self) -> HDF5Data: """Input data with (corrected) background image""" if self._data_temp_in is None: if not self.path_temp_in.exists(): # create basin-based input file create_with_basins(path_out=self.path_temp_in, basin_paths=[self.draw.path]) # Initialize with the proper kwargs (pixel_size) self._data_temp_in = HDF5Data(self.path_temp_in, **self.job["data_kwargs"]) assert len(self._data_temp_in) > 0 assert "image_bg" in self._data_temp_in return self._data_temp_in @property def path_temp_in(self): po = pathlib.Path(self.job["path_out"]) return po.with_name(po.stem + f"_input_bb_{self.tmp_suffix}.rtdc~") @property def path_temp_out(self): po = pathlib.Path(self.job["path_out"]) return po.with_name(po.stem + f"_output_{self.tmp_suffix}.rtdc~") @property def state(self): return self._state @state.setter def state(self, state): if state not in valid_states: raise ValueError(f"Invalid state '{state}' specified!") self._state = state
[docs] def close(self, delete_temporary_files=True): if self._data_raw is not None: self._data_raw.close() self._data_raw = None if self._data_temp_in is not None: self._data_temp_in.close() self._data_temp_in = None # clean up logging if self._log_file_handler in self.main_logger.handlers: self.main_logger.removeHandler(self._log_file_handler) self._log_file_handler.flush() self._log_file_handler.close() if self._qlisten is not None: self._qlisten.stop() self._qlisten = None self.log_queue.cancel_join_thread() self.log_queue.close() if delete_temporary_files: # Delete log file on disk self.path_log.unlink(missing_ok=True) # Delete temporary input file self.path_temp_in.unlink(missing_ok=True)
# We don't have to delete self.path_temp_out, since this one # is `rename`d to `self.job["path_out"]`.
[docs] def join(self, delete_temporary_files=True, *args, **kwargs): super(DCNumJobRunner, self).join(*args, **kwargs) # Close only after join self.close(delete_temporary_files=delete_temporary_files)
[docs] def get_status(self): # Compute the total progress. The following weights indicate # how much fractional time each processing step takes. bgw = 4 # fraction of background exw = 27 # fraction of segmentation and feature extraction if self.job["basin_strategy"] == "drain": drw = 15 # because data need to be copied else: drw = 1 # just creating the basins in output file clw = 1 # fraction of cleanup operations tot = bgw + exw + drw + clw progress = 0 st = self.state # background if valid_states.index(st) > valid_states.index("background"): # background already computed progress += bgw / tot elif self._progress_bg is not None: # This is the image count of the input dataset. progress += self._progress_bg.value * bgw / tot # segmentation if valid_states.index(st) > valid_states.index("segmentation"): # segmentation already done progress += exw / tot elif self._progress_ex is not None: progress += self._progress_ex * exw / tot # draining basins if valid_states.index(st) > valid_states.index("plumbing"): # plumbing already done progress += drw / tot if self._progress_bn is not None: progress += self._progress_bn * drw / tot if self.state == "done": progress = 1 return { "progress": progress, "segm rate": self._segm_rate, "state": self._state, }
[docs] def run(self): try: self.run_pipeline() except BaseException: self.state = "error" self.error_tb = traceback.format_exc() if not self.is_alive(): # Thread has not been started. This means we are not running # in a thread but in the main process. Raise the exception. raise
[docs] def run_pipeline(self): """Execute the pipeline job""" time_start = time.perf_counter() time_string = time.strftime("%Y-%m-%d-%H.%M.%S", time.gmtime()) self.logger.info(f"Run start: {time_string}") if self.job["path_out"].exists(): raise FileExistsError( f"Output file {self.job['path_out']} already exists!") # Make sure the output directory exists. self.job["path_out"].parent.mkdir(parents=True, exist_ok=True) self.state = "setup" # First get a list of all pipeline IDs. If the input file has # already been processed by dcnum, then we do not have to redo # everything. # Crucial here is the fact that we also compare the # "pipeline:dcnum hash" in case individual steps of the pipeline # have been run by a rogue data analyst. datdict = { "gen_id": self.draw.h5.attrs.get("pipeline:dcnum generation", "0"), "dat_id": self.draw.h5.attrs.get("pipeline:dcnum data", "0"), "bg_id": self.draw.h5.attrs.get("pipeline:dcnum background", "0"), "seg_id": self.draw.h5.attrs.get("pipeline:dcnum segmenter", "0"), "feat_id": self.draw.h5.attrs.get("pipeline:dcnum feature", "0"), "gate_id": self.draw.h5.attrs.get("pipeline:dcnum gate", "0"), } # The hash of a potential previous pipeline run. dathash = self.draw.h5.attrs.get("pipeline:dcnum hash", "0") # The number of events extracted in a potential previous pipeline run. evyield = self.draw.h5.attrs.get("pipeline:dcnum yield", -1) redo_sanity = ( # Whether pipeline hash is invalid. ppid.compute_pipeline_hash(**datdict) != dathash # Whether the input file is the original output of the pipeline. or len(self.draw) != evyield # If index mapping is defined, then we always redo the pipeline. # If the pipeline hashes are identical and index mapping is not # None, then both pipelines were done with index mapping. # But applying the same pipeline with index mapping in series # will lead to a different result in the second run (e.g. 1st # pipeline run: take every 2nd event; 2nd pipeline run: take # every second event -> results in every 4th event in output of # second pipeline run). or self.draw.index_mapping is not None ) # Do we have to recompute the background data? In addition to the # hash sanity check above, check the generation, input data, # and background pipeline identifiers. redo_bg = ( "image_bg" not in self.draw or (datdict["gen_id"] != self.ppdict["gen_id"]) or (datdict["dat_id"] != self.ppdict["dat_id"]) or (datdict["bg_id"] != self.ppdict["bg_id"])) # Do we have to rerun segmentation and feature extraction? Check # the segmentation, feature extraction, and gating pipeline # identifiers. redo_seg = ( redo_sanity or redo_bg or (datdict["seg_id"] != self.ppdict["seg_id"]) or (datdict["feat_id"] != self.ppdict["feat_id"]) or (datdict["gate_id"] != self.ppdict["gate_id"])) self.state = "background" if redo_bg: # The 'image_bg' feature is written to `self.path_temp_in`. # If `job["path_in"]` already has the correct 'image_bg' # feature, then we never reach this case here # (note that `self.path_temp_in` is basin-based). self.task_background() self.state = "segmentation" # We have the input data covered, and we have to run the # long-lasting segmentation and feature extraction step. # We are taking into account two scenarios: # A) The segmentation step is exactly the one given in the input # file. Here it is sufficient to use a basin-based # output file `self.path_temp_out`. # B) Everything else (including background pipeline mismatch or # different segmenters); Here, we simply populate `path_temp_out` # with the data from the segmenter. if redo_seg: # scenario B (Note this implies `redo_bg`) self.task_segment_extract() else: # scenario A # Access the temporary input HDF5Data so that the underlying # basin file is created and close it immediately afterward. self.dtin.close() self._data_temp_in = None # Note any new actions that work on `self.path_temp_in` are not # reflected in `self.path_temp_out`. self.path_temp_in.rename(self.path_temp_out) # Since no segmentation was done, the output file now does not # contain any events. This is not really what we wanted, but we # can still store all features in the output file if required. if self.job["basin_strategy"] == "drain": orig_feats = [] for feat in self.draw.h5["events"].keys(): if isinstance(self.draw.h5["events"][feat], h5py.Dataset): # copy_features does not support Groups orig_feats.append(feat) with h5py.File(self.path_temp_out, "a") as h5_dst: copy_features(h5_src=self.draw.h5, h5_dst=h5_dst, features=orig_feats, mapping=None) # Handle basin data according to the user's request self.state = "plumbing" self.task_enforce_basin_strategy() self.state = "cleanup" with HDF5Writer(self.path_temp_out) as hw: # pipeline metadata hw.h5.attrs["pipeline:dcnum generation"] = self.ppdict["gen_id"] hw.h5.attrs["pipeline:dcnum data"] = self.ppdict["dat_id"] hw.h5.attrs["pipeline:dcnum background"] = self.ppdict["bg_id"] hw.h5.attrs["pipeline:dcnum segmenter"] = self.ppdict["seg_id"] hw.h5.attrs["pipeline:dcnum feature"] = self.ppdict["feat_id"] hw.h5.attrs["pipeline:dcnum gate"] = self.ppdict["gate_id"] hw.h5.attrs["pipeline:dcnum hash"] = self.pphash hw.h5.attrs["pipeline:dcnum yield"] = self.event_count # index mapping information im = self.job.kwargs["data_kwargs"].get("index_mapping", None) dim = HDF5Data.get_ppid_index_mapping(im) hw.h5.attrs["pipeline:dcnum mapping"] = dim # regular metadata hw.h5.attrs["experiment:event count"] = self.event_count hw.h5.attrs["imaging:pixel size"] = self.draw.pixel_size # Add job information to resulting .rtdc file hw.store_log(f"dcnum-job-{time_string}", json.dumps({ "dcnum version": version_tuple, "job": self.job.__getstate__(), "pipeline": {"identifiers": self.ppdict, "hash": self.pphash, }, "python": { "build": ", ".join(platform.python_build()), "implementation": platform.python_implementation(), "libraries": get_library_versions_dict([ "cv2", "h5py", "mahotas", "mpmath", "networkx", "numba", "numpy", "scipy", "sympy", "torch", "torchvision", ]), "version": platform.python_version(), }, "system": { "info": platform.platform(), "machine": platform.machine(), "name": platform.system(), "release": platform.release(), "version": platform.version(), }, "tasks": {"background": redo_bg, "segmentation": redo_seg }, }, indent=2, sort_keys=True, cls=ExtendedJSONEncoder, ).split("\n")) # copy metadata/logs/tables from original file with h5py.File(self.job["path_in"]) as h5_src: copy_metadata(h5_src=h5_src, h5_dst=hw.h5) if redo_seg: # Store the correct measurement identifier. This is used to # identify this file as a correct basin in subsequent pipeline # steps, and it also makes sure that the original file cannot # become a basin by accident (we have different indexing). # This is the identifier appendix that we use to identify this # dataset. Note that we only override the run identifier when # segmentation did actually take place. mid_ap = f"dcn-{self.pphash[:7]}" # This is the current measurement identifier mid_cur = get_measurement_identifier(hw.h5) # The new measurement identifier is a combination of both. mid_new = f"{mid_cur}_{mid_ap}" if mid_cur else mid_ap hw.h5.attrs["experiment:run identifier"] = mid_new trun = datetime.timedelta( seconds=round(time.perf_counter() - time_start)) self.logger.info(f"Run duration: {str(trun)}") self.logger.info(time.strftime("Run stop: %Y-%m-%d-%H.%M.%S", time.gmtime())) # Add the log file to the resulting .rtdc file if self.path_log.exists(): with HDF5Writer(self.path_temp_out) as hw: hw.store_log( f"dcnum-log-{time_string}", self.path_log.read_text().strip().split("\n")) # Rename the output file self.path_temp_out.rename(self.job["path_out"]) self.state = "done"
[docs] def task_background(self): """Perform background computation task This populates the file `self.path_temp_in` with the 'image_bg' feature. """ self.logger.info("Starting background computation") if self._data_temp_in is not None: # Close the temporary input data file, so we can write to it. self._data_temp_in.close() self._data_temp_in = None # Start background computation bg_code = self.job["background_code"] bg_cls = get_available_background_methods()[bg_code] with bg_cls( input_data=self.job["path_in"], output_path=self.path_temp_in, # always compress, the disk is usually the bottleneck compress=True, num_cpus=self.job["num_procs"], # custom kwargs **self.job["background_kwargs"]) as bic: self._progress_bg = bic.image_proc bic.process() self.logger.info("Finished background computation")
[docs] def task_enforce_basin_strategy(self): """Transfer basin data from input files to output if requested The user specified the "basin_strategy" keyword argument in `self.job`. If this is set to "drain", then copy all basin information from the input file to the output file. If it is set to "tap", then only create basins in the output file. """ self._progress_bn = 0 t0 = time.perf_counter() # We have these points to consider: # - We must use the `basinmap` feature to map from the original # file to the output file. # - We must copy "bg_off" and "image_bg" to the output file. # - For the "drain" basin strategy, we also have to copy all the # other features. # - If "image_bg" is defined as an internal basin in the input # file, we have to convert the mapping and store a corresponding # internal basin in the output file. # Determine the basinmap feature with HDF5Writer(self.path_temp_out, ds_kwds=self.job.get_hdf5_dataset_kwargs(), ) as hw: hout = hw.h5 # First, we have to determine the basin mapping from input to # output. This information is stored by the QueueWriter # in the "basinmap0" feature, ready to be used by us. if "index_unmapped" in hout["events"]: # The unmapped indices enumerate the events in the output file # with indices from the mapped input file. E.g. if for the # first image in the input file, two events are found and for # the second image in the input file, three events are found, # then this would contain [0, 0, 1, 1, 1, ...]. If the index # mapping of the input file was set to slice(1, 100), then the # first image would not be there, and we would have # [1, 1, 1, ...]. idx_um = hout["events/index_unmapped"][:] # If we want to convert this to an actual basinmap feature, # then we have to convert those indices to indices that map # to the original input HDF5 file. raw_im = self.draw.index_mapping if raw_im is None: # Create a hard link to save time and space hout["events/basinmap0"] = hout["events/index_unmapped"] basinmap0 = idx_um else: self.logger.info("Converting input mapping") basinmap0 = get_mapping_indices(raw_im)[idx_um] # Store the mapped basin data in the output file. hw.store_feature_chunk("basinmap0", basinmap0) self.logger.info("Input mapped to output with basinmap0") # We don't need them anymore. del hout["events/index_unmapped"] # Note that `size_raw != (len(self.draw))` [sic!]. The former # is the size of the raw dataset and the latter is its mapped # size! size_raw = self.draw.h5.attrs["experiment:event count"] if (len(basinmap0) == size_raw and np.all(basinmap0 == np.arange(size_raw))): # This means that the images in the input overlap perfectly # with the images in the output, i.e. a "copy" segmenter # was used or something is very reproducible. # We set basinmap to None to be more efficient. basinmap0 = None else: # The input is identical to the output, because we are using # the same pipeline identifier. basinmap0 = None # List of features we have to copy from input to output. # We need to make sure that the features are correctly attributed # from the input files. E.g. if the input file already has # background images, but we recompute the background images, then # we have to use the data from the recomputed background file. # We achieve this by keeping a specific order and only copying # those features that we don't already have in the output file. feats_raw = [ # background data from the temporary input image [self.dtin.h5, ["bg_off"], "critical"], [self.draw.h5, self.draw.features_scalar_frame, "optional"], [self.draw.h5, ["image", "bg_off"], "optional"], ] # Store image_bg as an internal basin, if defined in input for idx in range(len(self.dtin.basins)): bn_dict = self.dtin.basins[idx] if (bn_dict["type"] == "internal" and "image_bg" in bn_dict["features"]): self.logger.info( "Copying internal basin background images") bn_grp, bn_feats, bn_map = self.dtin.get_basin_data(idx) assert "image_bg" in bn_feats # Load all images into memory (should only be ~600) bg_images1 = self.dtin.h5["basin_events"]["image_bg"][:] # Get the original internal mapping for these images # Note that `basinmap0` always refers to indices in the # original raw input file, and not to indices in an # optional mapped input file (using `index_mapping`). # Therefore, we do `self.dtin.h5["events"]["basinmap0"]` # instead of `self.dtin["basinmap0"]` basinmap_in = self.dtin.h5["events"][bn_dict["mapping"]][:] # Now we have to convert the indices in `basinmap_in` # to indices in the output file. basinmap1 = basinmap_in[basinmap0] # Store the internal mapping in the output file hw.store_basin(name=bn_dict["name"], description=bn_dict["description"], mapping=basinmap1, internal_data={"image_bg": bg_images1} ) break else: self.logger.info("Background images must be copied") # There is no internal image_bg feature, probably because # the user did not use the sparsemed background correction. # In this case, we simply add "image_bg" to the `feats_raw`. feats_raw += [ [self.dtin.h5, ["image_bg"], "critical"], [self.draw.h5, ["image_bg"], "optional"], ] # Copy the features required in the output file. for hin, feats, importance in feats_raw: # Only consider features that are available in the input # and that are not already in the output. feats = [f for f in feats if (f in hin["events"] and f not in hout["events"])] if not feats: continue elif (self.job["basin_strategy"] == "drain" or importance == "critical"): # DRAIN: Copy all features over to the output file. self.logger.debug(f"Transferring {feats} to output file") copy_features( h5_src=hin, h5_dst=hout, features=feats, mapping=basinmap0, ds_kwds=self.job.get_hdf5_dataset_kwargs(), ) else: # TAP: Create basins for the "optional" features in the # output file. Note that the "critical" features never # reach this case. self.logger.debug(f"Creating basin for {feats}") # Relative and absolute paths. pin = pathlib.Path(hin.filename).resolve() paths = [pin] pout = pathlib.Path(hout.filename).resolve().parent try: paths.append(os.path.relpath(pin, pout)) except ValueError: # This means it is impossible to compute a relative # path (e.g. different drive letter on Windows). pass hw.store_basin(name="dcnum basin", features=feats, mapping=basinmap0, paths=paths, description=f"Created with dcnum {version}", identifier=get_measurement_identifier(hin), ) self._progress_bn += 1 / len(feats_raw) t_tot = time.perf_counter() - t0 self.logger.info(f"Enforcing basin strategy time: {t_tot:.1f}s")
[docs] def task_segment_extract(self): self.logger.info("Starting segmentation and feature extraction") # Start segmentation thread seg_cls = get_segmenters()[self.job["segmenter_code"]] if self.job["debug"]: num_universal = 1 num_segmenters = 1 elif seg_cls.hardware_processor == "cpu": # MPO segmenter # Split segmentation and feature extraction workers evenly. num_universal = self.job["num_procs"] // 2 num_segmenters = self.job["num_procs"] - num_universal # Leave one CPU for the writer and the other threads. num_segmenters -= 1 else: # GPU segmenter num_universal = self.job["num_procs"] # Leave one CPU for the writer and the other threads. num_universal -= 1 num_segmenters = 1 num_universal = max(1, num_universal) num_segmenters = max(1, num_segmenters) self.job.kwargs["segmenter_kwargs"]["num_workers"] = num_segmenters # The number of ChunkSlots defines how well workers can operate in # parallel. This should be higher than the number states a ChunkSlot # can have to prevent workers from waiting on each other. Seven slots # results in a shared memory usage of roughly 1GB for a standard # blood measurement (image chunks of (1000, 80, 32). if self.job["debug"]: num_slots = 1 else: num_slots = 7 event_queue = mp_spawn.Queue() slot_register = SlotRegister(job=self.job, data=self.dtin, event_queue=event_queue, num_slots=num_slots) self.logger.debug(f"Number of slots: {num_slots}") self.logger.debug(f"Number of segmenters: {num_segmenters}") self.logger.debug(f"Number of universal workers: {num_universal}") if self.job["debug"]: worker_uni_cls = UniversalWorkerThread else: worker_uni_cls = UniversalWorkerProcess uni_workers = [] for _ in range(num_universal): uni_workers.append(worker_uni_cls(slot_register=slot_register, log_queue=self.log_queue, log_level=self.logger.level, )) thr_uw = start_workers_threaded(worker_list=uni_workers, logger=self.logger, name="UniversalWorker", ) # Initialize segmenter manager thread worker_segm = SegmenterManagerThread( segmenter=seg_cls(debug=self.job["debug"], **self.job["segmenter_kwargs"]), slot_register=slot_register, ) worker_segm.start() # Start the data collection and writer thread if self.job["debug"]: worker_write = QueueWriterThread( event_queue=event_queue, write_queue_size=slot_register.counters["write_queue_size"], feat_nevents=slot_register.feat_nevents, path_out=self.path_temp_out, hdf5_dataset_kwargs=self.job.get_hdf5_dataset_kwargs(), write_threshold=500, ) else: worker_write = QueueWriterProcess( event_queue=event_queue, write_queue_size=slot_register.counters["write_queue_size"], feat_nevents=slot_register.feat_nevents, path_out=self.path_temp_out, hdf5_dataset_kwargs=self.job.get_hdf5_dataset_kwargs(), write_threshold=500, log_queue=self.log_queue, ) worker_write.start() data_size = len(self.dtin) t0 = time.perf_counter() # So in principle we are done here. We do not have to do anything # besides monitoring the progress. while True: counted_frames = worker_write.written_frames.value self.event_count = worker_write.written_events.value td = time.perf_counter() - t0 # set the current status self._progress_ex = counted_frames / data_size self._segm_rate = counted_frames / (td or 0.03) time.sleep(.1) if counted_frames == data_size: break slot_register.state = "q" self.logger.info( f"Data load time: " f"{slot_register.get_time('task_load_all'):.1f}s") self.logger.info( f"Labeling time: " f"{slot_register.get_time('task_label_masks'):.1f}s") self.logger.info( f"Label processing time: " f"{slot_register.get_time('task_process_labels'):.1f}s") self.logger.info( f"Feature extraction time: " f"{slot_register.get_time('task_extract_features'):.1f}s") inv_masks = slot_register.masks_dropped if inv_masks: self.logger.info(f"Encountered {inv_masks} invalid masks") inv_frac = inv_masks / slot_register.num_frames if inv_frac > 0.005: # warn above one half percent self.logger.warning(f"Discarded {inv_frac:.1%} of the masks, " f"please check segmenter applicability") self.logger.debug("Flushing data to disk") slot_register.close() # join threads join_worker(worker=thr_uw, logger=self.logger, name="worker starter") join_worker(worker=worker_segm, logger=self.logger, name="segmentation") join_worker(worker=worker_write, timeout=600, logger=self.logger, name="collector for writer") # Join universal workers after writer, because the writer will make # sure that all frames are accounted for, and there might be # a problem with joining the universal workers when the queues are # not empty. for worker_uni in uni_workers: join_worker(worker=worker_uni, logger=self.logger, name="universal worker") self.event_count = worker_write.written_events.value if self.event_count == 0: self.logger.error( f"No events found in {self.draw.path}! Please check the " f"input file or revise your pipeline") else: self.logger.info("Finished segmentation and feature extraction")
[docs] def get_library_versions_dict(library_name_list): version_dict = {} for library_name in library_name_list: try: lib = importlib.import_module(library_name) except BaseException: version = None else: version = lib.__version__ version_dict[library_name] = version return version_dict