import collections
import copy
import inspect
import logging
import pathlib
import re
from typing import Literal
from ..common import LazyLoader, cpu_count
from ..feat import QueueEventExtractor
from ..feat.feat_background.base import get_available_background_methods
from ..feat.gate import Gate
from ..meta.ppid import compute_pipeline_hash, DCNUM_PPID_GENERATION
from ..read import HDF5Data
from ..segm import get_segmenters
hdf5plugin = LazyLoader("hdf5plugin")
[docs]
class DCNumPipelineJob:
def __init__(self,
path_in: pathlib.Path | str,
path_out: pathlib.Path | str | None = None,
data_code: str = "hdf",
data_kwargs: dict | None = None,
background_code: str = "sparsemed",
background_kwargs: dict | None = None,
segmenter_code: str = "thresh",
segmenter_kwargs: dict | None = None,
feature_code: str = "legacy",
feature_kwargs: dict | None = None,
gate_code: str = "norm",
gate_kwargs: dict | None = None,
basin_strategy: Literal["drain", "tap"] = "drain",
compression: str = "zstd-5",
num_procs: int | None = None,
log_level: int = logging.INFO,
debug: bool = False,
):
"""Pipeline job recipe
Parameters
----------
path_in: pathlib.Path | str
input data path
path_out: pathlib.Path | str
output data path
data_code: str
identification code of input data reader to use
data_kwargs: dict
keyword arguments for data reader
background_code: str
identification code of background data computation method
background_kwargs: dict
keyword arguments for background data computation method
segmenter_code: str
identification code of segmenter to use
segmenter_kwargs: dict
keyword arguments for segmenter
feature_code: str
identification code of feature extractor
feature_kwargs: dict
keyword arguments for feature extractor
gate_code: str
identification code for gating/event filtering class
gate_kwargs: dict
keyword arguments for gating/event filtering class
basin_strategy: str
strategy on how to handle event data; In principle, not all
events have to be stored in the output file if basins are
defined, linking back to the original file.
- You can "drain" all basins which means that the output file
will contain all features, but will also be very big.
- You can "tap" the basins, including the input file, which means
that the output file will be comparatively small.
compression: str
compression algorithm to use; Set this to "none" to disable
compression. Currently, only the Zstandard compression
algorithm may be used, with the least compression "zstd-1"
and the best compression "zstd-9". The default "zstd-5" is
a trade-off. Set the compression to a higher number if the
bottleneck is disk-IO. Set the compression to a lower number
if the bottleneck is the CPU. Note that "zstd-5" is the
accepted minimum compression setting for long-term data
storage in the DC universe (enforced e.g. by DCOR-Aid).
num_procs: int
Number of processes to use
log_level: int
Logging level to use.
debug: bool
Whether to set logging level to "DEBUG" and
use threads instead of processes
"""
self.kwargs = {}
"""initialize keyword arguments for this job"""
spec = inspect.getfullargspec(DCNumPipelineJob.__init__)
locs = locals()
for arg in spec.args:
if arg == "self":
continue
value = locs[arg]
if value is None and spec.annotations[arg] == dict | None:
value = {}
self.kwargs[arg] = value
# Set default pixel size for this job
if "pixel_size" not in self.kwargs["data_kwargs"]:
# Extract from input file
with HDF5Data(path_in) as hd:
self.kwargs["data_kwargs"]["pixel_size"] = hd.pixel_size
# Set default output path
if path_out is None:
pin = pathlib.Path(path_in)
path_out = pin.with_name(pin.stem + "_dcn.rtdc")
# Set logging level to DEBUG in debugging mode
if self.kwargs["debug"]:
self.kwargs["log_level"] = logging.DEBUG
self.kwargs["path_out"] = pathlib.Path(path_out)
# Set default mask kwargs for segmenter
self.kwargs["segmenter_kwargs"].setdefault("kwargs_mask", {})
# Set default number of processes
if num_procs is None:
self.kwargs["num_procs"] = cpu_count()
[docs]
def __getitem__(self, item):
return copy.deepcopy(self.kwargs[item])
[docs]
def __getstate__(self):
state = copy.deepcopy(self.kwargs)
return state
[docs]
def __setstate__(self, state):
if not hasattr(self, "kwargs"):
self.kwargs = {}
self.kwargs.clear()
self.kwargs.update(copy.deepcopy(state))
[docs]
def assert_pp_codes(self):
"""Sanity check of `self.kwargs`"""
# PPID classes with only one option
for cls, key in [
(HDF5Data, "data_code"),
(Gate, "gate_code"),
(QueueEventExtractor, "feature_code"),
]:
code_act = self.kwargs[key]
code_exp = cls.get_ppid_code()
if code_act != code_exp:
raise ValueError(f"Invalid code '{code_act}' for '{key}', "
f"expected '{code_exp}'!")
# PPID classes with multiple options
for options, key in [
(get_available_background_methods(), "background_code"),
(get_segmenters(), "segmenter_code"),
]:
code_act = self.kwargs[key]
if code_act not in options:
raise ValueError(f"Invalid code '{code_act}' for '{key}', "
f"expected one of '{options}'!")
[docs]
def get_hdf5_dataset_kwargs(self) -> dict:
"""Validate and return output HDF5 Dataset keyword arguments
"""
cp = str(self.kwargs["compression"]).lower().strip()
ds_kw = {"fletcher32": True}
if cp == "none":
# No compression
ds_kw["compression"] = None
ds_kw["compression_opts"] = None
elif re.match("^zstd-[1-9]$", cp):
# Zstandard compression
clevel = int(cp[-1])
for key, val in dict(hdf5plugin.Zstd(clevel=clevel)).items():
ds_kw[key] = val
else:
raise ValueError(f"Unsupported compression setting '{cp}'")
return ds_kw
[docs]
def get_ppid(self, ret_hash=False, ret_dict=False):
self.assert_pp_codes()
pp_hash_kw = collections.OrderedDict()
pp_hash_kw["gen_id"] = DCNUM_PPID_GENERATION
for pp_kw, cls, cls_kw in [
("dat_id", HDF5Data, "data_kwargs"),
("bg_id",
get_available_background_methods()[
self.kwargs["background_code"]],
"background_kwargs"),
("seg_id",
get_segmenters()[self.kwargs["segmenter_code"]],
"segmenter_kwargs"),
("feat_id", QueueEventExtractor, "feature_kwargs"),
("gate_id", Gate, "gate_kwargs"),
]:
pp_hash_kw[pp_kw] = cls.get_ppid_from_ppkw(self.kwargs[cls_kw])
ppid = "|".join(pp_hash_kw.values())
ret = [ppid]
if ret_hash:
pp_hash = compute_pipeline_hash(**pp_hash_kw)
ret.append(pp_hash)
if ret_dict:
ret.append(pp_hash_kw)
if len(ret) == 1:
ret = ret[0]
return ret
[docs]
def get_segmenter_class(self):
"""Return the class of the segmenter associated with this job"""
return get_segmenters()[self.kwargs["segmenter_code"]]
[docs]
def validate(self):
"""Make sure the pipeline will run given the job kwargs
Returns
-------
True:
for testing convenience
Raises
------
dcnum.segm.SegmenterNotApplicableError:
the segmenter is incompatible with the input path
"""
# Check segmenter applicability applicability
seg_cls = self.get_segmenter_class()
with HDF5Data(self.kwargs["path_in"]) as hd:
seg_cls.validate_applicability(
segmenter_kwargs=self.kwargs["segmenter_kwargs"],
logs=hd.logs,
meta=hd.meta)
return True