Source code for dcnum.logic.chunk_slot

import multiprocessing as mp

import numpy as np

from ..segm import get_segmenters

from .chunk_slot_data import ChunkSlotData


mp_spawn = mp.get_context("spawn")


[docs] class ChunkSlot(ChunkSlotData): _instance_counter = 0 def __init__(self, job, data, is_remainder=False): ChunkSlot._instance_counter += 1 self.index = ChunkSlot._instance_counter self.job = job """Job information object""" self.data = data """Input data object""" self.is_remainder = is_remainder """Whether this slot only applies to the last chunk""" self.seg_cls = get_segmenters()[self.job["segmenter_code"]] """Segmentation class""" if self.is_remainder: length = self.data.image.get_chunk_size( chunk_index=self.data.image.num_chunks - 1) else: length = self.data.image.chunk_size super(ChunkSlot, self).__init__( shape=(length,) + self.data.image.shape[1:], available_features=self.data.keys(), )
[docs] def __repr__(self): return (f"<dcnum ChunkSlot {self.index} (state {self.state}) " f"with chunk {self.chunk} at {hex(id(self))}>")
[docs] def load(self, idx): """Load chunk `idx` into `self.mp_image` and return numpy views""" # create views on image arrays image = self.image image[:] = self.data.image.get_chunk(idx) if self.mp_image_bg is not None: image_bg = self.image_bg image_bg[:] = self.data.image_bg.get_chunk(idx) image_corr = self.image_corr image_corr[:] = np.asarray(image, dtype=np.int16) - image_bg else: image_bg = None image_corr = None if self.mp_bg_off is not None: bg_off = self.bg_off chunk_slice = self.data.image.get_chunk_slice(idx) bg_off[:] = self.data["bg_off"][chunk_slice] else: bg_off = None # TODO: Check for duplicate, consecutive images while loading data # and store that information in a boolean array. This can speed-up # segmentation and feature extraction. self.chunk = idx return image, image_bg, image_corr, bg_off