#####################################################################
# connections.py
#
# (c) Copyright 2013-2015, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Contains objects and functions to create and handle hsms connection."""
import logging
import socket
import select
import struct
import time
import threading
import errno
from secsgem.common import is_windows
from packets import HsmsPacket
# TODO: timeouts (T7, T8)
"""Names for hsms header SType"""
hsmsSTypes = {
1: "Select.req",
2: "Select.rsp",
3: "Deselect.req",
4: "Deselect.rsp",
5: "Linktest.req",
6: "Linktest.rsp",
7: "Reject.req",
9: "Separate.req"
}
def is_errorcode_ewouldblock(errorcode):
if errorcode == errno.EAGAIN or errorcode == errno.EWOULDBLOCK:
return True
return False
[docs]class HsmsConnection(object):
"""Connection class used for active and passive hsms connections.
:param active: Is the connection active (*True*) or passive (*False*)
:type active: boolean
:param address: IP address of remote host
:type address: string
:param port: TCP port of remote host
:type port: integer
:param session_id: session / device ID to use for connection
:type session_id: integer
:param delegate: target for messages
:type delegate: inherited from :class:`secsgem.hsms.handler.HsmsHandler`
"""
selectTimeout = 0.5
""" Timeout for select calls """
sendBlockSize = 1024 * 1024
""" Block size for outbound data """
T3 = 45.0
""" Reply Timeout """
T5 = 10.0
""" Connect Separation Time """
T6 = 5.0
""" Control Transaction Timeout """
def __init__(self, active, address, port, session_id=0, delegate=None):
self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
# set parameters
self.active = active
self.remoteAddress = address
self.remotePort = port
self.sessionID = session_id
self.delegate = delegate
# connection socket
self.sock = None
# buffer for received data
self.receiveBuffer = ""
# system id counter
self.systemCounter = 1
# receiving thread flags
self.threadRunning = False
self.stopThread = False
# connected flag
self.connected = False
# flag set during disconnection
self.disconnecting = False
def _serialize_data(self):
"""Returns data for serialization
:returns: data to serialize for this object
:rtype: dict
"""
return {'active': self.active, 'remoteAddress': self.remoteAddress, 'remotePort': self.remotePort, 'sessionID': self.sessionID, 'systemCounter': self.systemCounter, 'connected': self.connected}
def __str__(self):
return ("Active" if self.active else "Passive") + " connection to " + self.remoteAddress + ":" + str(self.remotePort) + " sessionID=" + str(self.sessionID)
def _start_receiver(self):
"""Start the thread for receiving and handling incoming messages. Will also do the initial Select and Linktest requests
.. warning:: Do not call this directly, will be called from HSMS client/server class.
.. seealso:: :class:`secsgem.hsms.connections.HsmsActiveConnection`, :class:`secsgem.hsms.connections.HsmsPassiveConnection`, :class:`secsgem.hsms.connections.HsmsMultiPassiveConnection`
"""
# mark connection as connected
self.connected = True
if self.delegate and hasattr(self.delegate, 'on_connection_established') and callable(getattr(self.delegate, 'on_connection_established')):
self.delegate.on_connection_established(self)
# start data receiving thread
threading.Thread(target=self.__receiver_thread, args=(), name="secsgem_hsmsConnection_receiver_{}:{}".format(self.remoteAddress, self.remotePort)).start()
# wait until thread is running
while not self.threadRunning:
pass
# send event
def _on_hsms_connection_close(self, data):
pass
[docs] def disconnect(self):
"""Close connection"""
# return if thread isn't running
if not self.threadRunning:
return
# set disconnecting flag to avoid another select
self.disconnecting = True
# set flag to stop the thread
self.stopThread = True
# wait until thread stopped
while self.threadRunning:
pass
# clear disconnecting flag, no selects coming any more
self.disconnecting = False
[docs] def send_packet(self, packet):
"""Send the ASCII coded packet to the remote host
:param packet: encoded data to be transmitted
:type packet: string / byte array
"""
self.logger.info("> %s", packet)
# encode the packet
data = packet.encode()
# split data into blocks
blocks = [data[i: i + self.sendBlockSize] for i in range(0, len(data), self.sendBlockSize)]
for block in blocks:
retry = True
# not sent yet, retry
while retry:
# wait until socket is writable
while not select.select([], [self.sock], [], self.selectTimeout)[1]:
pass
try:
# send packet
self.sock.send(block)
# retry will be cleared if send succeeded
retry = False
except socket.error, e:
errorcode = e[0]
if not is_errorcode_ewouldblock(errorcode):
# raise if not EWOULDBLOCK
raise e
# it is EWOULDBLOCK, so retry sending
def _process_receive_buffer(self):
"""Parse the receive buffer and dispatch callbacks.
.. warning:: Do not call this directly, will be called from :func:`secsgem.hsmsConnections.hsmsConnection.__receiver_thread` method.
"""
# check if enough data in input buffer
if len(self.receiveBuffer) < 4:
return False
# unpack length from input buffer
length = struct.unpack(">L", self.receiveBuffer[0:4])[0] + 4
# check if enough data in input buffer
if len(self.receiveBuffer) < length:
return False
# extract and remove packet from input buffer
data = self.receiveBuffer[0:length]
self.receiveBuffer = self.receiveBuffer[length:]
# decode received packet
response = HsmsPacket.decode(data)
# redirect packet to hsms handler
if self.delegate and hasattr(self.delegate, 'on_connection_packet_received') and callable(getattr(self.delegate, 'on_connection_packet_received')):
self.delegate.on_connection_packet_received(self, response)
# return True if more data is available
if len(self.receiveBuffer) > 0:
return True
return False
def __receiver_thread(self):
"""Thread for receiving incoming data and adding it to the receive buffer.
.. warning:: Do not call this directly, will be called from :func:`secsgem.hsmsConnections.hsmsConnection._startReceiver` method.
"""
self.threadRunning = True
try:
# check if shutdown requested
while not self.stopThread:
# check if data available
select_result = select.select([self.sock], [], [self.sock], self.selectTimeout)
# check if disconnection was started
if self.disconnecting:
time.sleep(0.2)
continue
if select_result[0]:
try:
# get data from socket
recv_data = self.sock.recv(1024)
# check if socket was closed
if len(recv_data) == 0:
self.connected = False
self.stopThread = True
continue
# add received data to input buffer
self.receiveBuffer += recv_data
except socket.error, e:
errorcode = e[0]
if not is_errorcode_ewouldblock(errorcode):
raise e
# handle data in input buffer
while self._process_receive_buffer():
pass
except Exception, e:
self.logger.error('exception {0}'.format(e), exc_info=True)
# notify listeners of disconnection
if self.delegate and hasattr(self.delegate, 'on_connection_before_closed') and callable(getattr(self.delegate, 'on_connection_before_closed')):
self.delegate.on_connection_before_closed(self)
# close the socket
self.sock.close()
# notify listeners of disconnection
if self.delegate and hasattr(self.delegate, 'on_connection_closed') and callable(getattr(self.delegate, 'on_connection_closed')):
self.delegate.on_connection_closed(self)
# reset all flags
self.connected = False
self.threadRunning = False
self.stopThread = False
# clear receive buffer
self.receiveBuffer = ""
# notify inherited classes of disconnection
self._on_hsms_connection_close({'connection': self})
[docs] def get_next_system_counter(self):
"""Returns the next System.
:returns: System for the next command
:rtype: integer
"""
self.systemCounter += 1
return self.systemCounter
[docs]class HsmsPassiveConnection(HsmsConnection):
"""Server class for single passive (incoming) connection
Creates a listening socket and waits for one incoming connection on this socket. After the connection is established the listening socket is closed.
:param address: IP address of target host
:type address: string
:param port: TCP port of target host
:type port: integer
:param session_id: session / device ID to use for connection
:type session_id: integer
:param delegate: target for messages
:type delegate: object
**Example**::
# TODO: create example
"""
def __init__(self, address, port=5000, session_id=0, delegate=None):
# initialize super class
HsmsConnection.__init__(self, True, address, port, session_id, delegate)
# initially not enabled
self.enabled = False
# reconnect thread required for passive connection
self.serverThread = None
self.stopServerThread = False
def _on_hsms_connection_close(self, data):
"""Signal from super that the connection was closed
This is required to initiate the reconnect if the connection is still enabled
"""
if self.enabled:
self.__start_server_thread()
[docs] def enable(self):
"""Enable the connection.
Starts the connection process to the passive remote.
"""
# only start if not already enabled
if not self.enabled:
# mark connection as enabled
self.enabled = True
# start the connection thread
self.__start_server_thread()
[docs] def disable(self):
"""Disable the connection.
Stops all connection attempts, and closes the connection
"""
# only stop if enabled
if self.enabled:
# mark connection as disabled
self.enabled = False
# stop connection thread if it is running
if self.serverThread and self.serverThread.isAlive():
self.stopServerThread = True
# wait for connection thread to stop
while self.stopServerThread:
time.sleep(0.2)
# disconnect super class
self.disconnect()
def __start_server_thread(self):
self.serverThread = threading.Thread(target=self.__server_thread, name="secsgem_HsmsPassiveConnection_serverThread_{}".format(self.remoteAddress))
self.serverThread.start()
def __server_thread(self):
"""Thread function to (re)connect active connection to remote host.
.. warning:: Do not call this directly, for internal use only.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if not is_windows():
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', self.remotePort))
sock.listen(1)
while True:
accept_result = sock.accept()
if accept_result is None:
continue
(self.sock, (_, _)) = accept_result
# setup socket
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# make socket nonblocking
self.sock.setblocking(0)
# start the receiver thread
self._start_receiver()
sock.close()
return
[docs]class HsmsMultiPassiveConnection(HsmsConnection):
"""Connection class for single connection from :class:`secsgem.hsms.connections.HsmsMultiPassiveServer`
Handles connections incoming connection from :class:`secsgem.hsms.connections.HsmsMultiPassiveServer`
:param address: IP address of target host
:type address: string
:param port: TCP port of target host
:type port: integer
:param session_id: session / device ID to use for connection
:type session_id: integer
:param delegate: target for messages
:type delegate: object
**Example**::
# TODO: create example
"""
def __init__(self, address, port=5000, session_id=0, delegate=None):
# initialize super class
HsmsConnection.__init__(self, True, address, port, session_id, delegate)
# initially not enabled
self.enabled = False
[docs] def on_connected(self, sock, address):
"""Connected callback for :class:`secsgem.hsms.connections.HsmsMultiPassiveServer`
:param sock: Socket for new connection
:type sock: :class:`Socket`
:param address: IP address of remote host
:type address: string
"""
# setup socket
self.sock = sock
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
_ = address
# make socket nonblocking
self.sock.setblocking(0)
# start the receiver thread
self._start_receiver()
[docs] def enable(self):
"""Enable the connection.
Starts the connection process to the passive remote.
"""
self.enabled = True
[docs] def disable(self):
"""Disable the connection.
Stops all connection attempts, and closes the connection
"""
self.enabled = False
if self.connected:
self.disconnect()
[docs]class HsmsMultiPassiveServer(object):
"""Server class for multiple passive (incoming) connection. The server creates a listening socket and waits for incoming connections on this socket.
:param port: TCP port to listen on
:type port: integer
**Example**::
# TODO: create example
"""
selectTimeout = 0.5
""" Timeout for select calls """
def __init__(self, port=5000):
self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
self.listenSock = None
self.port = port
self.threadRunning = False
self.stopThread = False
self.connections = {}
self.listenThread = None
[docs] def create_connection(self, address, port=5000, session_id=0, delegate=None):
""" Create and remember connection for the server
:param address: IP address of target host
:type address: string
:param port: TCP port of target host
:type port: integer
:param session_id: session / device ID to use for connection
:type session_id: integer
:param delegate: target for messages
:type delegate: object
"""
connection = HsmsMultiPassiveConnection(address, port, session_id, delegate)
connection.handler = self
self.connections[address] = connection
return connection
[docs] def start(self):
"""Starts the server and returns. It will launch a listener running in background to wait for incoming connections."""
self.listenSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if not is_windows():
self.listenSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.listenSock.bind(('', self.port))
self.listenSock.listen(1)
self.listenSock.setblocking(0)
self.listenThread = threading.Thread(target=self._listen_thread, args=(), name="secsgem_hsmsMultiPassiveServer_listenThread_{}".format(self.port))
self.listenThread.start()
self.logger.debug("listening")
[docs] def stop(self, terminate_connections=True):
"""Stops the server. The background job waiting for incoming connections will be terminated. Optionally all connections received will be closed.
:param terminate_connections: terminate all connection made by this server
:type terminate_connections: boolean
"""
self.stopThread = True
if self.listenThread.isAlive:
while self.threadRunning:
pass
self.listenSock.close()
self.stopThread = False
if terminate_connections:
for address in self.connections:
connection = self.connections[address]
connection.disconnect()
self.logger.debug("server stopped")
def _initialize_connection_thread(self, accept_result):
"""Setup connection
.. warning:: Do not call this directly, used internally.
"""
(sock, (source_ip, source_port)) = accept_result
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if source_ip not in self.connections:
sock.close()
return
connection = self.connections[source_ip]
if not connection.enabled:
sock.close()
return
connection.on_connected(sock, source_ip)
def _listen_thread(self):
"""Thread listening for incoming connections
.. warning:: Do not call this directly, used internally.
"""
self.threadRunning = True
try:
while not self.stopThread:
# check for data in the input buffer
select_result = select.select([self.listenSock], [], [self.listenSock], self.selectTimeout)
if select_result[0]:
accept_result = None
try:
accept_result = self.listenSock.accept()
except socket.error, e:
errorcode = e[0]
if not is_errorcode_ewouldblock(errorcode):
raise e
if accept_result is None:
continue
if self.stopThread:
continue
self.logger.debug("connection from %s:%d", accept_result[1][0], accept_result[1][1])
threading.Thread(target=self._initialize_connection_thread, args=(accept_result,), name="secsgem_hsmsMultiPassiveServer_InitializeConnectionThread_{}:{}".format(accept_result[1][0], accept_result[1][1])).start()
except Exception, e:
self.logger.error('exception {0}'.format(e), exc_info=True)
self.threadRunning = False
[docs]class HsmsActiveConnection(HsmsConnection):
"""Client class for single active (outgoing) connection
:param address: IP address of target host
:type address: string
:param port: TCP port of target host
:type port: integer
:param session_id: session / device ID to use for connection
:type session_id: integer
:param delegate: target for messages
:type delegate: object
**Example**::
# TODO: create example
"""
def __init__(self, address, port=5000, session_id=0, delegate=None):
# initialize super class
HsmsConnection.__init__(self, True, address, port, session_id, delegate)
# initially not enabled
self.enabled = False
# reconnect thread required for active connection
self.connectionThread = None
self.stopConnectionThread = False
# flag if this is the first connection since enable
self.firstConnection = True
def _on_hsms_connection_close(self, data):
"""Signal from super that the connection was closed
This is required to initiate the reconnect if the connection is still enabled
"""
if self.enabled:
self.__start_connect_thread()
[docs] def enable(self):
"""Enable the connection.
Starts the connection process to the passive remote.
"""
# only start if not already enabled
if not self.enabled:
# reset first connection to eliminate reconnection timeout
self.firstConnection = True
# mark connection as enabled
self.enabled = True
# start the connection thread
self.__start_connect_thread()
[docs] def disable(self):
"""Disable the connection.
Stops all connection attempts, and closes the connection
"""
# only stop if enabled
if self.enabled:
# mark connection as disabled
self.enabled = False
# stop connection thread if it is running
if self.connectionThread and self.connectionThread.isAlive():
self.stopConnectionThread = True
# wait for connection thread to stop
while self.stopConnectionThread:
time.sleep(0.2)
# disconnect super class
self.disconnect()
def __idle(self, timeout):
"""Wait until timeout elapsed or connection thread is stopped
:param timeout: number of seconds to wait
:type timeout: float
:returns: False if thread was stopped
:rtype: boolean
"""
for i in range(int(timeout) * 5):
time.sleep(0.2)
# check if connection was disabled
if self.stopConnectionThread:
self.stopConnectionThread = False
return False
return True
def __start_connect_thread(self):
self.connectionThread = threading.Thread(target=self.__connect_thread, name="secsgem_HsmsActiveConnection_connectThread_{}".format(self.remoteAddress))
self.connectionThread.start()
def __connect_thread(self):
"""Thread function to (re)connect active connection to remote host.
.. warning:: Do not call this directly, for internal use only.
"""
# wait for timeout if this is not the first connection
if not self.firstConnection:
if not self.__idle(self.T5):
return
self.firstConnection = False
# try to connect to remote
while not self.__connect():
if not self.__idle(self.T5):
return
def __connect(self):
"""Open connection to remote host
:returns: True if connection was established, False if connection failed
:rtype: boolean
"""
# create socket
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# setup socket
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.logger.debug("connecting to %s:%d", self.remoteAddress, self.remotePort)
# try to connect socket
try:
self.sock.connect((self.remoteAddress, self.remotePort))
except socket.error:
self.logger.debug("connecting to %s:%d failed", self.remoteAddress, self.remotePort)
return False
# make socket nonblocking
self.sock.setblocking(0)
# start the receiver thread
self._start_receiver()
return True