Source code for secsgem.secs.handler

#####################################################################
# handler.py
#
# (c) Copyright 2013-2015, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Handler for SECS commands. Used in combination with :class:`secsgem.HsmsHandler.HsmsConnectionManager`"""

import logging
import threading
import copy

from ..common import StreamFunctionCallbackHandler
from ..hsms.handler import HsmsHandler
import functions


[docs]class SecsHandler(StreamFunctionCallbackHandler, HsmsHandler, object): """Baseclass for creating Host/Equipment models. This layer contains the SECS functionality. Inherit from this class and override required functions. :param address: IP address of remote host :type address: string :param port: TCP port of remote host :type port: integer :param active: Is the connection active (*True*) or passive (*False*) :type active: boolean :param session_id: session / device ID to use for connection :type session_id: integer :param name: Name of the underlying configuration :type name: string :param event_handler: object for event handling :type event_handler: :class:`secsgem.common.EventHandler` :param custom_connection_handler: object for connection handling (ie multi server) :type custom_connection_handler: :class:`secsgem.hsms.connections.HsmsMultiPassiveServer` """ def __init__(self, address, port, active, session_id, name, event_handler=None, custom_connection_handler=None): StreamFunctionCallbackHandler.__init__(self) HsmsHandler.__init__(self, address, port, active, session_id, name, event_handler, custom_connection_handler) self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__) self._collectionEvents = {} self._dataValues = {} self._alarms = {} self._remoteCommands = {} self.secsStreamsFunctions = copy.deepcopy(functions.secsStreamsFunctions) @property def collection_events(self): """Dictionary of available collection events *Example*:: >>> handler.collection_events[123] = {'name': 'collectionEventName', 'dvids': [1, 5] } **Key** Id of the collection event (integer) **Data** Dictionary with the following fields name Name of the collection event (string) dvids Data values for the collection event (list of integers) """ return self._collectionEvents @property def data_values(self): """Dictionary of available data values *Example*:: >>> handler.data_values[5] = {'name': 'dataValueName', 'ceid': 123 } **Key** Id of the data value (integer) **Data** Dictionary with the following fields name Name of the data value (string) ceid Collection event the data value is used for (integer) """ return self._dataValues @property def alarms(self): """Dictionary of available alarms *Example*:: >>> handler.alarms[137] = {'ceidon': 1371, 'ceidoff': 1372} **Key** Id of the alarm (integer) **Data** Dictionary with the following fields ceidon Collection event id for alarm on (integer) ceidoff Collection event id for alarm off (integer) """ return self._alarms @property def remote_commands(self): """Dictionary of available remote commands *Example*:: >>> handler.remote_commands["PP_SELECT"] = {'params': [{'name': 'PROGRAM', 'format': 'A'}], 'ceids': [200, 343]} **Key** Name of the remote command (string) **Data** Dictionary with the following fields params Parameters for the remote command (list of dictionaries) *Parameters* The dictionaries have the following fields name name of the parameter (string) format format character of the parameter (string) ceids Collection events ids the remote command might return (list of integers) """ return self._remoteCommands def _run_callbacks(self, callback_index, response): handeled = False try: for callback in self.callbacks[callback_index]: if not callback(self, response) is False: handeled = True if not handeled: self.logger.warning("no callback wanted to handle handle %s\n%s", callback_index, response) except Exception, e: self.logger.error('exception {0}'.format(e), exc_info=True) def _on_hsms_packet_received(self, packet): """Packet received from hsms layer :param packet: received data packet :type packet: :class:`secsgem.hsms.packets.HsmsPacket` """ # check if callbacks available for this stream and function callback_index = "s" + str(packet.header.stream) + "f" + str(packet.header.function) if callback_index in self.callbacks: threading.Thread(target=self._run_callbacks, args=(callback_index, packet), name="secsgem_secsHandler_callback_{}".format(callback_index)).start() else: self.logger.warning("unexpected function received %s\n%s", callback_index, packet.header) if packet.header.requireResponse: self.send_response(functions.SecsS09F05(packet.header.encode()), packet.header.system)
[docs] def disable_ceids(self): """Disable all Collection Events.""" self.logger.info("Disable all collection events") if not self.connection: return None return self.send_and_waitfor_response(self.stream_function(2, 37)({"CEED": False, "CEID": []}))
[docs] def disable_ceid_reports(self): """Disable all Collection Event Reports.""" self.logger.info("Disable all collection event reports") if not self.connection: return None return self.send_and_waitfor_response(self.stream_function(2, 33)({"DATAID": 0, "DATA": []}))
[docs] def list_svs(self, svs=[]): """Get list of available Service Variables. :returns: available Service Variables :rtype: list """ self.logger.info("Get list of service variables") if not self.connection: return None packet = self.send_and_waitfor_response(self.stream_function(1, 11)(svs)) return self.secs_decode(packet)
[docs] def request_svs(self, svs): """Request contents of supplied Service Variables. :param svs: Service Variables to request :type svs: list :returns: values of requested Service Variables :rtype: list """ self.logger.info("Get value of service variables {0}".format(svs)) if not self.connection: return None packet = self.send_and_waitfor_response(self.stream_function(1, 3)(svs)) return self.secs_decode(packet)
[docs] def request_sv(self, sv): """Request contents of one Service Variable. :param sv: id of Service Variable :type sv: int :returns: value of requested Service Variable :rtype: various """ self.logger.info("Get value of service variable {0}".format(sv)) return self.request_svs([sv])[0]
[docs] def list_ecs(self, ecs=[]): """Get list of available Equipment Constants. :returns: available Equipment Constants :rtype: list """ self.logger.info("Get list of equipment constants") if not self.connection: return None packet = self.send_and_waitfor_response(self.stream_function(2, 29)(ecs)) return self.secs_decode(packet)
[docs] def request_ecs(self, ecs): """Request contents of supplied Equipment Constants. :param ecs: Equipment Constants to request :type ecs: list :returns: values of requested Equipment Constants :rtype: list """ self.logger.info("Get value of equipment constants {0}".format(ecs)) if not self.connection: return None packet = self.send_and_waitfor_response(self.stream_function(2, 13)(ecs)) return self.secs_decode(packet)
[docs] def request_ec(self, ec): """Request contents of one Equipment Constant. :param ec: id of Equipment Constant :type ec: int :returns: value of requested Equipment Constant :rtype: various """ self.logger.info("Get value of equipment constant {0}".format(ec)) return self.request_ecs([ec])
[docs] def set_ecs(self, ecs): """Set contents of supplied Equipment Constants. :param ecs: list containing list of id / value pairs :type ecs: list """ self.logger.info("Set value of equipment constants {0}".format(ecs)) if not self.connection: return None packet = self.send_and_waitfor_response(self.stream_function(2, 15)(ecs)) return self.secs_decode(packet).get()
[docs] def set_ec(self, ec, value): """Set contents of one Equipment Constant. :param ec: id of Equipment Constant :type ec: int :param value: new content of Equipment Constant :type value: various """ self.logger.info("Set value of equipment constant {0} to {1}".format(ec, value)) return self.set_ecs([[ec, value]])
[docs] def send_equipment_terminal(self, terminal_id, text): """Set text to equipment terminal :param terminal_id: ID of terminal :type terminal_id: int :param text: text to send :type text: string """ self.logger.info("Send text to terminal {0}".format(terminal_id)) if not self.connection: return None return self.send_and_waitfor_response(self.stream_function(10, 3)({"TID": terminal_id, "TEXT": text}))
[docs] def get_ceid_name(self, ceid): """Get the name of a collection event :param ceid: ID of collection event :type ceid: integer :returns: Name of the event or empty string if not found :rtype: string """ if ceid in self._collectionEvents: if "name" in self._collectionEvents[ceid]: return self._collectionEvents[ceid]["name"] return ""
[docs] def get_dvid_name(self, dvid): """Get the name of a data value :param dvid: ID of data value :type dvid: integer :returns: Name of the event or empty string if not found :rtype: string """ if dvid in self._dataValues: if "name" in self._dataValues[dvid]: return self._dataValues[dvid]["name"] return ""
[docs] def are_you_there(self): """Check if remote is still replying""" self.logger.info("Requesting 'are you there'") if not self.connection: return None return self.send_and_waitfor_response(self.stream_function(1, 1)())
[docs] def stream_function(self, stream, function): """Get class for stream and function :param stream: stream to get function for :type stream: int :param function: function to get :type function: int :return: matching stream and function class :rtype: secsSxFx class """ if stream not in self.secsStreamsFunctions: self.logger.warning("unknown function S%02dF%02d", stream, function) return None else: if function not in self.secsStreamsFunctions[stream]: self.logger.warning("unknown function S%02dF%02d", stream, function) return None else: return self.secsStreamsFunctions[stream][function]
[docs] def secs_decode(self, packet): """Get object of decoded stream and function class, or None if no class is available. :param packet: packet to get object for :type packet: :class:`secsgem.hsms.packets.HsmsPacket` :return: matching stream and function object :rtype: secsSxFx object """ if packet is None: return None if packet.header.stream not in self.secsStreamsFunctions: self.logger.warning("unknown function S%02dF%02d", packet.header.stream, packet.header.function) return None if packet.header.function not in self.secsStreamsFunctions[packet.header.stream]: self.logger.warning("unknown function S%02dF%02d", packet.header.stream, packet.header.function) return None # self.logger.debug("decoding function S{}F{} using {}".format(packet.header.stream, packet.header.function, self.secsStreamsFunctions[packet.header.stream][packet.header.function].__name__)) function = self.secsStreamsFunctions[packet.header.stream][packet.header.function]() function.decode(packet.data) # self.logger.debug("decoded {}".format(function)) return function