Source code for secsgem.gem.hosthandler

#####################################################################
# hosthandler.py
#
# (c) Copyright 2013-2024, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Handler for GEM host."""

from __future__ import annotations

import collections
import typing

import secsgem.common

from .handler import GemHandler

if typing.TYPE_CHECKING:
    import secsgem.secs


[docs] class GemHostHandler(GemHandler): """Baseclass for creating host models. Inherit from this class and override required functions.""" def __init__(self, settings: secsgem.common.Settings): """Initialize a gem host handler. Args: settings: communication settings """ GemHandler.__init__(self, settings) self.is_host = True self.report_subscriptions: dict[int | str, list[int | str]] = {}
[docs] def clear_collection_events(self) -> None: """Clear all collection events.""" self._logger.info("Clearing collection events") # clear subscribed reports self.report_subscriptions = {} # disable all ceids self.disable_ceids() # delete all reports self.disable_ceid_reports()
[docs] def subscribe_collection_event(self, ceid: int | str, dvs: list[int | str], report_id: int | str | None = None): """Subscribe to a collection event. Args: ceid: ID of the collection event dvs: DV IDs to add for collection event report_id: optional - ID for report, autonumbering if None """ self._logger.info("Subscribing to collection event %s", ceid) if report_id is None: report_id = self._report_id_counter self._report_id_counter += 1 # note subscribed reports self.report_subscriptions[report_id] = dvs # create report self.send_and_waitfor_response( self.stream_function(2, 33)({"DATAID": 0, "DATA": [{"RPTID": report_id, "VID": dvs}]}), ) # link event report to collection event self.send_and_waitfor_response( self.stream_function(2, 35)({"DATAID": 0, "DATA": [{"CEID": ceid, "RPTID": [report_id]}]}), ) # enable collection event self.send_and_waitfor_response(self.stream_function(2, 37)({"CEED": True, "CEID": [ceid]}))
[docs] def send_remote_command(self, rcmd: int | str, params: list[str]) -> secsgem.secs.SecsStreamFunction: """Send a remote command. Args: rcmd: Name of command params: DV IDs to add for collection event Returns: response from remote """ self._logger.info("Send RCMD %s", rcmd) s2f41 = self.stream_function(2, 41)() s2f41.RCMD = rcmd if isinstance(params, list): for param in params: s2f41.PARAMS.append({"CPNAME": param[0], "CPVAL": param[1]}) elif isinstance(params, collections.OrderedDict): for param in params: s2f41.PARAMS.append({"CPNAME": param, "CPVAL": params[param]}) # send remote command return self.settings.streams_functions.decode(self.send_and_waitfor_response(s2f41))
[docs] def delete_process_programs(self, ppids: list[int | str]): """Delete a list of process program. Args: ppids: Process programs to delete Returns: response from remote """ self._logger.info("Delete process programs %s", ppids) # send remote command return self.settings.streams_functions.decode( self.send_and_waitfor_response(self.stream_function(7, 17)(ppids)), ).get()
[docs] def get_process_program_list(self): """Get process program list. Returns: response from remote """ self._logger.info("Get process program list") # send remote command return self.settings.streams_functions.decode( self.send_and_waitfor_response(self.stream_function(7, 19)()), ).get()
[docs] def go_online(self): """Set control state to online. Returns: response text or None if no response text """ self._logger.info("Go online") # send remote command resp = self.settings.streams_functions.decode(self.send_and_waitfor_response(self.stream_function(1, 17)())) if resp is None: return None return resp.get()
[docs] def go_offline(self): """Set control state to offline. Returns: response text or None if no response text """ self._logger.info("Go offline") # send remote command return self.settings.streams_functions.decode( self.send_and_waitfor_response(self.stream_function(1, 15)()), ).get()
[docs] def enable_alarm(self, alid: int | str): """Enable alarm. Args: alid: alarm id to enable Returns: response from remote """ self._logger.info("Enable alarm %d", alid) return self.settings.streams_functions.decode( self.send_and_waitfor_response( self.stream_function(5, 3)({"ALED": self.settings.data_items.ALED.ENABLE, "ALID": alid}), ), ).get()
[docs] def disable_alarm(self, alid: int | str): """Disable alarm. Args: alid: alarm id to disable Returns: response from remote """ self._logger.info("Disable alarm %d", alid) return self.settings.streams_functions.decode( self.send_and_waitfor_response( self.stream_function(5, 3)({"ALED": self.settings.data_items.ALED.DISABLE, "ALID": alid}), ), ).get()
[docs] def list_alarms(self, alids: list[int | str] | None = None): """List alarms. Args: alids: alarms to list details for Returns: response from remote """ if alids is None: alids = [] self._logger.info("List all alarms") else: self._logger.info("List alarms %s", alids) return self.settings.streams_functions.decode( self.send_and_waitfor_response(self.stream_function(5, 5)(alids)), ).get()
[docs] def list_enabled_alarms(self): """List enabled alarms. Returns: response from remote """ self._logger.info("List all enabled alarms") return self.settings.streams_functions.decode( self.send_and_waitfor_response(self.stream_function(5, 7)()), ).get()
def _on_alarm_received( self, _handler: secsgem.secs.SecsHandler, _alarm_id: int, _alarm_code: int, _alarm_text: str, ): """Handle received alarm. Args: handler: handler the message was received on alarm_id: alarm ID alarm_code: alarm code alarm_text: alarm text Returns: response to alarm """ return self.settings.data_items.ACKC5.ACCEPTED def _on_s05f01( self, handler: secsgem.secs.SecsHandler, message: secsgem.common.Message, ) -> secsgem.secs.SecsStreamFunction | None: """Handle Stream 5, Function 1, Alarm request. Args: handler: handler the message was received on message: complete message received """ s5f1 = self.settings.streams_functions.decode(message) result = self._callback_handler.alarm_received(handler, s5f1.ALID, s5f1.ALCD, s5f1.ALTX) self.events.fire( "alarm_received", {"code": s5f1.ALCD, "alid": s5f1.ALID, "text": s5f1.ALTX, "handler": self.protocol, "peer": self}, ) return self.stream_function(5, 2)(result) def _on_s06f11( self, _handler: secsgem.secs.SecsHandler, message: secsgem.common.Message, ) -> secsgem.secs.SecsStreamFunction | None: """Handle Stream 6, Function 11, Event Report Send. Args: handler: handler the message was received on message: complete message received """ function = self.settings.streams_functions.decode(message) for report in function.RPT: report_dvs = self.report_subscriptions[report.RPTID.get()] report_values = report.V.get() values = [] for index, data_value_id in enumerate(report_dvs): values.append({"dvid": data_value_id, "value": report_values[index]}) data = { "ceid": function.CEID, "rptid": report.RPTID, "values": values, "handler": self.protocol, "peer": self, } self.events.fire("collection_event_received", data) return self.stream_function(6, 12)(0) def _on_terminal_received(self, _handler: secsgem.secs.SecsHandler, _terminal_id: int, _text: str): """Handle received terminal message. Args: handler: handler the message was received on terminal_id: terminal ID text: messsage text """ return self.settings.data_items.ACKC10.ACCEPTED def _on_s10f01( self, handler: secsgem.secs.SecsHandler, message: secsgem.common.Message, ) -> secsgem.secs.SecsStreamFunction | None: """Handle Stream 10, Function 1, Terminal Request. Args: handler: handler the message was received on message: complete message received """ s10f1 = self.settings.streams_functions.decode(message) result = self._callback_handler.terminal_received(handler, s10f1.TID, s10f1.TEXT) self.events.fire( "terminal_received", {"text": s10f1.TEXT, "terminal": s10f1.TID, "handler": self.protocol, "peer": self}, ) return self.stream_function(10, 2)(result)