Source code for dcnum.feat.queue_event_extractor

"""Feature Extraction: event extractor worker"""
from __future__ import annotations

import logging
import multiprocessing as mp
import typing

import numpy as np

from ..common import LazyLoader
from ..meta.ppid import kwargs_to_ppid, ppid_to_kwargs

from .gate import Gate


if typing.TYPE_CHECKING:
    from ..logic import SlotRegister


feat_brightness = LazyLoader("feat_brightness", __name__)
feat_contour = LazyLoader("feat_contour", __name__)
feat_texture = LazyLoader("feat_texture", __name__)


[docs] class QueueEventExtractor: def __init__(self, slot_register: SlotRegister, pixel_size: float, gate: Gate, event_queue: mp.Queue, extract_kwargs: dict | None = None, logger: logging.Logger | None = None): """Event extraction from label images This class is used for extracting events from label images. Events are appended to the `event_queue` for the writer. Parameters ---------- slot_register: .logic.slot_register.SlotRegister Chunk slot register pixel_size: Imaging pixel size gate: .gate.Gate Gating rules. event_queue: Queue in which the worker puts the extracted event feature data. extract_kwargs: Keyword arguments for the extraction process. See the keyword-only arguments in :func:`QueueEventExtractor.get_events_from_masks`. logger: Logger to use """ self.slot_register = slot_register """Chunk slot register""" self.pixel_size = pixel_size """Imaging pixel size""" self.gate = gate """Gating information""" self.event_queue = event_queue """queue with event-wise feature dictionaries""" self.logger = logger or logging.getLogger(__name__) # Keyword arguments for data extraction if extract_kwargs is None: extract_kwargs = {} extract_kwargs.setdefault("brightness", True) extract_kwargs.setdefault("haralick", True) self.extract_kwargs = extract_kwargs """Feature extraction keyword arguments."""
[docs] def get_events_from_masks(self, masks, chunk_slot, sub_index, *, brightness: bool = True, haralick: bool = True, volume: bool = True, ): """Get events dictionary, performing event-based gating""" events = {"mask": masks} image = chunk_slot.image[sub_index][np.newaxis] image_bg = chunk_slot.image_bg[sub_index][np.newaxis] image_corr = chunk_slot.image_corr[sub_index][np.newaxis] if chunk_slot.bg_off is not None: bg_off = chunk_slot.bg_off[sub_index] else: bg_off = None events.update( feat_contour.moments_based_features( masks, pixel_size=self.pixel_size, ret_contour=volume, )) if brightness: events.update(feat_brightness.brightness_features( mask=masks, image=image, image_bg=image_bg, bg_off=bg_off, image_corr=image_corr )) if haralick: events.update(feat_texture.haralick_texture_features( mask=masks, image=image, image_corr=image_corr, )) if volume: events.update(feat_contour.volume_from_contours( contour=events.pop("contour"), # remove contour from events! pos_x=events["pos_x"], pos_y=events["pos_y"], pixel_size=self.pixel_size, )) # gating on feature arrays if self.gate.box_gates: valid = self.gate.gate_events(events) gated_events = {} for key in events: gated_events[key] = events[key][valid] else: gated_events = events # removing events with invalid features valid_events = {} # the valid key-value pair was added in `moments_based_features` and # its only purpose is to mark events with invalid contours as such, # so they can be removed here. Resolves issue #9. valid = gated_events.pop("valid") invalid = ~valid # The following might lead to a computational overhead, if only a few # events are invalid, because then all 2d-features need to be copied # over from gated_events to valid_events. In our experience, and # especially with U-Net-based segmentation, invalid events happen # rarely. if np.any(invalid): with self.slot_register.get_counter_lock("masks_dropped"): self.slot_register.masks_dropped += np.sum(invalid) for key in gated_events: valid_events[key] = gated_events[key][valid] else: valid_events = gated_events return valid_events
[docs] def get_masks_from_label(self, label): """Get masks, performing mask-based gating""" # Using np.unique is a little slower than iterating over lmax # unu = np.unique(label) # background is 0 lmax = np.max(label) masks = [] for jj in range(1, lmax+1): # first item is 0 mask_jj = label == jj mask_sum = np.sum(mask_jj) if mask_sum and self.gate.gate_mask(mask_jj, mask_sum=mask_sum): masks.append(mask_jj) return np.array(masks)
[docs] def get_ppid(self): """Return a unique feature extractor pipeline identifier The pipeline identifier is universally applicable and must be backwards-compatible (future versions of dcnum will correctly acknowledge the ID). The feature extractor pipeline ID is defined as:: KEY:KW_APPROACH Where KEY is e.g. "legacy", and KW_APPROACH is a list of keyword-only arguments for `get_events_from_masks`, e.g.:: brightness=True^haralick=True which may be abbreviated to:: b=1^h=1 """ return self.get_ppid_from_ppkw(self.extract_kwargs)
[docs] @classmethod def get_ppid_code(cls): return "legacy"
[docs] @classmethod def get_ppid_from_ppkw(cls, kwargs): """Return the pipeline ID for this event extractor""" code = cls.get_ppid_code() cback = kwargs_to_ppid(cls, "get_events_from_masks", kwargs) return ":".join([code, cback])
[docs] @staticmethod def get_ppkw_from_ppid(extr_ppid): code, pp_extr_kwargs = extr_ppid.split(":") if code != QueueEventExtractor.get_ppid_code(): raise ValueError( f"Could not find extraction method '{code}'!") kwargs = ppid_to_kwargs(cls=QueueEventExtractor, method="get_events_from_masks", ppid=pp_extr_kwargs) return kwargs
[docs] def process_label(self, index): """Process one label image, extracting masks and features""" chunk = index // self.slot_register.chunk_size sub_index = index % self.slot_register.chunk_size # Fetch the chunk slot we are supposed to be working on for chunk_slot in self.slot_register: if chunk_slot.chunk == chunk: break else: raise ValueError(f"Could not find slot for {chunk=}") images = chunk_slot.image # Check for duplicates # TODO: Check for duplicate images when loading data in ChunkSlot, # and make that information available as a boolean array. if sub_index == 0: # We have to check whether the last image from the previous # chunk matches the current image. data = self.slot_register.data if (chunk != 0 and np.all(images[sub_index] == data.image[index - 1])): # skip duplicate events that have been analyzed already return None else: if np.all(images[sub_index] == images[sub_index - 1]): # skip duplicate events that have been analyzed already return None masks = self.get_masks_from_label(chunk_slot.labels[sub_index]) if masks.size: events = self.get_events_from_masks( masks=masks, chunk_slot=chunk_slot, sub_index=sub_index, **self.extract_kwargs) else: events = None return events