#####################################################################
# handler.py
#
# (c) Copyright 2013-2015, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Contains class to create model for hsms endpoints."""
import logging
import Queue
import random
import threading
from ..common import EventProducer
from ..common.fysom import Fysom
from connections import HsmsActiveConnection, HsmsPassiveConnection, hsmsSTypes
from packets import HsmsPacket, HsmsRejectReqHeader, HsmsStreamFunctionHeader, HsmsSelectReqHeader, \
HsmsSelectRspHeader, HsmsLinktestReqHeader, HsmsLinktestRspHeader, HsmsDeselectReqHeader, HsmsDeselectRspHeader, \
HsmsSeparateReqHeader
[docs]class HsmsHandler(EventProducer):
"""Baseclass for creating Host/Equipment models. This layer contains the HSMS functionality. Inherit from this class and override required functions.
:param address: IP address of remote host
:type address: string
:param port: TCP port of remote host
:type port: integer
:param active: Is the connection active (*True*) or passive (*False*)
:type active: boolean
:param session_id: session / device ID to use for connection
:type session_id: integer
:param name: Name of the underlying configuration
:type name: string
:param event_handler: object for event handling
:type event_handler: :class:`secsgem.common.EventHandler`
:param custom_connection_handler: object for connection handling (ie multi server)
:type custom_connection_handler: :class:`secsgem.hsms.connections.HsmsMultiPassiveServer`
**Example**::
import secsgem
def onConnect(event, data):
print "Connected"
client = secsgem.HsmsHandler("10.211.55.33", 5000, True, 0, "test", event_handler=secsgem.EventHandler(events={'hsms_connected': onConnect}))
client.enable()
time.sleep(3)
client.disable()
"""
def __init__(self, address, port, active, session_id, name, event_handler=None, custom_connection_handler=None):
EventProducer.__init__(self, event_handler)
self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
self.communicationLogger = logging.getLogger("hsms_communication")
self.address = address
self.port = port
self.active = active
self.sessionID = session_id
self.name = name
self.connected = False
# system id counter
self.systemCounter = random.randint(0, (2 ** 32) - 1)
# repeating linktest variables
self.linktestTimer = None
self.linktestTimeout = 30
# response queues
self._systemQueues = {}
# hsms connection state fsm
self.connectionState = Fysom({
'initial': 'NOT_CONNECTED',
'events': [
{'name': 'connect', 'src': 'NOT_CONNECTED', 'dst': 'CONNECTED'},
{'name': 'disconnect', 'src': ['CONNECTED', 'NOT_SELECTED', 'SELECTED'], 'dst': 'NOT_CONNECTED'},
{'name': 'select', 'src': ['CONNECTED', 'NOT_SELECTED'], 'dst': 'SELECTED'},
{'name': 'deselect', 'src': 'SELECTED', 'dst': 'NOT_SELECTED'},
{'name': 'timeoutT7', 'src': ['CONNECTED', 'NOT_SELECTED'], 'dst': 'NOT_CONNECTED'},
],
'callbacks': {
'onNOT_SELECTED': self._on_state_connect,
'onNOT_CONNECTED': self._on_state_disconnect,
'onSELECTED': self._on_state_select,
},
'autoforward': [
{'src': 'CONNECTED', 'dst': 'NOT_SELECTED'}
]
})
# setup connection
if self.active:
if custom_connection_handler is None:
self.connection = HsmsActiveConnection(self.address, self.port, self.sessionID, self)
else:
self.connection = custom_connection_handler.create_connection(self.address, self.port, self.sessionID, self)
else:
if custom_connection_handler is None:
self.connection = HsmsPassiveConnection(self.address, self.port, self.sessionID, self)
else:
self.connection = custom_connection_handler.create_connection(self.address, self.port, self.sessionID, self)
[docs] def get_next_system_counter(self):
"""Returns the next System.
:returns: System for the next command
:rtype: integer
"""
self.systemCounter += 1
if self.systemCounter > ((2 ** 32) - 1):
self.systemCounter = 0
return self.systemCounter
def _on_state_connect(self, _):
"""Connection state model got event connect
:param data: event attributes
:type data: object
"""
# start linktest timer
self.linktestTimer = threading.Timer(self.linktestTimeout, self._on_linktest_timer)
self.linktestTimer.start()
# start select process if connection is active
if self.active:
response = self.send_select_req()
if response is None:
self.logger.warning("select request failed")
def _on_state_disconnect(self, _):
"""Connection state model got event disconnect
:param data: event attributes
:type data: object
"""
# stop linktest timer
if self.linktestTimer:
self.linktestTimer.cancel()
self.linktestTimer = None
def _on_state_select(self, _):
"""Connection state model got event select
:param data: event attributes
:type data: object
"""
# send event
self.fire_event('hsms_selected', {'connection': self})
# notify hsms handler of selection
if hasattr(self, '_on_hsms_select') and callable(getattr(self, '_on_hsms_select')):
self._on_hsms_select()
def _on_linktest_timer(self):
"""Linktest time timed out, so send linktest request"""
# send linktest request and wait for response
self.send_linktest_req()
# restart the timer
self.linktestTimer = threading.Timer(self.linktestTimeout, self._on_linktest_timer)
self.linktestTimer.start()
[docs] def on_connection_established(self, _):
"""Connection was established"""
self.connected = True
# update connection state
self.connectionState.connect()
self.fire_event("hsms_connected", {'connection': self})
[docs] def on_connection_before_closed(self, _):
"""Connection is about to be closed"""
# send separate request
self.send_separate_req()
[docs] def on_connection_closed(self, _):
"""Connection was closed"""
# update connection state
self.connected = False
self.connectionState.disconnect()
self.fire_event("hsms_disconnected", {'connection': self})
[docs] def on_connection_packet_received(self, _, packet):
"""Packet received by connection
:param packet: received data packet
:type packet: :class:`secsgem.hsms.packets.HsmsPacket`
"""
# if packet.header.system > self.systemCounter or packet.header.system == 0:
# self.systemCounter = packet.header.system
if packet.header.sType > 0:
self.communicationLogger.info("< %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
# check if it is a select request
if packet.header.sType == 0x01:
# if we are disconnecting send reject else send response
if self.connection.disconnecting:
self.send_reject_rsp(packet.header.system, packet.header.sType, 4)
else:
self.send_select_rsp(packet.header.system)
# update connection state
self.connectionState.select()
# check if it is a select response
elif packet.header.sType == 0x02:
# update connection state
self.connectionState.select()
if packet.header.system in self._systemQueues:
# send packet to request sender
self._systemQueues[packet.header.system].put_nowait(packet)
# what to do if no sender for request waiting?
# check if it is a deselect request
elif packet.header.sType == 0x03:
# if we are disconnecting send reject else send response
if self.connection.disconnecting:
self.send_reject_rsp(packet.header.system, packet.header.sType, 4)
else:
self.send_deselect_rsp(packet.header.system)
# update connection state
self.connectionState.deselect()
# check if it is a deselect response
elif packet.header.sType == 0x04:
# update connection state
self.connectionState.deselect()
if packet.header.system in self._systemQueues:
# send packet to request sender
self._systemQueues[packet.header.system].put_nowait(packet)
# what to do if no sender for request waiting?
# check if it is a linktest request
elif packet.header.sType == 0x05:
# if we are disconnecting send reject else send response
if self.connection.disconnecting:
self.send_reject_rsp(packet.header.system, packet.header.sType, 4)
else:
self.send_linktest_rsp(packet.header.system)
else:
if packet.header.system in self._systemQueues:
# send packet to request sender
self._systemQueues[packet.header.system].put_nowait(packet)
# what to do if no sender for request waiting?
else:
if hasattr(self, 'secs_decode') and callable(getattr(self, 'secs_decode')):
message = self.secs_decode(packet)
if message is None:
self.communicationLogger.info("< %s", packet, extra=self._get_log_extra())
else:
self.communicationLogger.info("< %s\n%s", packet, message, extra=self._get_log_extra())
else:
self.communicationLogger.info("< %s", packet, extra=self._get_log_extra())
if not self.connectionState.isstate("SELECTED"):
self.logger.warning("received message when not selected")
out_packet = HsmsPacket(HsmsRejectReqHeader(packet.header.system, packet.header.sType, 4))
self.communicationLogger.info("> %s\n %s", out_packet, hsmsSTypes[out_packet.header.sType], extra=self._get_log_extra())
self.connection.send_packet(out_packet)
return True
# someone is waiting for this message
if packet.header.system in self._systemQueues:
# send packet to request sender
self._systemQueues[packet.header.system].put_nowait(packet)
# redirect packet to hsms handler
elif hasattr(self, '_on_hsms_packet_received') and callable(getattr(self, '_on_hsms_packet_received')):
self._on_hsms_packet_received(packet)
# just log if nobody is interested
else:
self.logger.warning("packet unhandled")
def _get_queue_for_system(self, system_id):
"""Creates a new queue to receive responses for a certain system
:param system_id: system id to watch
:type system_id: int
:returns: queue to receive responses with
:rtype: Queue.Queue
"""
self._systemQueues[system_id] = Queue.Queue()
return self._systemQueues[system_id]
def _remove_queue(self, system_id):
"""Remove queue for system id from list
:param system_id: system id to remove
:type system_id: int
"""
del self._systemQueues[system_id]
def _serialize_data(self):
"""Returns data for serialization
:returns: data to serialize for this object
:rtype: dict
"""
return {'address': self.address, 'port': self.port, 'active': self.active, 'sessionID': self.sessionID, 'name': self.name, 'connected': self.connected}
[docs] def enable(self):
"""Enables the connection"""
self.connection.enable()
[docs] def disable(self):
"""Disables the connection"""
self.connection.disable()
[docs] def send_stream_function(self, packet):
"""Send the packet and wait for the response
:param packet: packet to be sent
:type packet: :class:`secsgem.secs.functionbase.SecsStreamFunction`
"""
out_packet = HsmsPacket(HsmsStreamFunctionHeader(self.get_next_system_counter(), packet.stream, packet.function, True, self.sessionID), packet.encode())
if packet is None:
self.communicationLogger.info("> %s", out_packet, extra=self._get_log_extra())
else:
self.communicationLogger.info("> %s\n%s", out_packet, packet, extra=self._get_log_extra())
return self.connection.send_packet(out_packet)
[docs] def send_and_waitfor_response(self, packet):
"""Send the packet and wait for the response
:param packet: packet to be sent
:type packet: :class:`secsgem.secs.functionbase.SecsStreamFunction`
:returns: Packet that was received
:rtype: :class:`secsgem.hsms.packets.HsmsPacket`
"""
system_id = self.get_next_system_counter()
response_queue = self._get_queue_for_system(system_id)
out_packet = HsmsPacket(HsmsStreamFunctionHeader(system_id, packet.stream, packet.function, True, self.sessionID), packet.encode())
if packet is None:
self.communicationLogger.info("> %s", out_packet, extra=self._get_log_extra())
else:
self.communicationLogger.info("> %s\n%s", out_packet, packet, extra=self._get_log_extra())
if not self.connection.send_packet(out_packet):
self.logger.error("Sending packet failed")
self._remove_queue(system_id)
return None
try:
response = response_queue.get(True, self.connection.T3)
except Queue.Empty:
response = None
self._remove_queue(system_id)
return response
[docs] def send_response(self, function, system):
"""Send response function for system
:param function: function to be sent
:type function: :class:`secsgem.secs.functionbase.SecsStreamFunction`
:param system: system to reply to
:type system: integer
"""
out_packet = HsmsPacket(HsmsStreamFunctionHeader(system, function.stream, function.function, False, self.sessionID), function.encode())
if function is None:
self.communicationLogger.info("> %s", out_packet, extra=self._get_log_extra())
else:
self.communicationLogger.info("> %s\n%s", out_packet, function, extra=self._get_log_extra())
return self.connection.send_packet(out_packet)
[docs] def send_select_req(self):
"""Send a Select Request to the remote host
:returns: System of the sent request
:rtype: integer
"""
system_id = self.get_next_system_counter()
response_queue = self._get_queue_for_system(system_id)
packet = HsmsPacket(HsmsSelectReqHeader(system_id))
self.communicationLogger.info("> %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
if not self.connection.send_packet(packet):
self._remove_queue(system_id)
return None
try:
response = response_queue.get(True, self.connection.T6)
except Queue.Empty:
response = None
self._remove_queue(system_id)
return response
[docs] def send_select_rsp(self, system_id):
"""Send a Select Response to the remote host
:param system_id: System of the request to reply for
:type system_id: integer
"""
packet = HsmsPacket(HsmsSelectRspHeader(system_id))
self.communicationLogger.info("> %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
return self.connection.send_packet(packet)
[docs] def send_linktest_req(self):
"""Send a Linktest Request to the remote host
:returns: System of the sent request
:rtype: integer
"""
system_id = self.get_next_system_counter()
response_queue = self._get_queue_for_system(system_id)
packet = HsmsPacket(HsmsLinktestReqHeader(system_id))
self.communicationLogger.info("> %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
if not self.connection.send_packet(packet):
self._remove_queue(system_id)
return None
try:
response = response_queue.get(True, self.connection.T6)
except Queue.Empty:
response = None
self._remove_queue(system_id)
return response
[docs] def send_linktest_rsp(self, system_id):
"""Send a Linktest Response to the remote host
:param system_id: System of the request to reply for
:type system_id: integer
"""
packet = HsmsPacket(HsmsLinktestRspHeader(system_id))
self.communicationLogger.info("> %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
return self.connection.send_packet(packet)
[docs] def send_deselect_req(self):
"""Send a Deselect Request to the remote host
:returns: System of the sent request
:rtype: integer
"""
system_id = self.get_next_system_counter()
response_queue = self._get_queue_for_system(system_id)
packet = HsmsPacket(HsmsDeselectReqHeader(system_id))
self.communicationLogger.info("> %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
if not self.connection.send_packet(packet):
self._remove_queue(system_id)
return None
try:
response = response_queue.get(True, self.connection.T6)
except Queue.Empty:
response = None
self._remove_queue(system_id)
return response
[docs] def send_deselect_rsp(self, system_id):
"""Send a Deselect Response to the remote host
:param system_id: System of the request to reply for
:type system_id: integer
"""
packet = HsmsPacket(HsmsDeselectRspHeader(system_id))
self.communicationLogger.info("> %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
return self.connection.send_packet(packet)
[docs] def send_reject_rsp(self, system_id, s_type, reason):
"""Send a Reject Response to the remote host
:param system_id: System of the request to reply for
:type system_id: integer
:param s_type: s_type of rejected message
:type s_type: integer
:param reason: reason for rejection
:type reason: integer
"""
packet = HsmsPacket(HsmsRejectReqHeader(system_id, s_type, reason))
self.communicationLogger.info("> %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
return self.connection.send_packet(packet)
[docs] def send_separate_req(self):
"""Send a Separate Request to the remote host"""
system_id = self.get_next_system_counter()
packet = HsmsPacket(HsmsSeparateReqHeader(system_id))
self.communicationLogger.info("> %s\n %s", packet, hsmsSTypes[packet.header.sType], extra=self._get_log_extra())
if not self.connection.send_packet(packet):
return None
return system_id
# helpers
def _get_log_extra(self):
return {"address": self.address, "port": self.port, "sessionID": self.sessionID, "remoteName": self.name}