Source code for dcnum.write.queue_writer_process

import traceback
from logging.handlers import QueueHandler
import multiprocessing as mp

from .queue_writer_base import QueueWriterBase

mp_spawn = mp.get_context('spawn')


[docs] class QueueWriterProcess(QueueWriterBase, mp_spawn.Process): def __init__(self, log_queue: mp.Queue, *args, **kwargs): # Since we are running in a process, we cannot initialize the logger # during init. We must initialize the logger in `run`. self.log_queue = log_queue """queue for logging""" super(QueueWriterProcess, self).__init__( name="QueueWriterProcess", *args, **kwargs)
[docs] def run(self, **kwargs): # Clear any handlers that might be set for this logger. This is # important for the case when we are an instance of # EventExtractorThread, because then all handlers from the main # thread are inherited (as opposed to no handlers in the case # of EventExtractorProcess). logger = self.get_logger() logger.handlers.clear() queue_handler = QueueHandler(self.log_queue) queue_handler.setLevel(self.log_level) logger.addHandler(queue_handler) logger.info("Ready") # some checks to avoid confusion for kw, val in kwargs.items(): if val is not None: logger.warning(f"Specifying `{kw}` in " f"`QueueWriterProcess.run` has no effect.") try: super(QueueWriterProcess, self).run(logger=logger) except BaseException: self.logger.error(traceback.format_exc()) # Make sure everything gets written to the queue. queue_handler.flush() queue_handler.close() # Close the logging queue. self.log_queue.close() self.log_queue.join_thread()