Source code for secsgem.common.protocol_dispatcher

#####################################################################
# protocol_dispatcher.py
#
# (c) Copyright 2023, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Thread that can be triggered and call a callback."""

from __future__ import annotations

import logging
import queue
import threading
import typing

if typing.TYPE_CHECKING:
    from .message import Block
    from .settings import Settings


[docs]class ProtocolDispatcher: # pylint: disable=too-many-instance-attributes """Thread that calls a target function when a trigger was raised.""" def __init__( self, receiver_target: typing.Callable, dispatcher_target: typing.Callable, settings: Settings, ) -> None: """Initialize thread object. Args: receiver_target: function to call when receiver triggered dispatcher_target: function to call when message available for dispatch settings: communication/protocol settings """ self._receiver_target = receiver_target self._dispatcher_target = dispatcher_target self._settings = settings self._receiver_thread: threading.Thread | None = None self._dispatcher_thread: threading.Thread | None = None self._receiver_thread_trigger = threading.Event() self._dispatcher_thread_trigger = threading.Event() self._dispatch_queue: queue.Queue[tuple[object, Block]] = queue.Queue() self._stop_receiver_thread = False self._stop_dispatcher_thread = False
[docs] def start(self): """Start the thread.""" self._stop_receiver_thread = False self._stop_dispatcher_thread = False self._receiver_thread = threading.Thread( target=self._receiver_thread_function, args=(), name=self._settings.generate_thread_name("protocol_receiver"), daemon=True, ) self._dispatcher_thread = threading.Thread( target=self._dispatcher_thread_function, args=(), name=self._settings.generate_thread_name("protocol_dispatcher"), daemon=True, ) self._receiver_thread.start() self._dispatcher_thread.start()
[docs] def stop(self): """Stop the thread.""" if not self._receiver_thread.is_alive(): return self._stop_receiver_thread = True self._receiver_thread_trigger.set() self._receiver_thread.join()
[docs] def trigger_receiver(self): """Trigger the thread to call target function.""" self._receiver_thread_trigger.set()
[docs] def queue_block(self, source: object, block: Block): """Add a block to the dispatch queue and trigger dispatch thread. Args: source: source of the block block: new block """ self._dispatch_queue.put((source, block)) self._dispatcher_thread_trigger.set()
def _receiver_thread_function(self): while not self._stop_receiver_thread: self._receiver_thread_trigger.wait() self._receiver_thread_trigger.clear() if self._stop_receiver_thread: continue try: self._receiver_target() except Exception as exc: # pylint: disable=broad-exception-caught logging.warning("Exception in receiver callback, ignoring", exc_info=exc) self._stop_receiver_thread = False def _dispatcher_thread_function(self): while not self._stop_dispatcher_thread: self._dispatcher_thread_trigger.wait() self._dispatcher_thread_trigger.clear() if self._stop_dispatcher_thread: continue while self._dispatch_queue.qsize() > 0: data = self._dispatch_queue.get() try: self._dispatcher_target(*data) except Exception as exc: # pylint: disable=broad-exception-caught logging.warning("Exception in dispatcher callback, ignoring", exc_info=exc) self._stop_dispatcher_thread = False