import queue
import time
import numpy as np
from ...common import LazyLoader
from ...os_env_st import RequestSingleThreaded, confirm_single_threaded
from ...read import HDF5Data
from .base import mp_spawn, Background
ndi = LazyLoader('scipy.ndimage')
[docs]
class BackgroundSparseMed(Background):
def __init__(self, input_data, output_path, kernel_size=200,
split_time=1., thresh_cleansing=0, frac_cleansing=.8,
offset_correction=True,
compress=True,
num_cpus=None):
"""Sparse median background correction with cleansing
In contrast to the rolling median background correction,
this algorithm only computes the background image every
``split_time`` seconds, but with a larger window (default kernel
size is 200 frames instead of 100 frames).
1. At time stamps every `split_time` seconds, a background image is
computed, resulting in a background series.
2. Cleansing: The background series is checked for images that
contain event data using a lengthy algorithm that is documented
in the source code (sorry). In short, this gets rid of
background images that contain streaks of RBCs.
3. Each frame gets the background image closest to it
based on time from the background series.
Parameters
----------
input_data: array-like or pathlib.Path
The input data can be either a path to an HDF5 file with
the "evtens/image" dataset or an array-like object that
behaves like an image stack (first axis enumerates events).
output_path: pathlib.Path
Path to the output file. If `input_data` is a path, you can
set `output_path` to the same path to write directly to the
input file. The data are written in the "events/image_bg"
dataset in the output file.
kernel_size: int
Kernel size for median computation. This is the number of
events that are used to compute the median for each pixel.
split_time: float
Time between background images in the background series
thresh_cleansing: float
A positive floating point value for scaling the thresholding
operation when excluding background images from the series.
Larger values mean more background images are excluded.
Set to zero to enforce a fixed fraction via `frac_cleansing`.
frac_cleansing: float
Fraction between 0 and 1 indicating how many background images
must still be present after cleansing (in case the cleansing
factor is too large). Set to 1 to disable cleansing altogether.
offset_correction: bool
The sparse median background correction produces one median
image for multiple input frames (BTW this also leads to very
efficient data storage with internal HDF5 basins). In
case the input frames are subject to frame-by-frame brightness
variations (e.g. flickering of the illumination source), it
is useful to have an offset value per frame that can then be
used in a later step to perform a more accurate background
correction. This offset is computed here by taking a 20px wide
slice from each frame (where the channel wall is located)
and computing the median therein relative to the computed
background image. The data are written to the "bg_off" feature
in the output file alongside "image_bg". To obtain the
corrected background image, add "image_bg" and "bg_off".
Set this to False if you don't need the "bg_off" feature.
compress: bool
Whether to compress background data. Set this to False
for faster processing.
num_cpus: int
Number of CPUs to use for median computation. Defaults to
`dcnum.common.cpu_count()`.
.. versionchanged:: 0.23.5
The background image data are stored as an internal
mapped basin to reduce the output file size.
"""
super(BackgroundSparseMed, self).__init__(
input_data=input_data,
output_path=output_path,
compress=compress,
num_cpus=num_cpus,
kernel_size=kernel_size,
split_time=split_time,
thresh_cleansing=thresh_cleansing,
frac_cleansing=frac_cleansing,
offset_correction=offset_correction,
)
if kernel_size > len(self.input_data):
self.logger.warning(
f"The kernel size {kernel_size} is too large for input data"
f"size {len(self.input_data)}. Setting it to input data size!")
kernel_size = len(self.input_data)
self.kernel_size = kernel_size
"""kernel size used for median filtering"""
self.split_time = split_time
"""time between background images in the background series"""
self.thresh_cleansing = thresh_cleansing
"""cleansing threshold factor"""
self.frac_cleansing = frac_cleansing
"""keep at least this many background images from the series"""
self.offset_correction = offset_correction
"""offset/flickering correction"""
# time axis
self.time = None
if self.h5in is not None:
hd = HDF5Data(self.h5in)
if "time" in hd:
# use actual time from dataset
self.time = hd["time"][:]
self.time -= self.time[0]
elif "imaging:frame rate" in hd.meta:
fr = hd.meta["imaging:frame rate"]
if "frame" in hd:
# compute time from frame rate and frame numbers
self.time = hd["frame"] / fr
self.time -= self.time[0]
else:
# compute time using frame rate (approximate)
dur = self.image_count / fr * 1.5
self.logger.info(
f"Approximating duration: {dur/60:.1f}min")
self.time = np.linspace(0, dur, self.image_count,
endpoint=True)
if self.time is None:
# No HDF5 file or no information therein; Make an educated guess.
dur = self.image_count / 3600 * 1.5
self.logger.info(f"Guessing duration: {dur/60:.1f}min")
self.time = np.linspace(0, dur, self.image_count,
endpoint=True)
self.duration = self.time[-1] - self.time[0]
"""duration of the measurement"""
self.step_times = np.arange(0, self.duration, self.split_time)
self.bg_images = np.zeros((self.step_times.size,
self.image_shape[0],
self.image_shape[1]),
dtype=np.uint8)
"""array containing all background images"""
self.shared_input_raw = mp_spawn.RawArray(
np.ctypeslib.ctypes.c_uint8,
int(np.prod(self.image_shape)) * kernel_size)
"""mp.RawArray for temporary batch input data"""
self.shared_output_raw = mp_spawn.RawArray(
np.ctypeslib.ctypes.c_uint8,
int(np.prod(self.image_shape)))
"""mp.RawArray for the median background image"""
# Convert the RawArray to something we can write to fast
# (similar to memoryview, but without having to cast) using
# np.ctypeslib.as_array. See discussion in
# https://stackoverflow.com/questions/37705974
self.shared_input = np.ctypeslib.as_array(
self.shared_input_raw).reshape(kernel_size, -1)
"""numpy array reshaped view on `self.shared_input_raw`.
The First axis enumerating the events
"""
self.shared_output = np.ctypeslib.as_array(
self.shared_output_raw).reshape(self.image_shape)
"""numpy array reshaped view on `self.shared_output_raw`.
The First axis enumerating the events
"""
self.worker_counter = mp_spawn.Value("q", 0)
"""counter tracking process of workers"""
self.queue = mp_spawn.Queue()
"""queue for median computation jobs"""
self.workers = [WorkerSparseMed(self.queue,
self.worker_counter,
self.shared_input_raw,
self.shared_output_raw,
self.kernel_size)
for _ in range(self.num_cpus)]
"""list of workers (processes)"""
tw0 = time.perf_counter()
[w.start() for w in self.workers]
self.logger.info(f"{len(self.workers)} worker spawn time: "
f"{time.perf_counter() - tw0:.1}s")
[docs]
def __enter__(self):
return self
[docs]
def __exit__(self, type, value, tb):
self.worker_counter.value = -1000
[w.join() for w in self.workers]
super(BackgroundSparseMed, self).__exit__(type, value, tb)
[docs]
@staticmethod
def check_user_kwargs(*,
kernel_size: int = 200,
split_time: float = 1.,
thresh_cleansing: float = 0,
frac_cleansing: float = .8,
offset_correction: bool = True,
):
"""Initialize user-defined properties of this class
This method primarily exists so that the CLI knows which
keyword arguments can be passed to this class.
Parameters
----------
kernel_size: int
Kernel size for median computation. This is the number of
events that are used to compute the median for each pixel.
split_time: float
Time between background images in the background series
thresh_cleansing: float
A positive floating point value for scaling the thresholding
operation when excluding background images from the series.
Larger values mean more background images are excluded.
Set to 0 (default) to enforce a fixed fraction `frac_cleansing`.
frac_cleansing: float
Fraction between 0 and 1 indicating how many background images
must still be present after cleansing (in case the cleansing
factor is too large). Set to 1 to disable cleansing altogether.
offset_correction: bool
The sparse median background correction produces one median
image for multiple input frames (BTW this also leads to very
efficient data storage with internal HDF5 basins). In
case the input frames are subject to frame-by-frame brightness
variations (e.g. flickering of the illumination source), it
is useful to have an offset value per frame that can then be
used in a later step to perform a more accurate background
correction. This offset is computed here by taking a 20px wide
slice from each frame (where the channel wall is located)
and computing the median therein relative to the computed
background image. The data are written to the "bg_off" feature
in the output file alongside "image_bg". To obtain the
corrected background image, add "image_bg" and "bg_off".
Set this to False if you don't need the "bg_off" feature.
"""
assert kernel_size > 0
assert split_time > 0
assert thresh_cleansing >= 0, "Cleansing threshold must be >=0"
assert frac_cleansing > 0, "Cleansing fraction must be >0"
assert frac_cleansing <= 1, "Cleansing fraction must be <=1"
[docs]
def process_approach(self):
"""Perform median computation on entire input data"""
# Compute initial background images (populates self.bg_images)
for ii, ti in enumerate(self.step_times):
self.process_second(ii, ti)
if self.frac_cleansing != 1:
# The following algorithm finds background images that contain
# event information (this happens when the median background step
# gets input images where at a certain position there are mostly
# cells, i.e. there are too many cells in all images). Ideally,
# background images don't contain event information. Normally,
# the events are red blood cells which are dark and have a strong
# effect.
# For each of those images, compute the ptp profile (ptp along the
# channel axis)
bg_prof = np.ptp(self.bg_images, axis=2)
# compute the median of those profiles along the channel axis
bg_prof_med = np.median(bg_prof, axis=0)
# normalize the profiles
bg_prof_norm = bg_prof - bg_prof_med.reshape(1, -1)
# compute the mean at the center for each profile;
width = bg_prof.shape[1]
spread = max(20, width // 4)
cslice = slice(width // 2 - spread, width // 2 + spread)
# If you plot this line, you will already see outliers (e.g. use
# the leukocytes.rtdc reference measurement).
bg_prof_norm_cent = np.mean(bg_prof_norm[:, cslice], axis=1)
# To reliably remove outliers, we still need a constant baseline
# which we achieve by computing the median filter. The size
# of 10 is just a best-guess and makes sense since it's time-based
# and not frame-based. This would most-likely not work if you
# applied this in `BackgroundRollMed`.
bg_profiles_norm_cent_med = ndi.median_filter(
bg_prof_norm_cent, size=10)
# Until now, we are basically looking at the peak-to-peak grayscale
# values from which a median was subtracted.
x = bg_prof_norm_cent - bg_profiles_norm_cent_med
ref = np.abs(x - np.median(x))
thresh_fact = np.var(ref) * 150
if self.thresh_cleansing != 0:
# Try a simple thresholding approach.
thresh = thresh_fact / self.thresh_cleansing
else:
# Force a certain quantile fraction to be removed
thresh = np.quantile(ref, self.frac_cleansing)
used = ref <= thresh
frac_remove = np.sum(~used) / used.size
# Check whether we can trust the current selection
if (self.thresh_cleansing != 0
and (1 - frac_remove) < self.frac_cleansing):
# This did not work at all.
# use quantiles instead
frac_remove_user = frac_remove
thresh = np.quantile(ref, self.frac_cleansing)
used = ref <= thresh
frac_remove = np.sum(~used) / used.size
self.logger.warning(
f"{frac_remove_user:.1%} of the background images would "
f"be removed with the current settings, so we enforce "
f"`frac_cleansing`. To avoid this warning, try decreasing "
f"`thresh_cleansing` or `frac_cleansing`. The new "
f"threshold is {thresh_fact / thresh}.")
self.logger.info(f"Cleansed {frac_remove:.2%}")
step_times = self.step_times[used]
bg_images = self.bg_images[used]
else:
self.logger.info("Background series cleansing disabled")
step_times = self.step_times
bg_images = self.bg_images
# Assign each frame to a certain background index
bg_idx = np.zeros(self.image_count, dtype=int)
idx0 = 0
idx1 = None
for ii in range(len(step_times)):
t1 = step_times[ii]
idx1 = np.argmin(np.abs(self.time - t1 - self.split_time/2))
bg_idx[idx0:idx1] = ii
idx0 = idx1
if idx1 is not None:
# Fill up remainder of index array with last entry
bg_idx[idx1:] = ii
# Store the background images as an internal mapped basin
self.writer.store_basin(
name="background images",
description=f"Pipeline identifier: {self.get_ppid()}",
mapping=bg_idx,
internal_data={"image_bg": bg_images}
)
# store the offset correction, if applicable
if self.offset_correction:
self.logger.info("Computing offset correction")
# compute the mean at the top of all background images
sh, sw = self.input_data.shape[1:]
roi_full = (slice(None), slice(0, 20), slice(0, sw))
bg_data_mean = np.mean(bg_images[roi_full], axis=(1, 2))
pos = 0
step = self.writer.get_best_nd_chunks(item_shape=(sh, sw),
feat_dtype=np.uint8)[0]
bg_off = np.zeros(self.image_count, dtype=float)
# For every chunk in the input image data, compute that
# value as well and store the resulting offset value.
# TODO: Could this be parallelized, or are we limited in reading?
while pos < self.image_count:
stop = min(pos + step, self.image_count)
# Record background offset correction "bg_off". We take a
# slice of 20px from the top of the image (there are normally
# no events here, only the channel walls are visible).
cur_slice = slice(pos, stop)
# mean background brightness
val_bg = bg_data_mean[bg_idx[cur_slice]]
# mean image brightness
roi_cur = (cur_slice, slice(0, 20), slice(0, sw))
val_dat = np.mean(self.input_data[roi_cur], axis=(1, 2))
# background image = image_bg + bg_off
bg_off[cur_slice] = val_dat - val_bg
# set progress
self.image_proc.value = 0.5 * (1 + pos / self.image_count)
pos = stop
# finally, store the background offset feature
self.writer.store_feature_chunk("bg_off", bg_off)
self.image_proc.value = 1
[docs]
def process_second(self,
ii: int,
second: float | int):
idx_start = np.argmin(np.abs(second - self.time))
idx_stop = idx_start + self.kernel_size
if idx_stop >= self.image_count:
idx_stop = self.image_count
idx_start = max(0, idx_stop - self.kernel_size)
assert idx_stop - idx_start == self.kernel_size
# The following is equivalent to, but faster than:
# self.bg_images[ii] = np.median(self.input_data[idx_start:idx_stop],
# axis=0)
self.worker_counter.value = 0
self.shared_input[:] = self.input_data[idx_start:idx_stop].reshape(
self.kernel_size, -1)
num_jobs = 0
# Cut the image into jobs with ival=500 pixels which seems
# optimal on Paul's laptop.
height, width = self.image_shape
start = 0
ival = 500
smax = height * width
while start < smax:
args = (slice(start, start+ival),)
start += ival
self.queue.put(args)
num_jobs += 1
# block until workers are done
while True:
time.sleep(.01)
if self.worker_counter.value == num_jobs:
break
self.bg_images[ii] = self.shared_output.reshape(self.image_shape)
self.image_proc.value = idx_stop / (
# with offset correction, everything is slower
self.image_count * (1 + self.offset_correction))
[docs]
class WorkerSparseMed(mp_spawn.Process):
def __init__(self, job_queue, counter, shared_input, shared_output,
kernel_size, *args, **kwargs):
"""Worker process for median computation"""
super(WorkerSparseMed, self).__init__(*args, **kwargs)
self.queue = job_queue
self.queue.cancel_join_thread()
self.counter = counter
self.shared_input_raw = shared_input
self.shared_output_raw = shared_output
self.kernel_size = kernel_size
[docs]
def run(self):
"""Main loop of worker process (breaks when `self.counter` <0)"""
# confirm single-threadedness (prints to log)
confirm_single_threaded()
# Create the ctypes arrays here instead of during __init__, because
# for some reason they are copied in __init__ and not mapped.
shared_input = np.ctypeslib.as_array(
self.shared_input_raw).reshape(self.kernel_size, -1)
shared_output = np.ctypeslib.as_array(self.shared_output_raw)
while True:
if self.counter.value < 0:
break
try:
args = self.queue.get(timeout=.1)
except queue.Empty:
pass
else:
job_slice = args[0]
# Compute the median of a subslice of the array.
# Use np.partition which has less overhead than np.median
# (code copy-pasted from np.median):
kth = shared_input.shape[0] // 2
part = np.partition(a=shared_input[:, job_slice],
kth=kth,
axis=0)
# Note that we only partition at `kth`, regardless of the
# input size. This is ok, because we are only interested
# in integers anyway and +/- one grayscale value does not
# really matter.
shared_output[job_slice] = part[kth]
# shared_output[job_slice] = np.median(
# shared_input[:, job_slice],
# axis=0,
# overwrite_input=False)
with self.counter.get_lock():
self.counter.value += 1
[docs]
def start(self):
# Set all relevant os environment variables such libraries in the
# new process only use single-threaded computation.
with RequestSingleThreaded():
mp_spawn.Process.start(self)