dcnum.logic

Logic for running the dcnum pipeline

Submodules

Classes

ChunkSlot

DCNumJobRunner

Run a pipeline as defined by a job.DCNumPipelineJob

DCNumPipelineJob

Pipeline job recipe

ExtendedJSONEncoder

JSON encoder that additionally handles pathlib.Path, integer numbers, numpy booleans, and slices

SlotRegister

A register for `ChunkSlot`s for shared memory access

UniversalWorkerProcess

Process objects represent activity that is run in a separate process

UniversalWorkerThread

Universal worker that runs in a thread instead of a separate process

Package Contents

class dcnum.logic.ChunkSlot(job, data, is_remainder=False)

Bases: dcnum.logic.chunk_slot_data.ChunkSlotData

_instance_counter = 0
index = 0
job

Job information object

data

Input data object

is_remainder = False

Whether this slot only applies to the last chunk

seg_cls

Segmentation class

__repr__()
load(idx)

Load chunk idx into self.mp_image and return numpy views
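The is_remainder flag implies that the input frames are split into fixed-size chunks, with a shorter chunk at the end whenever the number of frames is not a multiple of the chunk size. A minimal sketch of that arithmetic, assuming plain ceiling division; chunk_bounds is a hypothetical helper, not part of dcnum, and dcnum may define chunk boundaries differently:

```python
def chunk_bounds(num_frames: int, chunk_size: int, idx: int):
    """Return (start, stop, is_remainder) for chunk `idx` (hypothetical helper)."""
    # Ceiling division: the last chunk may hold fewer than chunk_size frames
    num_chunks = -(-num_frames // chunk_size)
    if not 0 <= idx < num_chunks:
        raise IndexError(f"chunk {idx} out of range for {num_chunks} chunks")
    start = idx * chunk_size
    stop = min(start + chunk_size, num_frames)
    # Only the final chunk can be short; in this sketch that makes it a remainder
    is_remainder = stop - start < chunk_size
    return start, stop, is_remainder
```

For 2500 frames and a chunk size of 1000, chunks 0 and 1 are full and chunk 2 covers frames 2000 to 2500, which is why a slot dedicated to the last chunk can be useful.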

class dcnum.logic.DCNumJobRunner(job: dcnum.logic.job.DCNumPipelineJob, tmp_suffix: str | None = None, *args, **kwargs)

Bases: threading.Thread

Run a pipeline as defined by a job.DCNumPipelineJob

Parameters:
  • job (.job.DCNumPipelineJob) – pipeline job to run

  • tmp_suffix (str) – optional unique string for creating temporary files (defaults to hostname)

error_tb = None
job
tmp_suffix = None
event_count = 0
_data_raw = None
_data_temp_in = None
_state = 'init'
_progress_bg = None
_progress_ex = None
_progress_bn = None
_segm_rate = 0
main_logger
path_log
_log_file_handler
log_queue
_qlisten
logger
__enter__()
__exit__(exc_type, exc_val, exc_tb)
property draw: dcnum.read.HDF5Data

Raw input data

property dtin: dcnum.read.HDF5Data

Input data with (corrected) background image

property path_temp_in
property path_temp_out
property state
close(delete_temporary_files=True)
join(delete_temporary_files=True, *args, **kwargs)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
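DCNumJobRunner combines a threading.Thread with a context manager and a join() that also takes delete_temporary_files. The following is a hypothetical sketch of that pattern, not dcnum's implementation: SketchRunner, its temporary directory layout, and the file name temp_in.h5 are assumptions; only the hostname default for tmp_suffix comes from the parameter description.

```python
from __future__ import annotations

import pathlib
import shutil
import socket
import tempfile
import threading


class SketchRunner(threading.Thread):
    """Hypothetical runner thread that owns a temporary directory."""

    def __init__(self, tmp_suffix: str | None = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Mirror the documented default: use the hostname as suffix
        self.tmp_suffix = tmp_suffix or socket.gethostname()
        self.tmp_dir = pathlib.Path(
            tempfile.mkdtemp(suffix=f"_{self.tmp_suffix}"))
        self.state = "init"

    def run(self):
        # Stand-in for the actual pipeline work
        (self.tmp_dir / "temp_in.h5").write_bytes(b"")
        self.state = "done"

    def close(self, delete_temporary_files=True):
        if delete_temporary_files:
            shutil.rmtree(self.tmp_dir, ignore_errors=True)

    def join(self, delete_temporary_files=True, *args, **kwargs):
        super().join(*args, **kwargs)
        # Only clean up if the thread really finished (join may time out)
        if not self.is_alive():
            self.close(delete_temporary_files=delete_temporary_files)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
```

Because delete_temporary_files comes first in join(), a timeout has to be passed as a keyword (runner.join(timeout=5)); passed positionally, it would be taken as delete_temporary_files.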

get_status()
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

run_pipeline()

Execute the pipeline job

task_background()

Perform background computation task

This populates the file self.path_temp_in with the ‘image_bg’ feature.

task_enforce_basin_strategy()

Transfer basin data from input files to output if requested

The user specifies the “basin_strategy” keyword argument in self.job. If it is set to “drain”, all basin information is copied from the input file to the output file. If it is set to “tap”, only basins are created in the output file.
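A toy model can make the difference between the two strategies concrete. In this hypothetical sketch, dictionaries stand in for HDF5 features and strings stand in for basin definitions; plan_output is not a dcnum function. The documentation above does not say whether “drain” also keeps basin links, so this sketch writes none in that mode.

```python
def plan_output(input_features: dict, input_basins: list,
                new_features: dict, basin_strategy: str):
    """Hypothetical: decide which features and basins the output receives.

    `input_features` stands for every feature reachable from the input
    file, including features that live in the input file's basins.
    """
    # Features computed by the pipeline are always written
    features = dict(new_features)
    if basin_strategy == "drain":
        # Copy everything from the input; the output is self-contained
        for name, value in input_features.items():
            features.setdefault(name, value)
        basins = []
    elif basin_strategy == "tap":
        # Copy nothing; link to the input file and its basins instead
        basins = ["<input file>"] + list(input_basins)
    else:
        raise ValueError(f"Unknown basin_strategy: {basin_strategy!r}")
    return features, basins
```

The “tap” output stays small because it only stores newly computed features plus references, which matches the trade-off described for the basin_strategy parameter.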

task_segment_extract()
class dcnum.logic.DCNumPipelineJob(path_in: pathlib.Path | str, path_out: pathlib.Path | str | None = None, data_code: str = 'hdf', data_kwargs: dict | None = None, background_code: str = 'sparsemed', background_kwargs: dict | None = None, segmenter_code: str = 'thresh', segmenter_kwargs: dict | None = None, feature_code: str = 'legacy', feature_kwargs: dict | None = None, gate_code: str = 'norm', gate_kwargs: dict | None = None, basin_strategy: Literal['drain', 'tap'] = 'drain', compression: str = 'zstd-5', num_procs: int | None = None, log_level: int = logging.INFO, debug: bool = False)

Pipeline job recipe

Parameters:
  • path_in (pathlib.Path | str) – input data path

  • path_out (pathlib.Path | str) – output data path

  • data_code (str) – identification code of input data reader to use

  • data_kwargs (dict) – keyword arguments for data reader

  • background_code (str) – identification code of background data computation method

  • background_kwargs (dict) – keyword arguments for background data computation method

  • segmenter_code (str) – identification code of segmenter to use

  • segmenter_kwargs (dict) – keyword arguments for segmenter

  • feature_code (str) – identification code of feature extractor

  • feature_kwargs (dict) – keyword arguments for feature extractor

  • gate_code (str) – identification code for gating/event filtering class

  • gate_kwargs (dict) – keyword arguments for gating/event filtering class

  • basin_strategy (str) –

    strategy for handling event data. In principle, not all events have to be stored in the output file if basins are defined that link back to the original file.

    • You can “drain” all basins which means that the output file will contain all features, but will also be very big.

    • You can “tap” the basins, including the input file, which means that the output file will be comparatively small.

  • compression (str) – compression algorithm to use. Set this to “none” to disable compression. Currently, only the Zstandard compression algorithm is supported, ranging from the least compression, “zstd-1”, to the best compression, “zstd-9”. The default, “zstd-5”, is a trade-off: use a higher level if the bottleneck is disk I/O, and a lower level if the bottleneck is the CPU. Note that “zstd-5” is the accepted minimum compression setting for long-term data storage in the DC universe (enforced e.g. by DCOR-Aid).

  • num_procs (int) – Number of processes to use

  • log_level (int) – Logging level to use.

  • debug (bool) – Whether to set logging level to “DEBUG” and use threads instead of processes

kwargs

Keyword arguments used to initialize this job

__getitem__(item)
__getstate__()
__setstate__(state)
assert_pp_codes()

Sanity check of self.kwargs

get_hdf5_dataset_kwargs() → dict

Validate and return output HDF5 Dataset keyword arguments
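Before it can be turned into HDF5 dataset keyword arguments, the compression string documented for DCNumPipelineJob (“none” or “zstd-1” through “zstd-9”) has to be validated. The following is a minimal sketch of that validation step, assuming a simple "algorithm-level" format. parse_compression and the returned dictionary layout are hypothetical, and a real implementation would still have to map the result onto h5py filter options (for example through the hdf5plugin package).

```python
def parse_compression(compression: str) -> dict:
    """Hypothetical: validate "none" or "zstd-N" (N = 1..9)."""
    if compression == "none":
        return {"algorithm": None, "level": None}
    # Split "zstd-5" into ("zstd", "-", "5")
    algorithm, sep, level = compression.partition("-")
    if algorithm != "zstd" or not sep or not level.isdigit():
        raise ValueError(f"Unsupported compression: {compression!r}")
    level = int(level)
    if not 1 <= level <= 9:
        raise ValueError(f"Zstandard level must be 1 to 9, got {level}")
    return {"algorithm": "zstd", "level": level}
```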

get_ppid(ret_hash=False, ret_dict=False)
get_segmenter_class()

Return the class of the segmenter associated with this job

validate()

Make sure the pipeline will run given the job kwargs

Returns:

True (for testing convenience)

Return type:

bool

Raises:

dcnum.segm.SegmenterNotApplicableError – the segmenter is incompatible with the input path

class dcnum.logic.ExtendedJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: json.JSONEncoder

Constructor for JSONEncoder, with sensible defaults.

If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.

If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.

If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause a RecursionError). Otherwise, no such check takes place.

If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.

If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.

If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.

If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is None and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.

If specified, default is a function that gets called for objects that can’t otherwise be serialized. It should return a JSON encodable version of the object or raise a TypeError.

default(o)

Extended JSON encoder for the dcnum logic

This JSON encoder can handle the following additional objects:

  • pathlib.Path

  • integer numbers

  • numpy boolean

  • slices (via “PYTHON-SLICE” identifier)
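A json.JSONEncoder subclass handles such extra types by overriding default(), which json calls only for objects it cannot serialize natively. The sketch below covers the four listed types, but it is a hypothetical stand-in for ExtendedJSONEncoder. In particular, the exact JSON layout dcnum uses for the “PYTHON-SLICE” identifier is not documented here, so the list layout is an assumption. numpy integers are caught via numbers.Integral (numpy registers its integer types there), and numpy booleans are detected by duck typing so that the sketch does not need numpy.

```python
import json
import numbers
import pathlib


class SketchJSONEncoder(json.JSONEncoder):
    """Hypothetical encoder for paths, integers, numpy booleans and slices."""

    def default(self, o):
        if isinstance(o, pathlib.PurePath):
            return str(o)
        elif isinstance(o, numbers.Integral):
            # e.g. numpy integer scalars, which json cannot serialize itself
            return int(o)
        elif getattr(getattr(o, "dtype", None), "kind", None) == "b":
            # numpy boolean scalars, duck-typed so numpy is not imported here
            return bool(o)
        elif isinstance(o, slice):
            # Hypothetical layout; dcnum's actual "PYTHON-SLICE" format may differ
            return ["PYTHON-SLICE", [o.start, o.stop, o.step]]
        # Anything else: let the base class raise TypeError
        return super().default(o)


print(json.dumps({"roi": slice(1, 5)}, cls=SketchJSONEncoder))
# prints {"roi": ["PYTHON-SLICE", [1, 5, null]]}
```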

class dcnum.logic.SlotRegister(job: dcnum.logic.job.DCNumPipelineJob, data: dcnum.read.HDF5Data, event_queue: multiprocessing.Queue | None = None, num_slots: int = 3)

A register for `ChunkSlot`s for shared memory access

The SlotRegister manages all ChunkSlot instances and implements methods to interact with individual `ChunkSlot`s.

job
data
event_queue
chunk_size
num_chunks
_slots = []
timers
counters
_state
num_frames

Total number of frames to process

feat_nevents

Number of events per frame. Shared RawArray of length len(data) into which the number of events per frame is written.

__getitem__(idx)
__iter__()

Iterate over slots, sorted by current chunk number

__len__()
property chunks_loaded

A process-safe counter for the number of chunks loaded

This number increments as SlotRegister.task_load_all is called.

property masks_dropped

A process-safe counter for the number of masks dropped

Segmentation may drop invalid masks/events.

property write_queue_size

A process-safe counter for the number of chunks in the writer queue

A large number indicates a slow writer, which can result from a slow hard disk or a slow CPU (since compression is used). This counter is used to prevent out-of-memory (OOM) events by stalling data processing when the writer is slow.
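One common way to get a process-safe counter in Python is multiprocessing.Value together with its built-in lock. The sketch below uses that approach and adds a hypothetical stall check for a slow writer. The class name, the threshold max_queue and the polling interval are assumptions, not dcnum's implementation.

```python
import multiprocessing as mp
import time


class SketchCounters:
    """Hypothetical holder of process-safe integer counters."""

    def __init__(self):
        # "l" = signed long; each Value comes with its own lock
        self.chunks_loaded = mp.Value("l", 0)
        self.write_queue_size = mp.Value("l", 0)

    def increment(self, name: str, by: int = 1):
        counter = getattr(self, name)
        # "+=" on .value is a read-modify-write, so it needs the lock
        with counter.get_lock():
            counter.value += by


def wait_for_writer(counters, max_queue=4, poll=0.01, timeout=1.0) -> bool:
    """Stall until fewer than `max_queue` chunks wait for the writer.

    Returns False if the queue did not drain within `timeout` seconds.
    """
    t0 = time.monotonic()
    while counters.write_queue_size.value >= max_queue:
        if time.monotonic() - t0 > timeout:
            return False
        time.sleep(poll)
    return True
```

Plain reads of .value, as in wait_for_writer, do not need the lock. Only read-modify-write updates do.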

property slots

A list of all ChunkSlots

property state

State of the SlotRegister, used for communication with workers

  • “w”: initialized (workers work)

  • “p”: paused (all workers pause)

  • “q”: quit (all workers stop)
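The three state codes map naturally onto a polling worker loop. This sketch is hypothetical and runs in a single process. SketchRegister stores its state in a plain attribute, whereas a multiprocess register would need shared memory for it, and do_task merely mimics the did_something return value of the task_* methods.

```python
import threading
import time


class SketchRegister:
    """Hypothetical stand-in for a register with a shared state string."""

    def __init__(self):
        self.state = "w"
        self.tasks_done = 0

    def do_task(self) -> bool:
        # Pretend there is always work; return did_something like task_*()
        self.tasks_done += 1
        return True


def worker_loop(register, poll: float = 0.005):
    """Run tasks while the state is "w", idle on "p", return on "q"."""
    while True:
        state = register.state
        if state == "q":
            return
        elif state == "p":
            time.sleep(poll)
        elif not register.do_task():
            # Nothing to do right now; avoid busy-waiting
            time.sleep(poll)
```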

close()
find_slot(state: str, chunk: int | None = None) → dcnum.logic.chunk_slot.ChunkSlot | None

Return the first ChunkSlot that has the given state

We sort the slots according to the slot chunks so that we always process the slot with the smallest slot chunk number first. Initially, the slot_chunks array is filled with zeros, but we populate it here.

Return None if no matching slot exists
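The lookup described for find_slot amounts to sorting by chunk number and returning the first slot in the requested state. In this sketch, SlotStub and the state codes "i" and "s" are hypothetical placeholders for ChunkSlot and its real states.

```python
from dataclasses import dataclass


@dataclass
class SlotStub:
    """Hypothetical minimal slot: a state code and the chunk it holds."""
    state: str
    chunk: int


def find_slot(slots, state, chunk=None):
    """Return the first slot with `state`, lowest chunk first, else None."""
    for slot in sorted(slots, key=lambda s: s.chunk):
        # Optionally restrict the search to one specific chunk
        if slot.state == state and (chunk is None or slot.chunk == chunk):
            return slot
    return None
```

Processing the lowest chunk first keeps the slots roughly in order, which likely helps the writer emit chunks sequentially.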

get_counter_lock(name)
get_time(method_name)

Return cumulative time for the given method
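Cumulative per-method timing is often implemented with a decorator that adds each call's duration to a dictionary keyed by method name. The sketch below is a hypothetical single-process version. In dcnum, the timers may be shared between processes, which this sketch does not attempt; timed, TimedRegister and task_sleep are illustrative names.

```python
import functools
import time
from collections import defaultdict


def timed(method):
    """Decorator that adds the wall time of each call to self.timers."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        t0 = time.perf_counter()
        try:
            return method(self, *args, **kwargs)
        finally:
            # Accumulate even if the method raises
            self.timers[method.__name__] += time.perf_counter() - t0
    return wrapper


class TimedRegister:
    """Hypothetical single-process stand-in for a register with timers."""

    def __init__(self):
        self.timers = defaultdict(float)

    @timed
    def task_sleep(self, seconds):
        time.sleep(seconds)
        return True

    def get_time(self, method_name):
        """Return cumulative time spent in `method_name` (0.0 if never called)."""
        return self.timers.get(method_name, 0.0)
```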

reserve_slot_for_task(current_state: str, next_state: str, chunk_slot: dcnum.logic.chunk_slot.ChunkSlot | None = None, batch_size: int | None = None) → StateWarden | None

Return slot with the specified state and lowest chunk index

Parameters:
  • current_state – State required for the task to start

  • next_state – State that will be set after the task is done

  • chunk_slot – Optional ChunkSlot to operate on; if set to None, search for a matching one, and if none can be found, return None

  • batch_size – Number of frames to reserve for performing the task. Defaults to the entire chunk.

Returns:

Context manager that enforces setting the next state, or None if no ChunkSlot could be reserved. Usage:

    if state_warden is not None:
        with state_warden as (chunk_slot, batch_range):
            perform_task(chunk_slot,
                         start_index=batch_range[0],
                         stop_index=batch_range[1])

The batch_range indices are defined by the batch_size parameter.

This context manager automatically sets the slot state to next_state when the context exits without exceptions.

Return type:

StateWarden | None
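The following is a hypothetical sketch of how such a warden could hand out batches of one chunk to concurrent workers. BatchSlot, SketchStateWarden and the state codes "l"/"m" are assumptions, not dcnum's classes. The slot advances to next_state only after every reserved batch has finished cleanly, not when the last batch is merely reserved, because concurrent workers may finish out of order. A batch that raises is not retried in this sketch, so its slot stays in the current state.

```python
import threading


class BatchSlot:
    """Hypothetical chunk slot that can be worked on in batches."""

    def __init__(self, chunk: int, size: int, state: str = "l"):
        self.chunk = chunk        # chunk index held by this slot
        self.size = size          # number of frames in the chunk
        self.state = state        # hypothetical state code, e.g. "l" = loaded
        self.next_frame = 0       # first frame not yet handed out
        self.frames_done = 0      # frames whose batches finished cleanly
        self.lock = threading.Lock()


class SketchStateWarden:
    """Yield (slot, (start, stop)); advance the state once all batches are done."""

    def __init__(self, slot, batch_range, next_state):
        self.slot = slot
        self.batch_range = batch_range
        self.next_state = next_state

    def __enter__(self):
        return self.slot, self.batch_range

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            start, stop = self.batch_range
            with self.slot.lock:
                self.slot.frames_done += stop - start
                if self.slot.frames_done == self.slot.size:
                    # Every batch of this chunk succeeded: move on
                    self.slot.state = self.next_state
                    self.slot.next_frame = 0
                    self.slot.frames_done = 0
        return False  # never swallow exceptions


def reserve_slot_for_task(slots, current_state, next_state, batch_size=None):
    """Reserve the next batch of the lowest-chunk slot in `current_state`."""
    for slot in sorted(slots, key=lambda s: s.chunk):
        with slot.lock:
            if slot.state != current_state or slot.next_frame >= slot.size:
                continue
            start = slot.next_frame
            stop = (slot.size if batch_size is None
                    else min(start + batch_size, slot.size))
            slot.next_frame = stop
        return SketchStateWarden(slot, (start, stop), next_state)
    return None
```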

task_load_all(logger: logging.Logger | None = None) → bool

Load chunk data into memory for as many slots as possible

Returns:

did_something – Whether data were loaded into memory

Return type:

bool

task_label_masks(logger: logging.Logger | None = None) → bool

Perform labeling of mask images for a ChunkSlot

This method is process-safe. Multiple processes may call it concurrently, working on the same ChunkSlot.

Returns:

did_something – Whether masks were converted to labels

Return type:

bool

task_process_labels(logger: logging.Logger | None = None) → bool

Perform label processing (e.g. binary closing) for a ChunkSlot

This method is process-safe. Multiple processes may call it concurrently, working on the same ChunkSlot.

Returns:

did_something – Whether labels were processed

Return type:

bool

task_extract_features(logger: logging.Logger | None = None) → bool

Perform feature extraction for a ChunkSlot

This method is process-safe. Multiple processes may call it concurrently, working on the same ChunkSlot.

Returns:

did_something – Whether events were extracted

Return type:

bool

class dcnum.logic.UniversalWorkerProcess(*args, **kwargs)

Bases: UniversalWorker, mp_spawn

Process objects represent activity that is run in a separate process

The class is analogous to threading.Thread

start()

Start child process

class dcnum.logic.UniversalWorkerThread(*args, **kwargs)

Bases: UniversalWorker, threading.Thread

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
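The two worker classes share the UniversalWorker base and differ only in whether they run as a thread or as a spawned process, which fits the debug option of DCNumPipelineJob (“use threads instead of processes”). A hypothetical factory for that switch is sketched below. make_worker is not part of dcnum, and equating mp_spawn with the spawn context's Process class is an assumption. With the spawn start method, the target and its arguments must be picklable.

```python
import multiprocessing as mp
import threading

# Process class bound to the "spawn" start method; assumed to correspond
# roughly to the `mp_spawn` base listed for UniversalWorkerProcess
SpawnProcess = mp.get_context("spawn").Process


def make_worker(target, debug: bool = False, **kwargs):
    """Hypothetical factory: a thread when debugging, a spawned process otherwise."""
    worker_cls = threading.Thread if debug else SpawnProcess
    # Both classes accept target, name, args, kwargs and daemon as keywords
    return worker_cls(target=target, **kwargs)
```

Threads share memory with the caller and are easy to inspect in a debugger, while spawned processes sidestep the GIL for CPU-bound segmentation and feature extraction.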