import abc
import functools
import inspect
import logging
import multiprocessing as mp
import pathlib
import time
from ...common import cpu_count, h5py
from ...meta import ppid
from ...read import HDF5Data, md5sum
from ...write import HDF5Writer, create_with_basins, set_default_filter_kwargs
# All subprocesses should use 'spawn' to avoid issues with threads
# and 'fork' on POSIX systems.
# `mp_spawn` is the multiprocessing context of this module; it is used,
# e.g., to create the shared progress value in `Background.__init__`.
mp_spawn = mp.get_context('spawn')
class Background(abc.ABC):
    def __init__(self, input_data, output_path, compress=True, num_cpus=None,
                 **kwargs):
        """Base class for background computation

        Parameters
        ----------
        input_data: array-like or pathlib.Path
            The input data can be either a path to an HDF5 file with
            the "events/image" dataset or an array-like object that
            behaves like an image stack (first axis enumerates events)
        output_path: pathlib.Path
            Path to the output file. If `input_data` is a path, you can
            set `output_path` to the same path to write directly to the
            input file. The data are written in the "events/image_bg"
            dataset in the output file.
        compress: bool
            Whether to compress background data. Set this to False
            for faster processing.
        num_cpus: int
            Number of CPUs to use for median computation. Defaults to
            `dcnum.common.cpu_count()`.
        kwargs:
            Additional keyword arguments passed to the subclass.
        """
        self.logger = logging.getLogger(
            f"dcnum.feat.feat_background.{self.__class__.__name__}")

        # proper conversion to Path objects
        output_path = pathlib.Path(output_path)
        self.output_path = output_path
        if isinstance(input_data, str):
            input_data = pathlib.Path(input_data)

        # kwargs checks
        self.check_user_kwargs(**kwargs)

        # Using spec is not really necessary here, because kwargs are
        # fully populated for background computation, but this might change.
        spec = inspect.getfullargspec(self.check_user_kwargs)
        self.kwargs = spec.kwonlydefaults or {}
        """background keyword arguments"""
        self.kwargs.update(kwargs)

        if num_cpus is None:
            num_cpus = cpu_count()
        self.num_cpus = num_cpus
        """number of CPUs used"""

        self.image_proc = mp_spawn.Value("d", 0)
        """fraction of images that have been processed"""

        self.hdin = None
        """HDF5Data instance for input data"""

        self.h5in = None
        """input h5py.File"""

        self.h5out = None
        """output h5py.File"""

        self.paths_ref = []
        """reference paths for logging to the output .rtdc file"""

        # Check whether user passed an array or a path
        if isinstance(input_data, pathlib.Path):
            # Compute MD5 sum before opening the file so that we don't
            # get a file-locking issue (PermissionError) on Windows.
            md5_5m = md5sum(input_data, blocksize=65536, count=80)
            if str(input_data.resolve()) == str(output_path.resolve()):
                # Input and output are the same file: open it once in
                # append mode and share the handle.
                self.h5in = h5py.File(input_data, "a", libver="latest")
                self.h5out = self.h5in
            else:
                self.paths_ref.append(input_data)
                self.h5in = h5py.File(input_data, "r", libver="latest")
            # TODO: Properly setup HDF5 caching.
            # Right now, we are accessing the raw h5ds property of
            # the ImageCache. We have to go via the ImageCache route,
            # because HDF5Data properly resolves basins and the image
            # feature might be in a basin.
            self.hdin = HDF5Data(self.h5in, md5_5m=md5_5m)
            self.input_data = self.hdin.image.h5ds
        else:
            self.input_data = input_data

        self.image_shape = self.input_data[0].shape
        """shape of event images"""

        self.image_count = len(self.input_data)
        """number of images in the input data"""

        if self.h5out is None:
            if not output_path.exists():
                # If the output path does not exist, then we create
                # an output file with basins (for user convenience).
                create_with_basins(path_out=output_path,
                                   basin_paths=self.paths_ref)
            # "a", because output file already exists
            self.h5out = h5py.File(output_path, "a", libver="latest")

        # Initialize writer
        self.writer = HDF5Writer(
            obj=self.h5out,
            ds_kwds=set_default_filter_kwargs(compression=compress),
        )

    def __enter__(self):
        """Return this instance for use in a `with` statement"""
        return self

    def __exit__(self, type, value, tb):
        """Close the writer and all HDF5 files opened by this instance

        The nested `try/finally` makes sure that the input and output
        files are still closed when closing the writer (or the input
        data) raises an exception. Exceptions are not suppressed.
        """
        try:
            self.writer.close()
        finally:
            # Close h5in and h5out
            try:
                if self.hdin is not None:  # we have an input file
                    self.hdin.close()  # this closes self.h5in
            finally:
                # Only close h5out separately if it is not the same
                # handle as h5in (which was handled above).
                if self.h5in is not self.h5out and self.h5out is not None:
                    self.h5out.close()

    @abc.abstractmethod
    def check_user_kwargs(self, **kwargs):
        """Implement this to check the kwargs during init"""

    def get_ppid(self):
        """Return a unique background pipeline identifier

        The pipeline identifier is universally applicable and must
        be backwards-compatible (future versions of dcnum will
        correctly acknowledge the ID).

        The background pipeline ID is defined as::

            KEY:KW_BACKGROUND

        Where KEY is e.g. "sparsemed" or "rollmed", and KW_BACKGROUND is a
        list of keyword arguments for `check_user_kwargs`, e.g.::

            kernel_size=100^batch_size=10000

        which may be abbreviated to::

            k=100^b=10000
        """
        return self.get_ppid_from_ppkw(self.kwargs)

    @classmethod
    def get_ppid_code(cls):
        """Return the pipeline identifier code (KEY) of this class

        The code is the lower-case class name with a leading
        "background" prefix removed.

        Raises
        ------
        ValueError
            If called on the `Background` base class itself.
        """
        if cls is Background:
            raise ValueError("Cannot get `key` for `Background` base class!")
        return cls.__name__.lower().removeprefix("background")

    @classmethod
    def get_ppid_from_ppkw(cls, kwargs):
        """Return the PPID based on given keyword arguments for a subclass"""
        code = cls.get_ppid_code()
        cback = ppid.kwargs_to_ppid(cls, "check_user_kwargs", kwargs)
        return ":".join([code, cback])

    @staticmethod
    def get_ppkw_from_ppid(bg_ppid):
        """Return keyword arguments for any subclass from a PPID string

        Raises
        ------
        ValueError
            If `bg_ppid` does not consist of exactly two ":"-separated
            parts or if no background method matches its code.
        """
        code, pp_check_user_kwargs = bg_ppid.split(":")
        cls = get_available_background_methods().get(code)
        if cls is None:
            raise ValueError(
                f"Could not find background computation method '{code}'!")
        kwargs = ppid.ppid_to_kwargs(cls=cls,
                                     method="check_user_kwargs",
                                     ppid=pp_check_user_kwargs)
        return kwargs

    def get_progress(self):
        """Return progress of background computation, float in [0,1]"""
        if self.image_count == 0:
            return 0.
        else:
            return self.image_proc.value

    def process(self):
        """Perform the background computation

        This irreversibly removes/overrides any "image_bg" and
        "bg_off" features defined in the output file `self.h5out`.
        """
        t0 = time.perf_counter()
        ds_keys = ("image_bg", "bg_off")
        grp_keys = ("events", "basin_events")

        # Delete any old background data
        for ds_key in ds_keys:
            for grp_key in grp_keys:
                if grp_key in self.h5out and ds_key in self.h5out[grp_key]:
                    del self.h5out[grp_key][ds_key]

        # Perform the actual background computation
        self.process_approach()
        bg_ppid = self.get_ppid()

        # Store pipeline information in the image_bg/bg_off feature
        for ds_key in ds_keys:
            for grp_key in grp_keys:
                if grp_key in self.h5out and ds_key in self.h5out[grp_key]:
                    attrs = self.h5out[f"{grp_key}/{ds_key}"].attrs
                    attrs["dcnum ppid background"] = bg_ppid
                    attrs["dcnum ppid generation"] = \
                        ppid.DCNUM_PPID_GENERATION

        self.logger.info(
            f"Background computation time: {time.perf_counter()-t0:.1f}s")

    @abc.abstractmethod
    def process_approach(self):
        """The actual background computation approach"""
@functools.cache
def get_available_background_methods():
    """Return dictionary of background computation methods

    The keys are the pipeline identifier codes returned by
    `get_ppid_code` and the values are the corresponding direct
    subclasses of `Background`.

    NOTE: The result is cached on first call; subclasses that are
    defined after the first call are not included.
    """
    return {cls.get_ppid_code(): cls for cls in Background.__subclasses__()}