dcnum.logic
Logic for running the dcnum pipeline
Submodules
Classes
- ChunkSlot – slot for one data chunk, managed by SlotRegister
- DCNumJobRunner – Run a pipeline as defined by a job.DCNumPipelineJob
- DCNumPipelineJob – Pipeline job recipe
- ExtendedJSONEncoder – JSON encoder based on json.JSONEncoder
- SlotRegister – A register for `ChunkSlot`s for shared memory access
- UniversalWorkerProcess – UniversalWorker that runs in a separate process
- UniversalWorkerThread – UniversalWorker that runs in a thread
Package Contents
- class dcnum.logic.ChunkSlot(job, data, is_remainder=False)
Bases: dcnum.logic.chunk_slot_data.ChunkSlotData
- _instance_counter = 0
- index = 0
- job
Job information object
- data
Input data object
- is_remainder = False
Whether this slot only applies to the last chunk
- seg_cls
Segmentation class
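The is_remainder flag suggests that the frames are split into fixed-size chunks, with a possibly shorter final chunk handled by a dedicated slot. A minimal sketch of such a split, assuming chunks of chunk_size consecutive frames (compare the SlotRegister attributes chunk_size and num_chunks); chunk_bounds is a hypothetical helper, not part of dcnum.

```python
def chunk_bounds(num_frames: int, chunk_size: int) -> list[tuple[int, int, bool]]:
    """Split frame indices into chunks (hypothetical helper, not dcnum API).

    Returns (start, stop, is_remainder) tuples; is_remainder is True only
    for a final chunk that is shorter than chunk_size.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    bounds = []
    for start in range(0, num_frames, chunk_size):
        stop = min(start + chunk_size, num_frames)
        bounds.append((start, stop, stop - start < chunk_size))
    return bounds


print(chunk_bounds(10, 4))
# [(0, 4, False), (4, 8, False), (8, 10, True)]
```

If num_frames is a multiple of chunk_size, no remainder chunk is produced in this sketch.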
- class dcnum.logic.DCNumJobRunner(job: dcnum.logic.job.DCNumPipelineJob, tmp_suffix: str | None = None, *args, **kwargs)
Bases: threading.Thread
Run a pipeline as defined by a job.DCNumPipelineJob
- Parameters:
job (.job.DCNumPipelineJob) – pipeline job to run
tmp_suffix (str) – optional unique string for creating temporary files (defaults to hostname)
- error_tb = None
- job
- tmp_suffix = None
- event_count = 0
- _data_raw = None
- _data_temp_in = None
- _state = 'init'
- _progress_bg = None
- _progress_ex = None
- _progress_bn = None
- _segm_rate = 0
- main_logger
- path_log
- _log_file_handler
- log_queue
- _qlisten
- logger
- property draw: dcnum.read.HDF5Data
Raw input data
- property dtin: dcnum.read.HDF5Data
Input data with (corrected) background image
- property path_temp_in
- property path_temp_out
- property state
- join(delete_temporary_files=True, *args, **kwargs)
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
- run()
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- task_background()
Perform background computation task
This populates the file self.path_temp_in with the ‘image_bg’ feature.
- task_enforce_basin_strategy()
Transfer basin data from input files to output if requested
The user specifies the “basin_strategy” keyword argument in self.job. If it is set to “drain”, all basin information is copied from the input file to the output file. If it is set to “tap”, only basins are created in the output file.
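The difference between the two basin strategies can be pictured with a toy model in which files are plain dicts. This is a sketch under that assumption, not dcnum's implementation; build_output and the "features"/"basins" keys are hypothetical.

```python
from typing import Literal


def build_output(input_features: dict, new_features: dict, path_in: str,
                 basin_strategy: Literal["drain", "tap"] = "drain") -> dict:
    """Toy model of the two basin strategies (hypothetical, not dcnum code).

    "features" maps feature names to data; "basins" lists paths of files
    from which further features can be read.
    """
    if basin_strategy == "drain":
        # copy every input feature into the output: self-contained but big
        features = {**input_features, **new_features}
        basins = []
    elif basin_strategy == "tap":
        # store only newly computed features and link back to the input
        features = dict(new_features)
        basins = [path_in]
    else:
        raise ValueError(f"Unknown basin strategy: {basin_strategy!r}")
    return {"features": features, "basins": basins}
```

With "tap", a reader of the output file has to follow the basin link to the input file to access the remaining features.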
- class dcnum.logic.DCNumPipelineJob(path_in: pathlib.Path | str, path_out: pathlib.Path | str | None = None, data_code: str = 'hdf', data_kwargs: dict | None = None, background_code: str = 'sparsemed', background_kwargs: dict | None = None, segmenter_code: str = 'thresh', segmenter_kwargs: dict | None = None, feature_code: str = 'legacy', feature_kwargs: dict | None = None, gate_code: str = 'norm', gate_kwargs: dict | None = None, basin_strategy: Literal['drain', 'tap'] = 'drain', compression: str = 'zstd-5', num_procs: int | None = None, log_level: int = logging.INFO, debug: bool = False)
Pipeline job recipe
- Parameters:
path_in (pathlib.Path | str) – input data path
path_out (pathlib.Path | str) – output data path
data_code (str) – identification code of input data reader to use
data_kwargs (dict) – keyword arguments for data reader
background_code (str) – identification code of background data computation method
background_kwargs (dict) – keyword arguments for background data computation method
segmenter_code (str) – identification code of segmenter to use
segmenter_kwargs (dict) – keyword arguments for segmenter
feature_code (str) – identification code of feature extractor
feature_kwargs (dict) – keyword arguments for feature extractor
gate_code (str) – identification code for gating/event filtering class
gate_kwargs (dict) – keyword arguments for gating/event filtering class
basin_strategy (str) –
strategy for handling event data. In principle, not all events have to be stored in the output file if basins are defined that link back to the original file.
You can “drain” all basins, which means that the output file will contain all features but will also be very big.
You can “tap” the basins, including the input file, which means that the output file will be comparatively small.
compression (str) – compression algorithm to use. Set this to “none” to disable compression. Currently, only the Zstandard compression algorithm may be used, from the least compression “zstd-1” to the best compression “zstd-9”. The default “zstd-5” is a trade-off: choose a higher number if the bottleneck is disk I/O and a lower number if the bottleneck is the CPU. Note that “zstd-5” is the accepted minimum compression setting for long-term data storage in the DC universe (enforced e.g. by DCOR-Aid).
num_procs (int) – Number of processes to use
log_level (int) – Logging level to use.
debug (bool) – Whether to set logging level to “DEBUG” and use threads instead of processes
- kwargs
initialization keyword arguments of this job
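The rules for the compression parameter can be expressed as a small validator. A minimal sketch; parse_compression and is_archive_grade are hypothetical helpers, and dcnum's own parsing of the compression string may differ.

```python
from __future__ import annotations


def parse_compression(code: str) -> tuple[str | None, int | None]:
    """Parse a compression code such as "zstd-5" (hypothetical helper).

    Returns (algorithm, level), or (None, None) for "none". Only Zstandard
    with levels 1 (least) to 9 (best) is accepted.
    """
    if code == "none":
        return None, None
    algorithm, sep, level_str = code.partition("-")
    if algorithm != "zstd" or not sep or not level_str.isdigit():
        raise ValueError(f"Unsupported compression: {code!r}")
    level = int(level_str)
    if not 1 <= level <= 9:
        raise ValueError(f"zstd level must be between 1 and 9, got {level}")
    return algorithm, level


def is_archive_grade(code: str) -> bool:
    """True if the setting meets the "zstd-5" long-term storage minimum."""
    algorithm, level = parse_compression(code)
    return algorithm == "zstd" and level >= 5


print(parse_compression("zstd-5"))  # ('zstd', 5)
```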
- class dcnum.logic.ExtendedJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
Bases: json.JSONEncoder
Constructor for JSONEncoder, with sensible defaults.
If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.
If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.
If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause a RecursionError). Otherwise, no such check takes place.
If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.
If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.
If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.
If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is None and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.
If specified, default is a function that gets called for objects that can't otherwise be serialized. It should return a JSON encodable version of the object or raise a TypeError.
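A json.JSONEncoder subclass is typically extended by overriding default() for the extra types it should handle. A minimal sketch of that pattern; which types dcnum's ExtendedJSONEncoder actually supports is not listed here, so the pathlib handling and the class name PathAwareJSONEncoder are assumptions for illustration.

```python
import json
import pathlib


class PathAwareJSONEncoder(json.JSONEncoder):
    """Hypothetical encoder that also serializes pathlib paths."""

    def default(self, o):
        if isinstance(o, pathlib.PurePath):
            return str(o)
        # defer to the base class, which raises TypeError
        return super().default(o)


data = {"path_in": pathlib.PurePosixPath("data/input.rtdc"), "num_procs": 4}
print(json.dumps(data, cls=PathAwareJSONEncoder, sort_keys=True))
# {"num_procs": 4, "path_in": "data/input.rtdc"}
```

Passing separators=(",", ":") to json.dumps yields the compact representation described above.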
- class dcnum.logic.SlotRegister(job: dcnum.logic.job.DCNumPipelineJob, data: dcnum.read.HDF5Data, event_queue: multiprocessing.Queue | None = None, num_slots: int = 3)
A register for `ChunkSlot`s for shared memory access
The SlotRegister manages all ChunkSlot instances and implements methods to interact with individual `ChunkSlot`s.
- job
- data
- event_queue
- chunk_size
- num_chunks
- _slots = []
- timers
- counters
- _state
- num_frames
Total number of frames to process
- feat_nevents
Number of events per frame
Shared RawArray of length len(data) into which the number of events per frame is written.
- property chunks_loaded
A process-safe counter for the number of chunks loaded
This number increments as SlotRegister.task_load_all is called.
- property masks_dropped
A process-safe counter for the number of masks dropped
Segmentation may drop invalid masks/events.
- property write_queue_size
A process-safe counter for the number of chunks in the writer queue
A large number indicates a slow writer, which can be the result of a slow hard disk or a slow CPU (since compression is used). This counter is used to prevent OOM (out-of-memory) events by stalling data processing when the writer is slow.
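Process-safe counters like these are commonly built on multiprocessing.Value, incremented under the value's lock. The sketch below shows that pattern plus a stall-while-the-writer-lags rule; increment, wait_for_writer and the max_queue threshold are hypothetical, since the actual threshold and mechanism used by dcnum are not documented here.

```python
import multiprocessing as mp
import time


def increment(counter, amount: int = 1) -> None:
    """Increment a shared mp.Value under its lock (process-safe)."""
    with counter.get_lock():
        counter.value += amount


def wait_for_writer(write_queue_size, max_queue: int,
                    poll: float = 0.01, timeout: float = 10.0) -> bool:
    """Stall while too many chunks wait for the writer (hypothetical rule).

    Returns True once the queue is short enough, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while write_queue_size.value > max_queue:
        if time.monotonic() > deadline:
            return False
        time.sleep(poll)
    return True


chunks_loaded = mp.Value("i", 0)     # shared int with its own lock
write_queue_size = mp.Value("i", 0)
increment(chunks_loaded)
increment(write_queue_size, 3)
print(chunks_loaded.value, write_queue_size.value)  # 1 3
print(wait_for_writer(write_queue_size, max_queue=5))  # True
```

A reader only needs .value without the lock; the lock matters for read-modify-write updates such as +=.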
- property slots
A list of all ChunkSlots
- property state
State of the SlotRegister, used for communication with workers
“w”: initialized (workers work)
“p”: paused (all workers pause)
“q”: quit (all workers stop)
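A worker that honors these three states could poll a shared value in a loop. A minimal sketch, assuming the state is stored as a single character in a multiprocessing.Value; worker_loop and demo_task are hypothetical and do not reproduce dcnum's UniversalWorker.

```python
import multiprocessing as mp
import time
from typing import Callable


def worker_loop(state, do_task: Callable[[], bool], idle: float = 0.01) -> None:
    """Hypothetical worker loop driven by a shared state character.

    "w": work, "p": pause, "q": quit. do_task returns True if it did
    something (like the task_* methods of SlotRegister); otherwise the
    worker idles briefly before polling again.
    """
    while True:
        current = state.value
        if current == "q":
            break
        if current == "p" or not do_task():
            time.sleep(idle)


state = mp.Value("u", "w")  # shared across processes in real use
calls = []


def demo_task() -> bool:
    calls.append(len(calls))
    if len(calls) == 3:
        state.value = "q"  # e.g. the register signals shutdown
    return True


worker_loop(state, demo_task)
print(len(calls))  # 3
```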
- find_slot(state: str, chunk: int | None = None) → dcnum.logic.chunk_slot.ChunkSlot | None
Return the first ChunkSlot that has the given state
We sort the slots according to the slot chunks so that we always process the slot with the smallest slot chunk number first. Initially, the slot_chunks array is filled with zeros, but we populate it here.
Return None if no matching slot exists
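The lookup described above amounts to scanning the slots in order of ascending chunk number. A minimal sketch with a stand-in Slot dataclass; the class, the state names, and the interpretation of the optional chunk argument as a filter are assumptions, not dcnum's code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Slot:
    """Minimal stand-in for a ChunkSlot (hypothetical)."""
    chunk: int
    state: str


def find_slot(slots: list[Slot], state: str,
              chunk: int | None = None) -> Slot | None:
    """Return the slot with the lowest chunk number in the given state."""
    # sorting by chunk number means earlier chunks are processed first
    for slot in sorted(slots, key=lambda s: s.chunk):
        if slot.state == state and (chunk is None or slot.chunk == chunk):
            return slot
    return None  # no matching slot


slots = [Slot(chunk=5, state="loaded"), Slot(chunk=3, state="loaded"),
         Slot(chunk=4, state="labeled")]
print(find_slot(slots, "loaded").chunk)  # 3
```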
- reserve_slot_for_task(current_state: str, next_state: str, chunk_slot: dcnum.logic.chunk_slot.ChunkSlot | None = None, batch_size: int | None = None) → StateWarden | None
Return slot with the specified state and lowest chunk index
- Parameters:
current_state – State required for the task to start
next_state – State that will be set after the task is done
chunk_slot – Optional ChunkSlot to operate on; if set to None, search for a matching one, and if none can be found, return None
batch_size – Number of frames to reserve for performing the task. Defaults to the entire chunk.
- Returns:
Context manager that enforces setting the next state or None if no ChunkSlot could be reserved. Usage:

    if state_warden is not None:
        with state_warden as (chunk_slot, batch_range):
            perform_task(chunk_slot,
                         start_index=batch_range[0],
                         stop_index=batch_range[1])

The batch_range indices are defined by the batch_size parameter.
This context manager automatically sets the slot state to next_state when the context exits without exceptions.
- Return type:
StateWarden | None
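The reserve-then-advance pattern can be sketched with a small context manager whose __exit__ only changes the state when no exception occurred. This is a single-process model without locking, and the Slot fields, the rule that the state advances once all frames of the chunk are done, and the explicit slot argument are assumptions; dcnum's StateWarden is process-safe and may behave differently.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Slot:
    """Minimal stand-in for a ChunkSlot (hypothetical)."""
    chunk: int
    state: str
    length: int        # number of frames in this chunk
    reserved: int = 0  # frames handed out to tasks so far
    done: int = 0      # frames whose task finished without error


class StateWarden:
    """Context manager that advances a slot's state after success."""

    def __init__(self, slot: Slot, next_state: str, batch_range: tuple[int, int]):
        self.slot = slot
        self.next_state = next_state
        self.batch_range = batch_range

    def __enter__(self):
        return self.slot, self.batch_range

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            start, stop = self.batch_range
            self.slot.done += stop - start
            # only the batch that completes the chunk changes the state
            if self.slot.done >= self.slot.length:
                self.slot.state = self.next_state
        return False  # never swallow exceptions


def reserve_slot_for_task(slot: Slot, current_state: str, next_state: str,
                          batch_size: int | None = None) -> StateWarden | None:
    if slot.state != current_state or slot.reserved >= slot.length:
        return None
    size = slot.length if batch_size is None else batch_size
    start = slot.reserved
    stop = min(start + size, slot.length)
    slot.reserved = stop
    return StateWarden(slot, next_state, (start, stop))


slot = Slot(chunk=0, state="loaded", length=10)
ranges = []
while (warden := reserve_slot_for_task(slot, "loaded", "labeled", batch_size=4)) is not None:
    with warden as (chunk_slot, batch_range):
        ranges.append(batch_range)
print(ranges, slot.state)  # [(0, 4), (4, 8), (8, 10)] labeled
```

In this sketch a failed batch leaves the slot in its current state, so the chunk is never marked as done.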
- task_load_all(logger: logging.Logger | None = None) → bool
Load chunk data into memory for as many slots as possible
- Returns:
did_something – Whether data were loaded into memory
- Return type:
bool
- task_label_masks(logger: logging.Logger | None = None) → bool
Perform labeling of mask images for a ChunkSlot
This method is process-safe. Multiple processes may call it concurrently, working on the same ChunkSlot.
- Returns:
did_something – Whether masks were converted to labels
- Return type:
bool
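Converting a binary mask into labels means assigning each connected foreground region its own integer. The sketch below uses a breadth-first flood fill with 4-connectivity; it illustrates the general technique only, dcnum's own labeling code is not shown in this reference, and in practice a library routine such as scipy.ndimage.label would be used on NumPy arrays. label_mask is a hypothetical name.

```python
from collections import deque


def label_mask(mask: list[list[int]]) -> tuple[list[list[int]], int]:
    """Label 4-connected nonzero regions of a binary mask.

    Returns (labels, count); background is 0 and regions are numbered
    1..count in row-major order of their first pixel.
    """
    rows = len(mask)
    cols = len(mask[0]) if rows else 0
    labels = [[0] * cols for _ in range(rows)]
    count = 0
    for r in range(rows):
        for c in range(cols):
            if not mask[r][c] or labels[r][c]:
                continue
            count += 1
            labels[r][c] = count
            queue = deque([(r, c)])
            while queue:  # breadth-first flood fill of one region
                y, x = queue.popleft()
                for ny, nx in ((y - 1, x), (y + 1, x), (y, x - 1), (y, x + 1)):
                    if (0 <= ny < rows and 0 <= nx < cols
                            and mask[ny][nx] and not labels[ny][nx]):
                        labels[ny][nx] = count
                        queue.append((ny, nx))
    return labels, count


mask = [
    [1, 1, 0, 0],
    [0, 1, 0, 1],
    [0, 0, 0, 1],
]
labels, n = label_mask(mask)
print(n)       # 2
print(labels)  # [[1, 1, 0, 0], [0, 1, 0, 2], [0, 0, 0, 2]]
```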
- task_process_labels(logger: logging.Logger | None = None) → bool
Perform label processing (e.g. binary closing) for a ChunkSlot
This method is process-safe. Multiple processes may call it concurrently, working on the same ChunkSlot.
- Returns:
did_something – Whether labels were processed
- Return type:
bool
- class dcnum.logic.UniversalWorkerProcess(*args, **kwargs)
Bases: UniversalWorker, mp_spawn
Process objects represent activity that is run in a separate process
The class is analogous to threading.Thread
- class dcnum.logic.UniversalWorkerThread(*args, **kwargs)
Bases: UniversalWorker, threading.Thread
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
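UniversalWorkerProcess and UniversalWorkerThread combine the same UniversalWorker logic with either a process or a thread base class, and the debug job option switches to threads. A stdlib sketch of such a switch; pick_worker_class and square_into are hypothetical, and the use of the "spawn" start method is an assumption based on the mp_spawn base name.

```python
import multiprocessing as mp
import threading


def pick_worker_class(debug: bool) -> type:
    """Threads when debugging, spawned processes otherwise (hypothetical).

    Threads share one interpreter, so breakpoints and tracebacks behave
    as usual; separate processes avoid the GIL for CPU-bound work.
    """
    if debug:
        return threading.Thread
    return mp.get_context("spawn").Process


def square_into(results, index, value):
    results[index] = value * value


worker_cls = pick_worker_class(debug=True)
results = [0] * 4
workers = [worker_cls(target=square_into, args=(results, i, i))
           for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(results)  # [0, 1, 4, 9]
```

The shared list only works with threads; separate processes need shared memory (such as the RawArray used by SlotRegister) or queues to return results.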