Source code for dcnum.meta.ppid

from __future__ import annotations

import collections
import hashlib
import inspect
import pathlib
import types
import typing
import warnings

import numpy as np


DCNUM_PPID_GENERATION = "13"
"""The dcnum pipeline generation.
Increment this string if there are breaking changes that make
previous pipelines unreproducible.
"""


[docs] class ClassWithPPIDCapabilities(typing.Protocol):
[docs] def get_ppid(self) -> str: """full pipeline identifier for the class (instance method)""" return ""
[docs] def get_ppid_code(self) -> str: """string representing the class in the pipeline (classmethod)""" return ""
[docs] def get_ppid_from_ppkw(self) -> str: """pipeline identifier from specific pipeline keywords (classmethod)""" return ""
[docs] def get_ppkw_from_ppid(self) -> dict: """class keywords from full pipeline identifier (staticmethod)""" return {}
[docs] def compute_pipeline_hash(*, bg_id, seg_id, feat_id, gate_id, dat_id="unknown", gen_id=DCNUM_PPID_GENERATION): hasher = hashlib.md5() hasher.update("|".join([ gen_id, dat_id, bg_id, seg_id, feat_id, gate_id]).encode()) pph = hasher.hexdigest() return pph
[docs] def convert_to_dtype(value, dtype): """Convert an object to the correct dtype If `dtype` is a Union of types, or a list of types, the first non-NoneType type is used for conversion. """ if dtype is bool: if isinstance(value, str): if value.lower() in ["true", "yes"]: value = True elif value.lower() in ["false", "no"]: value = False value = bool(float(value)) elif dtype in [pathlib.Path, pathlib.Path | str]: value = str(value) else: if typing.get_origin(dtype) in {typing.Union, types.UnionType}: dtype = dtype.__args__ if isinstance(dtype, (list, tuple)): for dtarg in dtype: if dtarg is type(None) or dtarg is None: continue else: value = dtarg(value) break else: raise ValueError( f"Could not convert {value=} of type {type(value)} " f"to type {dtype}") else: value = dtype(value) return value
[docs] def get_class_method_info(class_obj: ClassWithPPIDCapabilities, static_kw_methods: list | None = None, static_kw_defaults: dict | None = None, ) -> dict[str, typing.Any]: """Return dictionary of class info with static keyword methods docs Parameters ---------- class_obj: object Class to inspect, must implement the `key` method. static_kw_methods: list of callable The methods to inspect; all kwargs-only keyword arguments are extracted. static_kw_defaults: dict If a key in this dictionary matches an item in `static_kw_methods`, then these are the default values returned in the "defaults" dictionary. This is used in cases where a base class does implement some annotations, but the subclass does not actually use them, because e.g. they are taken from a property such as is the case for the mask postprocessing of segmenter classes. """ if static_kw_defaults is None: static_kw_defaults = {} doc = class_obj.__doc__ or class_obj.__init__.__doc__ or "" info: dict[str, typing.Any] = { "code": class_obj.get_ppid_code(), "doc": doc, "title": doc.split("\n")[0], } if static_kw_methods: defau = collections.OrderedDict() annot = collections.OrderedDict() for mm in static_kw_methods: meth = getattr(class_obj, mm) spec = inspect.getfullargspec(meth) if mm_defaults := static_kw_defaults.get(mm): defau[mm] = mm_defaults else: defau[mm] = spec.kwonlydefaults or {} annot[mm] = spec.annotations for k, v in annot[mm].items(): annot[mm][k] = simple_type_eval(v) info["defaults"] = defau info["annotations"] = annot return info
[docs] def kwargs_to_ppid(cls: ClassWithPPIDCapabilities, method: str, kwargs: dict, allow_invalid_keys: bool = True): info = get_class_method_info(cls, [method, "__init__"]) concat_strings = [] if info["defaults"][method]: kwdefaults = info["defaults"][method] kwdefaults_init = info["defaults"]["__init__"] kw_false = (set(kwargs.keys()) - set(kwdefaults.keys()) - set(kwdefaults_init.keys())) if kw_false: # This should not have happened. msg = (f"Invalid kwargs {kw_false} specified for method " f"'{method}'! Valid kwargs are" f"{sorted(kwdefaults.keys())}. If you wrote this " f"segmenter and had to implement `__init__`, make sure " f"that it accepts all kwonly-arguments its super class " f"accepts. If this is not the case, you are probably " f"passing invalid kwargs to the segmenter." ) if allow_invalid_keys: warnings.warn(msg, UserWarning) else: raise KeyError(msg) kwannot = info["annotations"][method] kws = list(kwdefaults.keys()) kws_abrv = get_unique_prefix(kws) for kw, abr in zip(kws, kws_abrv): val = kwargs.get(kw, kwdefaults[kw]) if kwannot[kw] in [pathlib.Path, str | pathlib.Path]: # If we have paths as arguments, only use the filename path = pathlib.Path(val) if path.exists(): val = path.name if isinstance(val, (bool, np.bool_)): val = int(val) # do not print e.g. "True" elif isinstance(val, (float, np.floating)): if val == int(val): val = int(val) # omit the ".0" at the end concat_strings.append(f"{abr}={val}") return "^".join(concat_strings)
[docs] def ppid_to_kwargs(cls, method, ppid): """Convert pipeline method id to method keyword arguments Notes ----- Keep in mind that when a `method` is changed in a later version, new keyword arguments should always be put AT THE VERY END of the keyword list. Otherwise, might will be ambiguities regarding the abbreviated keys! """ info = get_class_method_info(cls, [method]) items = ppid.split("^") kwargs = {} if info["defaults"][method] and items: # assemble the individual entries entries = [] for abr, val in [it.split("=") for it in items]: entries.append((abr, val)) # sort the entries according to their length # (This is not really necessary, but increases robustness.) entries = sorted(entries, key=lambda x: -len(x[0])) # populate default keyword arguments kwargs.update(info["defaults"][method]) # keep the keys in their original order, such that we are # backwards-compatible with shorter pipeline identifiers keys = list(kwargs.keys()) # determine the correct values by iterating through the info used_keys = [] for abr_key, val in entries: for full_key in keys: if full_key not in used_keys and full_key.startswith(abr_key): annot = info["annotations"][method][full_key] kwargs[full_key] = convert_to_dtype(val, annot) used_keys.append(full_key) break else: raise ValueError(f"Unknown abbreviated key '{abr_key}'!") return kwargs
[docs] def simple_type_eval(type_string: str | type) -> str | type | list: """Return the type encoded by a string, e.g. "bool" -> bool If `type_string` is already a type, it is passed through. If there is no rule to convert `type_string` to a type, `type_string` is returned as-is. If `type_string` represents a union of types (using '|'), then a list of types is returned. """ if isinstance(type_string, str): type_string = type_string.strip() if type_string.count("|"): try: return [simple_type_eval(ts) for ts in type_string.split("|")] except BaseException: pass if type_string in ["bool", "dict", "float", "int", "str"]: return eval(type_string) elif type_string == "None": return type(None) return type_string
[docs] class AbrvStr: def __init__(self, string): self.string = string self.abrv_lengths = [1] # initialize with minimum length 1
[docs] def __getitem__(self, item): return self.string.__getitem__(item)
@property def abrv(self): return self.string[:max(self.abrv_lengths)]
[docs] def meet(self, other): assert self.string != other.string if len(self.string) >= len(other.string): a, b = other, self else: a, b = self, other al = 1 bl = 1 while b[:bl].startswith(a[:al]): if bl == len(a.string): bl += 1 break else: al += 1 bl += 1 a.abrv_lengths.append(al) b.abrv_lengths.append(bl)
[docs] def get_unique_prefix(str_list): """Find unique prefix for a list of strings Parameters ---------- str_list: list of str List of strings to abbreviate """ size = len(str_list) abrv_str_list = [AbrvStr(a) for a in str_list] for ii in range(size): for jj in range(size): if ii != jj: abrv_str_list[ii].meet(abrv_str_list[jj]) return [a.abrv for a in abrv_str_list]