dcnum.logic
===========

.. py:module:: dcnum.logic

.. autoapi-nested-parse::

   Logic for running the dcnum pipeline


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/dcnum/logic/chunk_slot/index
   /autoapi/dcnum/logic/chunk_slot_data/index
   /autoapi/dcnum/logic/ctrl/index
   /autoapi/dcnum/logic/job/index
   /autoapi/dcnum/logic/json_encoder/index
   /autoapi/dcnum/logic/slot_register/index
   /autoapi/dcnum/logic/universal_worker/index


Classes
-------

.. autoapisummary::

   dcnum.logic.ChunkSlot
   dcnum.logic.DCNumJobRunner
   dcnum.logic.DCNumPipelineJob
   dcnum.logic.ExtendedJSONEncoder
   dcnum.logic.SlotRegister
   dcnum.logic.UniversalWorkerProcess
   dcnum.logic.UniversalWorkerThread


Package Contents
----------------

.. py:class:: ChunkSlot(job, data, is_remainder=False)

   Bases: :py:obj:`dcnum.logic.chunk_slot_data.ChunkSlotData`


   .. py:attribute:: _instance_counter
      :value: 0


   .. py:attribute:: index
      :value: 0


   .. py:attribute:: job

      Job information object


   .. py:attribute:: data

      Input data object


   .. py:attribute:: is_remainder
      :value: False

      Whether this slot only applies to the last chunk


   .. py:attribute:: seg_cls

      Segmentation class


   .. py:method:: __repr__()


   .. py:method:: load(idx)

      Load chunk `idx` into `self.mp_image` and return numpy views


.. py:class:: DCNumJobRunner(job: dcnum.logic.job.DCNumPipelineJob, tmp_suffix: str | None = None, *args, **kwargs)

   Bases: :py:obj:`threading.Thread`

   Run a pipeline as defined by a :class:`.job.DCNumPipelineJob`

   :param job: pipeline job to run
   :type job: .job.DCNumPipelineJob
   :param tmp_suffix: optional unique string for creating temporary files
                      (defaults to hostname)
   :type tmp_suffix: str


   .. py:attribute:: error_tb
      :value: None


   .. py:attribute:: job


   .. py:attribute:: tmp_suffix
      :value: None


   .. py:attribute:: event_count
      :value: 0


   .. py:attribute:: _data_raw
      :value: None


   .. py:attribute:: _data_temp_in
      :value: None


   .. py:attribute:: _state
      :value: 'init'


   .. py:attribute:: _progress_bg
      :value: None


   .. py:attribute:: _progress_ex
      :value: None


   .. py:attribute:: _progress_bn
      :value: None


   .. py:attribute:: _segm_rate
      :value: 0


   .. py:attribute:: main_logger


   .. py:attribute:: path_log


   .. py:attribute:: _log_file_handler


   .. py:attribute:: log_queue


   .. py:attribute:: _qlisten


   .. py:attribute:: logger


   .. py:method:: __enter__()


   .. py:method:: __exit__(exc_type, exc_val, exc_tb)


   .. py:property:: draw
      :type: dcnum.read.HDF5Data

      Raw input data


   .. py:property:: dtin
      :type: dcnum.read.HDF5Data

      Input data with (corrected) background image


   .. py:property:: path_temp_in


   .. py:property:: path_temp_out


   .. py:property:: state


   .. py:method:: close(delete_temporary_files=True)


   .. py:method:: join(delete_temporary_files=True, *args, **kwargs)

      Wait until the thread terminates.

      This blocks the calling thread until the thread whose join() method is
      called terminates -- either normally or through an unhandled exception
      or until the optional timeout occurs.

      When the timeout argument is present and not None, it should be a
      floating point number specifying a timeout for the operation in seconds
      (or fractions thereof). As join() always returns None, you must call
      is_alive() after join() to decide whether a timeout happened -- if the
      thread is still alive, the join() call timed out.

      When the timeout argument is not present or None, the operation will
      block until the thread terminates.

      A thread can be join()ed many times.

      join() raises a RuntimeError if an attempt is made to join the current
      thread as that would cause a deadlock. It is also an error to join() a
      thread before it has been started and attempts to do so raises the same
      exception.


   .. py:method:: get_status()


   .. py:method:: run()

      Method representing the thread's activity.

      You may override this method in a subclass. The standard run() method
      invokes the callable object passed to the object's constructor as the
      target argument, if any, with sequential and keyword arguments taken
      from the args and kwargs arguments, respectively.


   .. py:method:: run_pipeline()

      Execute the pipeline job


   .. py:method:: task_background()

      Perform background computation task

      This populates the file `self.path_temp_in` with the 'image_bg' feature.


   .. py:method:: task_enforce_basin_strategy()

      Transfer basin data from input files to output if requested

      The user specified the "basin_strategy" keyword argument in
      `self.job`. If this is set to "drain", then copy all basin
      information from the input file to the output file. If it
      is set to "tap", then only create basins in the output file.


   .. py:method:: task_segment_extract()


.. py:class:: DCNumPipelineJob(path_in: pathlib.Path | str, path_out: pathlib.Path | str | None = None, data_code: str = 'hdf', data_kwargs: dict | None = None, background_code: str = 'sparsemed', background_kwargs: dict | None = None, segmenter_code: str = 'thresh', segmenter_kwargs: dict | None = None, feature_code: str = 'legacy', feature_kwargs: dict | None = None, gate_code: str = 'norm', gate_kwargs: dict | None = None, basin_strategy: Literal['drain', 'tap'] = 'drain', compression: str = 'zstd-5', num_procs: int | None = None, log_level: int = logging.INFO, debug: bool = False)

   Pipeline job recipe

   :param path_in: input data path
   :type path_in: pathlib.Path | str
   :param path_out: output data path
   :type path_out: pathlib.Path | str
   :param data_code: identification code of input data reader to use
   :type data_code: str
   :param data_kwargs: keyword arguments for data reader
   :type data_kwargs: dict
   :param background_code: identification code of background data computation method
   :type background_code: str
   :param background_kwargs: keyword arguments for background data computation method
   :type background_kwargs: dict
   :param segmenter_code: identification code of segmenter to use
   :type segmenter_code: str
   :param segmenter_kwargs: keyword arguments for segmenter
   :type segmenter_kwargs: dict
   :param feature_code: identification code of feature extractor
   :type feature_code: str
   :param feature_kwargs: keyword arguments for feature extractor
   :type feature_kwargs: dict
   :param gate_code: identification code for gating/event filtering class
   :type gate_code: str
   :param gate_kwargs: keyword arguments for gating/event filtering class
   :type gate_kwargs: dict
   :param basin_strategy: strategy for handling event data;
                          In principle, not all events have to be stored in the
                          output file if basins are defined, linking back to the
                          original file.

                          - You can "drain" all basins, which means that the output
                            file will contain all features, but will also be very big.
                          - You can "tap" the basins, including the input file,
                            which means that the output file will be comparatively
                            small.
   :type basin_strategy: str
   :param compression: compression algorithm to use;
                       Set this to "none" to disable compression.
                       Currently, only the Zstandard compression algorithm may be
                       used, with the least compression "zstd-1" and the best
                       compression "zstd-9". The default "zstd-5" is a trade-off.
                       Set the compression to a higher number if the bottleneck is
                       disk I/O. Set the compression to a lower number if the
                       bottleneck is the CPU. Note that "zstd-5" is the accepted
                       minimum compression setting for long-term data storage in
                       the DC universe (enforced e.g. by DCOR-Aid).
   :type compression: str
   :param num_procs: Number of processes to use
   :type num_procs: int
   :param log_level: Logging level to use.
   :type log_level: int
   :param debug: Whether to set logging level to "DEBUG" and
                 use threads instead of processes
   :type debug: bool


   .. py:attribute:: kwargs

      Initialization keyword arguments for this job


   .. py:method:: __getitem__(item)


   .. py:method:: __getstate__()


   .. py:method:: __setstate__(state)


   .. py:method:: assert_pp_codes()

      Sanity check of `self.kwargs`


   .. py:method:: get_hdf5_dataset_kwargs() -> dict

      Validate and return output HDF5 Dataset keyword arguments


   .. py:method:: get_ppid(ret_hash=False, ret_dict=False)


   .. py:method:: get_segmenter_class()

      Return the class of the segmenter associated with this job


   .. py:method:: validate()

      Make sure the pipeline will run given the job kwargs

      :returns: for testing convenience
      :rtype: True

      :raises dcnum.segm.SegmenterNotApplicableError: the segmenter is incompatible with the input path


.. py:class:: ExtendedJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

   Bases: :py:obj:`json.JSONEncoder`

   Constructor for JSONEncoder, with sensible defaults.

   If skipkeys is false, then it is a TypeError to attempt
   encoding of keys that are not str, int, float or None. If
   skipkeys is True, such items are simply skipped.

   If ensure_ascii is true, the output is guaranteed to be str
   objects with all incoming non-ASCII characters escaped. If
   ensure_ascii is false, the output can contain non-ASCII characters.

   If check_circular is true, then lists, dicts, and custom encoded
   objects will be checked for circular references during encoding to
   prevent an infinite recursion (which would cause a RecursionError).
   Otherwise, no such check takes place.

   If allow_nan is true, then NaN, Infinity, and -Infinity will be
   encoded as such. This behavior is not JSON specification compliant,
   but is consistent with most JavaScript based encoders and decoders.
   Otherwise, it will be a ValueError to encode such floats.

   If sort_keys is true, then the output of dictionaries will be
   sorted by key; this is useful for regression tests to ensure
   that JSON serializations can be compared on a day-to-day basis.

   If indent is a non-negative integer, then JSON array
   elements and object members will be pretty-printed with that
   indent level. An indent level of 0 will only insert newlines.
   None is the most compact representation.

   If specified, separators should be an (item_separator, key_separator)
   tuple. The default is (', ', ': ') if *indent* is ``None`` and
   (',', ': ') otherwise. To get the most compact JSON representation,
   you should specify (',', ':') to eliminate whitespace.

   If specified, default is a function that gets called for objects
   that can't otherwise be serialized. It should return a JSON encodable
   version of the object or raise a ``TypeError``.


   .. py:method:: default(o)

      Extended JSON encoder for the **dcnum** logic

      This JSON encoder can handle the following additional objects:

      - ``pathlib.Path``
      - integer numbers
      - ``numpy`` boolean
      - slices (via "PYTHON-SLICE" identifier)


.. py:class:: SlotRegister(job: dcnum.logic.job.DCNumPipelineJob, data: dcnum.read.HDF5Data, event_queue: multiprocessing.Queue | None = None, num_slots: int = 3)

   A register of `ChunkSlot` instances for shared memory access

   The `SlotRegister` manages all `ChunkSlot` instances and
   implements methods to interact with individual `ChunkSlot`
   instances.


   .. py:attribute:: job


   .. py:attribute:: data


   .. py:attribute:: event_queue


   .. py:attribute:: chunk_size


   .. py:attribute:: num_chunks


   .. py:attribute:: _slots
      :value: []


   .. py:attribute:: timers


   .. py:attribute:: counters


   .. py:attribute:: _state


   .. py:attribute:: num_frames

      Total number of frames to process


   .. py:attribute:: feat_nevents

      Number of events per frame

      Shared RawArray of length `len(data)` into which the number of
      events per frame is written.


   .. py:method:: __getitem__(idx)


   .. py:method:: __iter__()

      Iterate over slots, sorted by current chunk number


   .. py:method:: __len__()


   .. py:property:: chunks_loaded

      A process-safe counter for the number of chunks loaded

      This number increments as `SlotRegister.task_load_all` is called.


   .. py:property:: masks_dropped

      A process-safe counter for the number of masks dropped

      Segmentation may drop invalid masks/events.


   .. py:property:: write_queue_size

      A process-safe counter for the number of chunks in the writer queue

      A large number indicates a slow writer, which can be the result
      of a slow hard disk or a slow CPU (since compression is used).

      This counter is used to prevent out-of-memory (OOM) events by
      stalling data processing when the writer is slow.


   .. py:property:: slots

      A list of all `ChunkSlot` instances


   .. py:property:: state

      State of the `SlotRegister`, used for communication with workers

      - "w": initialized (workers work)
      - "p": paused (all workers pause)
      - "q": quit (all workers stop)


   .. py:method:: close()


   .. py:method:: find_slot(state: str, chunk: int | None = None) -> dcnum.logic.chunk_slot.ChunkSlot | None

      Return the first `ChunkSlot` that has the given state

      We sort the slots according to the slot chunks so that we
      always process the slot with the smallest slot chunk number
      first. Initially, the slot_chunks array is filled with
      zeros, but we populate it here.

      Return None if no matching slot exists.


   .. py:method:: get_counter_lock(name)


   .. py:method:: get_time(method_name)

      Return the accumulated time for the given method


   .. py:method:: reserve_slot_for_task(current_state: str, next_state: str, chunk_slot: dcnum.logic.chunk_slot.ChunkSlot | None = None, batch_size: int | None = None) -> StateWarden | None

      Return slot with the specified state and lowest chunk index

      :param current_state: State required for the task to start
      :param next_state: State that will be set after the task is done
      :param chunk_slot: Optional `ChunkSlot` to operate on; if set to None,
                         search for a matching one, and if none can be found,
                         return None
      :param batch_size: Number of frames to reserve for performing the task.
                         Defaults to the entire chunk.

      :returns: Context manager that enforces setting the next state, or
                None if no `ChunkSlot` could be reserved. Usage::

                    if state_warden is not None:
                        with state_warden as (chunk_slot, batch_range):
                            perform_task(chunk_slot,
                                         start_index=batch_range[0],
                                         stop_index=batch_range[1]
                                         )

                The `batch_range` indices are defined by the `batch_size`
                parameter. This context manager will automatically set the
                slot state to `next_state` when the context exits without
                exceptions.
      :rtype: state_warden


   .. py:method:: task_load_all(logger: logging.Logger | None = None) -> bool

      Load chunk data into memory for as many slots as possible

      :returns: **did_something** -- Whether data were loaded into memory
      :rtype: bool


   .. py:method:: task_label_masks(logger: logging.Logger | None = None) -> bool

      Perform labeling of mask images for a `ChunkSlot`

      This method is process-safe. Multiple processes may call it
      concurrently, working on the same `ChunkSlot`.

      :returns: **did_something** -- Whether masks were converted to labels
      :rtype: bool


   .. py:method:: task_process_labels(logger: logging.Logger | None = None) -> bool

      Perform label processing (e.g. binary closing) for a `ChunkSlot`

      This method is process-safe. Multiple processes may call it
      concurrently, working on the same `ChunkSlot`.

      :returns: **did_something** -- Whether labels were processed
      :rtype: bool


   .. py:method:: task_extract_features(logger: logging.Logger | None = None) -> bool

      Perform feature extraction for a `ChunkSlot`

      This method is process-safe. Multiple processes may call it
      concurrently, working on the same `ChunkSlot`.

      :returns: **did_something** -- Whether events were extracted
      :rtype: bool


.. py:class:: UniversalWorkerProcess(*args, **kwargs)

   Bases: :py:obj:`UniversalWorker`, :py:obj:`mp_spawn`

   Process objects represent activity that is run in a separate process

   The class is analogous to `threading.Thread`


   .. py:method:: start()

      Start child process


.. py:class:: UniversalWorkerThread(*args, **kwargs)

   Bases: :py:obj:`UniversalWorker`, :py:obj:`threading.Thread`

   This constructor should always be called with keyword arguments. Arguments are:

   *group* should be None; reserved for future extension when a ThreadGroup
   class is implemented.

   *target* is the callable object to be invoked by the run()
   method. Defaults to None, meaning nothing is called.

   *name* is the thread name. By default, a unique name is constructed of
   the form "Thread-N" where N is a small decimal number.

   *args* is a list or tuple of arguments for the target invocation. Defaults to ().

   *kwargs* is a dictionary of keyword arguments for the target
   invocation. Defaults to {}.

   If a subclass overrides the constructor, it must make sure to invoke
   the base class constructor (Thread.__init__()) before doing anything
   else to the thread.
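
As an illustration of the state handling described for `SlotRegister.reserve_slot_for_task`, the following is a minimal, single-process sketch of a context manager that sets the next state only when the task body exits without an exception. It is not dcnum's implementation: `SketchSlot`, `SketchStateWarden`, `reserve_sketch` and the state names "loaded" and "labeled" are hypothetical. The sketch also simplifies two points: it uses no locks or shared memory, and it returns the first matching slot rather than the one with the lowest chunk index.

```python
class SketchSlot:
    """Hypothetical stand-in for a chunk slot; only tracks a state string."""

    def __init__(self, state, length):
        self.state = state
        self.length = length


class SketchStateWarden:
    """Hypothetical context manager that advances the slot state on success."""

    def __init__(self, slot, next_state, batch_range):
        self.slot = slot
        self.next_state = next_state
        self.batch_range = batch_range

    def __enter__(self):
        # Hand the slot and the (start, stop) frame range to the caller.
        return self.slot, self.batch_range

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Only advance the state if the task body raised no exception.
        if exc_type is None:
            self.slot.state = self.next_state
        # Returning False lets any exception propagate to the caller.
        return False


def reserve_sketch(slots, current_state, next_state, batch_size=None):
    """Return a warden for the first slot in `current_state`, else None."""
    for slot in slots:
        if slot.state == current_state:
            # Default to the entire chunk when no batch size is given.
            stop = slot.length if batch_size is None else min(batch_size, slot.length)
            return SketchStateWarden(slot, next_state, (0, stop))
    return None


# Made-up state names, for illustration only.
slots = [SketchSlot("empty", 100), SketchSlot("loaded", 100)]
processed = []

# Successful task: the slot state advances when the context exits.
warden = reserve_sketch(slots, "loaded", "labeled", batch_size=40)
if warden is not None:
    with warden as (chunk_slot, batch_range):
        processed.append(batch_range)

# Failing task: the slot keeps its state because an exception was raised.
failed = reserve_sketch(slots, "empty", "loaded")
try:
    with failed as (chunk_slot, batch_range):
        raise ValueError("simulated task failure")
except ValueError:
    pass

# No slot is in the requested state, so nothing can be reserved.
no_match = reserve_sketch(slots, "unknown", "loaded")
```

Returning None instead of raising when no slot matches mirrors the documented contract and lets a worker loop simply move on to another task.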