Source code for secsgem.common.tcp_server_connection

#####################################################################
# tcp_server_connection.py
#
# (c) Copyright 2021-2023, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""TCP server connection."""

from __future__ import annotations

import select
import socket
import threading
import time
import typing

from .helpers import is_windows
from .tcp_connection import TcpConnection

if typing.TYPE_CHECKING:
    from .settings import Settings


class TcpServerConnection(TcpConnection):
    """Server class for TCP server connection.

    Creates a listening socket and waits for one incoming connection on this socket.
    After the connection is established the listening socket is closed.
    """

    def __init__(self, settings: Settings):
        """Initialize a TCP server connection.

        Args:
            settings: protocol and communication settings

        """
        # initialize super class
        super().__init__(settings)

        # initially not enabled
        self._enabled = False

        # the server thread waits for the incoming connection; the stop flag is
        # raised by disable() and cleared again by the thread when it exits
        self._server_thread = None
        self._stop_server_thread = False
        self._server_sock = None

        # restart listening after a disconnect while the connection is enabled
        self.on_disconnected.register(self._disconnected)

    def _disconnected(self, _: dict[str, typing.Any]):
        """Handle the disconnected event.

        Starts a new server thread if the connection is still enabled, so the
        remote side can connect again.

        Args:
            _: event data, unused

        """
        if self._enabled:
            self.__start_server_thread()

    def enable(self):
        """Enable the connection.

        Starts the connection server process. Does nothing if the connection
        is already enabled.
        """
        # only start if not already enabled
        if self._enabled:
            return

        # mark connection as enabled
        self._enabled = True

        # start the connection thread
        self.__start_server_thread()

    def disable(self):
        """Disable the connection.

        Stops all connection attempts, and closes the connection.
        Does nothing if the connection is not enabled.
        """
        # only stop if enabled
        if not self._enabled:
            return

        # mark connection as disabled first, so the disconnect below does not
        # trigger a restart of the server thread
        self._enabled = False

        # stop connection thread if it is running
        server_thread = self._server_thread
        if server_thread and server_thread.is_alive():
            self._stop_server_thread = True

            if self._server_sock:
                self._server_sock.close()

            # wait for the thread to acknowledge the stop request; also stop
            # waiting when the thread is gone, otherwise a thread that ended
            # on its own would leave this loop spinning forever
            while self._stop_server_thread and server_thread.is_alive():
                time.sleep(0.2)

            # the thread may have finished before the flag was raised, make
            # sure a stale stop request does not end the next server thread
            self._stop_server_thread = False

        # disconnect super class
        self.disconnect()

    def __start_server_thread(self):
        """Create and start the thread waiting for the incoming connection."""
        self._server_thread = threading.Thread(
            target=self.__server_thread,
            name=f"secsgem_tcpServerConnection_serverThread_{self._settings.address}",
        )
        self._server_thread.start()

    def __server_thread(self):
        """Thread function to wait for incoming tcp connections.

        .. warning:: Do not call this directly, for internal use only.
        """
        try:
            self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            if not is_windows():
                self._server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

            self._server_sock.bind((self._settings.address, self._settings.port))
            self._server_sock.listen(1)

            while not self._stop_server_thread:
                try:
                    select_result = select.select([self._server_sock], [], [], self.select_timeout)
                except Exception as exc:  # pylint: disable=broad-except
                    self._logger.debug("select exception", exc_info=exc)
                    # no valid result available, re-check the stop flag
                    continue

                if not select_result[0]:
                    # select timed out
                    continue

                accept_result = self._server_sock.accept()
                if accept_result is None:
                    continue

                (self._sock, (_, _)) = accept_result

                # setup socket
                # NOTE(review): the accepted socket is stored in _sock but configured
                # through _socket - presumably an accessor of the base class, confirm
                self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

                # make socket nonblocking
                self._socket.setblocking(0)

                # mark connection as connected
                self._connected = True

                # start the receiver thread
                self._start_receiver()

                # send event
                try:
                    self.on_connected({"source": self})
                except Exception:  # pylint: disable=broad-except
                    self._logger.exception("ignoring exception for on_connection_established handler")

                # only one connection is served, stop listening; a failing
                # shutdown must not prevent the listening socket from being closed
                try:
                    self._server_sock.shutdown(socket.SHUT_RDWR)
                except OSError as exc:
                    self._logger.debug("server socket shutdown failed", exc_info=exc)

                self._server_sock.close()

                return
        finally:
            # acknowledge a pending stop request on every exit path (loop end,
            # established connection or error), disable() is waiting for this
            self._stop_server_thread = False