Source code for secsgem.hsms.handler

#####################################################################
# handler.py
#
# (c) Copyright 2013-2015, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Contains class to create model for hsms endpoints."""

import logging
import time
import threading

from secsgem.common import EventProducer
from secsgem.common.fysom import Fysom

from connections import HsmsActiveConnection, HsmsPassiveConnection, hsmsSTypes
from packets import HsmsPacket, HsmsRejectReqHeader, HsmsStreamFunctionHeader, HsmsSelectReqHeader, \
    HsmsSelectRspHeader, HsmsLinktestReqHeader, HsmsLinktestRspHeader, HsmsDeselectReqHeader, HsmsDeselectRspHeader, \
    HsmsSeparateReqHeader


[docs]class HsmsHandler(EventProducer): """Baseclass for creating Host/Equipment models. This layer contains the HSMS functionality. Inherit from this class and override required functions. :param address: IP address of remote host :type address: string :param port: TCP port of remote host :type port: integer :param active: Is the connection active (*True*) or passive (*False*) :type active: boolean :param session_id: session / device ID to use for connection :type session_id: integer :param name: Name of the underlying configuration :type name: string :param event_handler: object for event handling :type event_handler: :class:`secsgem.common.EventHandler` :param custom_connection_handler: object for connection handling (ie multi server) :type custom_connection_handler: :class:`secsgem.hsms.connections.HsmsMultiPassiveServer` **Example**:: import secsgem def onConnect(event, data): print "Connected" client = secsgem.HsmsHandler("10.211.55.33", 5000, True, 0, "test", event_handler=secsgem.EventHandler(events={'hsms_connected': onConnect})) client.enable() time.sleep(3) client.disable() """ def __init__(self, address, port, active, session_id, name, event_handler=None, custom_connection_handler=None): EventProducer.__init__(self, event_handler) self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) self.address = address self.port = port self.active = active self.sessionID = session_id self.name = name self.connected = False # repeating linktest variables self.linktestTimer = None self.linktestTimeout = 30 # event and packet queues self.eventQueue = [] self.packetQueue = [] # hsms connection state fsm self.connectionState = Fysom({ 'initial': 'NOT_CONNECTED', 'events': [ {'name': 'connect', 'src': 'NOT_CONNECTED', 'dst': 'CONNECTED'}, {'name': 'disconnect', 'src': ['CONNECTED', 'NOT_SELECTED', 'SELECTED'], 'dst': 'NOT_CONNECTED'}, {'name': 'select', 'src': ['CONNECTED', 'NOT_SELECTED'], 'dst': 'SELECTED'}, {'name': 'deselect', 'src': 'SELECTED', 'dst': 'NOT_SELECTED'}, {'name': 'timeoutT7', 'src': ['CONNECTED', 'NOT_SELECTED'], 'dst': 'NOT_CONNECTED'}, ], 'callbacks': { 'onNOT_SELECTED': self._on_state_connect, 'onNOT_CONNECTED': self._on_state_disconnect, 'onSELECTED': self._on_state_select, }, 'autoforward': [ {'src': 'CONNECTED', 'dst': 'NOT_SELECTED'} ] }) # setup connection if self.active: if custom_connection_handler is None: self.connection = HsmsActiveConnection(self.address, self.port, self.sessionID, self) else: self.connection = custom_connection_handler.create_connection(self.address, self.port, self.sessionID, self) else: if custom_connection_handler is None: self.connection = HsmsPassiveConnection(self.address, self.port, self.sessionID, self) else: self.connection = custom_connection_handler.create_connection(self.address, self.port, self.sessionID, self) def _on_state_connect(self, _): """Connection state model got event connect :param data: event attributes :type data: object """ # start linktest timer self.linktestTimer = threading.Timer(self.linktestTimeout, self._on_linktest_timer) self.linktestTimer.start() # start select process if connection is active if self.active: system_id = self.send_select_req() self.waitfor_select_rsp(system_id) def _on_state_disconnect(self, _): """Connection state model got event disconnect :param data: event attributes :type data: object """ # stop linktest timer if self.linktestTimer: self.linktestTimer.cancel() self.linktestTimer = None def _on_state_select(self, _): """Connection state model got event select :param data: event attributes :type data: object """ # send event self.fire_event('hsms_selected', {'connection': self}) # notify hsms handler of selection if hasattr(self, '_on_hsms_select') and callable(getattr(self, '_on_hsms_select')): self._on_hsms_select() def _on_linktest_timer(self): """Linktest time timed out, so send linktest request""" # send linktest request and wait for response system_id = self.send_linktest_req() self.waitfor_linktest_rsp(system_id) # restart the timer self.linktestTimer = threading.Timer(self.linktestTimeout, self._on_linktest_timer) self.linktestTimer.start()
[docs] def on_connection_established(self, _): """Connection was established""" # update connection state self.connectionState.connect() self.connected = True self.fire_event("hsms_connected", {'connection': self})
[docs] def on_connection_before_closed(self, _): """Connection is about to be closed""" # send separate request self.send_separate_req()
[docs] def on_connection_closed(self, _): """Connection was closed""" # update connection state self.connected = False self.connectionState.disconnect() self.fire_event("hsms_disconnected", {'connection': self})
def _queue_packet(self, packet): """Add packet to event queue :param packet: received data packet :type packet: :class:`secsgem.hsms.packets.HsmsPacket` """ # add to event queue self.packetQueue.append(packet) # notify all that new event arrived for event in self.eventQueue: event.set()
[docs] def on_connection_packet_received(self, _, packet): """Packet received by connection :param packet: received data packet :type packet: :class:`secsgem.hsms.packets.HsmsPacket` """ if packet.header.sType > 0: self.logger.info("< %s\n %s", packet, hsmsSTypes[packet.header.sType]) # check if it is a select request if packet.header.sType == 0x01: # if we are disconnecting send reject else send response if self.connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.sType, 4) else: self.send_select_rsp(packet.header.system) # update connection state self.connectionState.select() # check if it is a select response elif packet.header.sType == 0x02: # update connection state self.connectionState.select() # queue packet to notify waiting threads self._queue_packet(packet) # check if it is a deselect request elif packet.header.sType == 0x03: # if we are disconnecting send reject else send response if self.connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.sType, 4) else: self.send_deselect_rsp(packet.header.system) # update connection state self.connectionState.deselect() elif packet.header.sType == 0x04: # update connection state self.connectionState.deselect() # queue packet to notify waiting threads self._queue_packet(packet) # check if it is a linktest request elif packet.header.sType == 0x05: # if we are disconnecting send reject else send response if self.connection.disconnecting: self.send_reject_rsp(packet.header.system, packet.header.sType, 4) else: self.send_linktest_rsp(packet.header.system) else: # queue packet if not handeled self._queue_packet(packet) else: if not self.connectionState.isstate("SELECTED"): self.logger.info("< %s", packet) self.logger.warning("received message when not selected") self.connection.send_packet(HsmsPacket(HsmsRejectReqHeader(packet.header.system, packet.header.sType, 4))) return True # redirect packet to hsms handler if hasattr(self, '_on_hsms_packet_received') and callable(getattr(self, '_on_hsms_packet_received')): self._on_hsms_packet_received(packet) else: self.logger.info("< %s", packet)
def _serialize_data(self): """Returns data for serialization :returns: data to serialize for this object :rtype: dict """ return {'address': self.address, 'port': self.port, 'active': self.active, 'sessionID': self.sessionID, 'name': self.name, 'connected': self.connected}
[docs] def enable(self): """Enables the connection""" self.connection.enable()
[docs] def disable(self): """Disables the connection""" self.connection.disable()
[docs] def waitfor_stream_function(self, stream, function, is_control=False): """Wait for an incoming stream and function and return the receive data :param stream: number of stream to wait for :type stream: integer :param function: number of function to wait for :type function: integer :param is_control: is it a control packet :type is_control: bool :returns: Packet that was received :rtype: :class:`secsgem.hsms.packets.HsmsPacket` """ if is_control: # setup timeout to T6 timeout = time.time() + self.connection.T6 else: # setup timeout to T3 timeout = time.time() + self.connection.T3 # setup event for new item in queue event = threading.Event() self.eventQueue.append(event) found_packet = None while found_packet is None: for packet in self.packetQueue: if (packet.header.stream == stream) and (packet.header.function == function): self.packetQueue.remove(packet) found_packet = packet break if found_packet is None: if event.wait(1): event.clear() elif not self.connected or self.connection.disconnecting or time.time() > timeout: return None self.eventQueue.remove(event) return found_packet
[docs] def send_stream_function(self, packet): """Send the packet and wait for the response :param packet: packet to be sent :type packet: :class:`secsgem.secs.functionbase.SecsStreamFunction` """ out_packet = HsmsPacket(HsmsStreamFunctionHeader(self.connection.get_next_system_counter(), packet.stream, packet.function, True, self.sessionID), packet.encode()) self.connection.send_packet(out_packet)
[docs] def waitfor_system(self, system, is_control=False): """Wait for an message with supplied system :param system: number of system to wait for :type system: integer :returns: Packet that was received :rtype: :class:`secsgem.hsms.packets.HsmsPacket` """ if not self.connected: self.logger.warning("handler not connected waiting for response for system {0}".format(system)) return None if is_control: # setup timeout to T6 timeout = time.time() + self.connection.T6 else: # setup timeout to T3 timeout = time.time() + self.connection.T3 event = threading.Event() self.eventQueue.append(event) found_packet = None while found_packet is None: for packet in self.packetQueue: if packet.header.system == system: self.packetQueue.remove(packet) found_packet = packet break if found_packet is None: if event.wait(1): event.clear() elif not self.connected or self.connection.disconnecting or time.time() > timeout: self.logger.warning("response for system {0} not received within timeout".format(system)) return None self.eventQueue.remove(event) return found_packet
[docs] def send_and_waitfor_response(self, packet): """Send the packet and wait for the response :param packet: packet to be sent :type packet: :class:`secsgem.secs.functionbase.SecsStreamFunction` :returns: Packet that was received :rtype: :class:`secsgem.hsms.packets.HsmsPacket` """ out_packet = HsmsPacket(HsmsStreamFunctionHeader(self.connection.get_next_system_counter(), packet.stream, packet.function, True, self.sessionID), packet.encode()) self.connection.send_packet(out_packet) return self.waitfor_system(out_packet.header.system, (packet.stream == 0))
[docs] def send_response(self, function, system): """Send response function for system :param function: function to be sent :type function: :class:`secsgem.secs.functionbase.SecsStreamFunction` :param system: system to reply to :type system: integer """ out_packet = HsmsPacket(HsmsStreamFunctionHeader(system, function.stream, function.function, False, self.sessionID), function.encode()) self.connection.send_packet(out_packet)
[docs] def send_select_req(self): """Send a Select Request to the remote host :returns: System of the sent request :rtype: integer """ system_id = self.connection.get_next_system_counter() packet = HsmsPacket(HsmsSelectReqHeader(system_id)) self.connection.send_packet(packet) return system_id
[docs] def send_select_rsp(self, system_id): """Send a Select Response to the remote host :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsSelectRspHeader(system_id)) self.connection.send_packet(packet)
[docs] def waitfor_select_rsp(self, system_id): """Wait for an incoming Select Response :param system_id: System of the request to reply for :type system_id: integer :returns: Packet that was received :rtype: :class:`secsgem.hsms.packets.HsmsPacket` """ result = self.waitfor_system(system_id, True) return result
[docs] def send_linktest_req(self): """Send a Linktest Request to the remote host :returns: System of the sent request :rtype: integer """ system_id = self.connection.get_next_system_counter() packet = HsmsPacket(HsmsLinktestReqHeader(system_id)) self.connection.send_packet(packet) return system_id
[docs] def send_linktest_rsp(self, system_id): """Send a Linktest Response to the remote host :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsLinktestRspHeader(system_id)) self.connection.send_packet(packet)
[docs] def waitfor_linktest_rsp(self, system_id): """Wait for an incoming Linktest Response :param system_id: System of the request to reply for :type system_id: integer :returns: Packet that was received :rtype: :class:`secsgem.hsms.packets.HsmsPacket` """ return self.waitfor_system(system_id, True)
[docs] def send_deselect_req(self): """Send a Deselect Request to the remote host :returns: System of the sent request :rtype: integer """ system_id = self.connection.get_next_system_counter() packet = HsmsPacket(HsmsDeselectReqHeader(system_id)) self.connection.send_packet(packet) return system_id
[docs] def send_deselect_rsp(self, system_id): """Send a Deselect Response to the remote host :param system_id: System of the request to reply for :type system_id: integer """ packet = HsmsPacket(HsmsDeselectRspHeader(system_id)) self.connection.send_packet(packet)
[docs] def waitfor_deselect_rsp(self, system_id): """Wait for an incoming Deselect Response :param system_id: System of the request to reply for :type system_id: integer :returns: Packet that was received :rtype: :class:`secsgem.hsms.packets.HsmsPacket` """ result = self.waitfor_system(system_id, True) return result
[docs] def send_reject_rsp(self, system_id, s_type, reason): """Send a Reject Response to the remote host :param system_id: System of the request to reply for :type system_id: integer :param s_type: s_type of rejected message :type s_type: integer :param reason: reason for rejection :type reason: integer """ packet = HsmsPacket(HsmsRejectReqHeader(system_id, s_type, reason)) self.connection.send_packet(packet)
[docs] def send_separate_req(self): """Send a Separate Request to the remote host""" system_id = self.connection.get_next_system_counter() packet = HsmsPacket(HsmsSeparateReqHeader(system_id)) self.connection.send_packet(packet) return system_id