#####################################################################
# handler.py
#
# (c) Copyright 2013-2024, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Handler for SECS commands."""
from __future__ import annotations
import logging
import typing
import secsgem.common
import secsgem.hsms
if typing.TYPE_CHECKING:
from .data_items.data_items import DataItems
from .functions.base import SecsStreamFunction
[docs]
class SecsHandler: # pylint: disable=too-many-instance-attributes,too-many-public-methods
"""Baseclass for creating Host/Equipment models. This layer contains the SECS functionality.
Inherit from this class and override required functions.
"""
def __init__(self, settings: secsgem.common.Settings):
"""Initialize a secs handler.
Args:
settings: settings defining protocol and connection
"""
self._settings = settings
self._protocol = settings.create_protocol()
self._protocol.events.message_received += self._on_message_received
self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
self._callback_handler = secsgem.common.CallbackHandler()
self._callback_handler.target = self
@property
def settings(self) -> secsgem.common.Settings:
"""Get the setting object."""
return self._settings
@staticmethod
def _generate_sf_callback_name(stream: int, function: int) -> str:
return f"s{stream:02d}f{function:02d}"
@property
def protocol(self) -> secsgem.common.Protocol:
"""Get the connection for the handler."""
return self._protocol
[docs]
def enable(self):
"""Enable the connection."""
self.protocol.enable()
[docs]
def disable(self):
"""Disable the connection."""
self.protocol.disable()
[docs]
def send_response(self, function: SecsStreamFunction, system: int) -> bool:
"""Wrapper for connections send_response function."""
return self.protocol.send_response(function, system)
[docs]
def send_and_waitfor_response(self, function: SecsStreamFunction) -> secsgem.common.Message | None:
"""Wrapper for connections send_and_waitfor_response function."""
return self.protocol.send_and_waitfor_response(function)
[docs]
def send_stream_function(self, function: SecsStreamFunction) -> bool:
"""Wrapper for connections send_stream_function function."""
return self.protocol.send_stream_function(function)
@property
def events(self) -> secsgem.common.EventProducer:
"""Wrapper for connections events."""
return self.protocol.events
@property
def callbacks(self) -> secsgem.common.CallbackHandler:
"""Property for callback handling."""
return self._callback_handler
[docs]
def register_stream_function(self, stream: int, function: int, callback: typing.Callable):
"""Register the function callback for stream and function.
Args:
stream: stream to register callback for
function: function to register callback for
callback: method to call when stream and functions is received
"""
name = self._generate_sf_callback_name(stream, function)
setattr(self._callback_handler, name, callback)
[docs]
def unregister_stream_function(self, stream: int, function: int):
"""Unregister the function callback for stream and function.
Args:
stream: stream to unregister callback for
function: function to register callback for
"""
name = self._generate_sf_callback_name(stream, function)
setattr(self._callback_handler, name, None)
def _handle_unknown_functions(self, message: secsgem.common.Message):
self.logger.warning(
"unexpected function received S%02dF%02d\n%s",
message.header.stream,
message.header.function,
message.header,
)
# reply S09F05 if no callback present
if message.header.require_response:
self.send_response(self.stream_function(9, 5)(message.header.encode()), message.header.system)
def _handle_stream_function(self, message: secsgem.common.Message):
sf_callback_index = self._generate_sf_callback_name(message.header.stream, message.header.function)
if sf_callback_index not in self._callback_handler:
self._handle_unknown_functions(message)
return
try:
callback = getattr(self._callback_handler, sf_callback_index)
result = callback(self, message)
if result is not None:
self.send_response(result, message.header.system)
except Exception: # pylint: disable=broad-except
self.logger.exception("Callback aborted because of exception, abort sent")
self.send_response(self.stream_function(message.header.stream, 0)(), message.header.system)
def _on_message_received(self, data: dict[str, typing.Any]):
"""Message received from protocol layer.
Args:
data: received data
"""
message = data["message"]
# check if callbacks available for this stream and function
self._handle_stream_function(message)
[docs]
def disable_ceids(self) -> secsgem.common.Message | None:
"""Disable all Collection Events."""
self.logger.info("Disable all collection events")
return self.send_and_waitfor_response(self.stream_function(2, 37)({"CEED": False, "CEID": []}))
[docs]
def disable_ceid_reports(self) -> secsgem.common.Message | None:
"""Disable all Collection Event Reports."""
self.logger.info("Disable all collection event reports")
return self.send_and_waitfor_response(self.stream_function(2, 33)({"DATAID": 0, "DATA": []}))
[docs]
def list_svs(self, svs: list[str | int] | None = None) -> SecsStreamFunction:
"""Get list of available Status Variables.
Args:
svs: Status Variables to list
Returns:
available Status Variables
"""
self.logger.info("Get list of status variables")
if svs is None:
svs = []
return self.settings.streams_functions.decode(self.send_and_waitfor_response(self.stream_function(1, 11)(svs)))
[docs]
def request_svs(self, svs: list[str | int]) -> SecsStreamFunction:
"""Request contents of supplied Status Variables.
Args:
svs: Status Variables to request
Returns:
values of requested Status Variables
"""
self.logger.info("Get value of status variables %s", svs)
return self.settings.streams_functions.decode(self.send_and_waitfor_response(self.stream_function(1, 3)(svs)))
[docs]
def request_sv(self, sv_id: int | str) -> int | str | None:
"""Request contents of one Status Variable.
Args:
sv_id: id of Status Variable
Returns:
value of requested Status Variable
"""
self.logger.info("Get value of status variable %s", sv_id)
result = self.request_svs([sv_id])
if result is None:
return None
return result[0]
[docs]
def list_ecs(self, ecs: list[str | int] | None = None) -> SecsStreamFunction:
"""Get list of available Equipment Constants.
Args:
ecs: Equipment Constants to list
Returns:
available Equipment Constants
"""
self.logger.info("Get list of equipment constants")
if ecs is None:
ecs = []
return self.settings.streams_functions.decode(self.send_and_waitfor_response(self.stream_function(2, 29)(ecs)))
[docs]
def request_ecs(self, ecs: list[int | str]) -> SecsStreamFunction:
"""Request contents of supplied Equipment Constants.
Args:
ecs: Equipment Constants to request
Returns:
values of requested Equipment Constants
"""
self.logger.info("Get value of equipment constants %s", ecs)
return self.settings.streams_functions.decode(self.send_and_waitfor_response(self.stream_function(2, 13)(ecs)))
[docs]
def request_ec(self, ec_id: int | str) -> SecsStreamFunction:
"""Request contents of one Equipment Constant.
Args:
ec_id: id of Equipment Constant
Returns:
value of requested Equipment Constant
"""
self.logger.info("Get value of equipment constant %s", ec_id)
return self.request_ecs([ec_id])
[docs]
def set_ecs(self, ecs: list[list[str | int | float]]) -> int:
"""Set contents of supplied Equipment Constants.
Args:
ecs: list containing list of id / value pairs
"""
self.logger.info("Set value of equipment constants %s", ecs)
s2f16 = self.settings.streams_functions.decode(
self.send_and_waitfor_response(self.stream_function(2, 15)(ecs)),
)
return s2f16.get() # EAC
[docs]
def set_ec(self, ec_id: int | str, value: int | str | float) -> int:
"""Set contents of one Equipment Constant.
Args:
ec_id: id of Equipment Constant
value: new content of Equipment Constant
"""
self.logger.info("Set value of equipment constant %s to %s", ec_id, value)
return self.set_ecs([[ec_id, value]])
[docs]
def send_equipment_terminal(self, terminal_id: int, text: str) -> secsgem.common.Message | None:
"""Set text to equipment terminal.
Args:
terminal_id: ID of terminal
text: text to send
"""
self.logger.info("Send text to terminal %s", terminal_id)
return self.send_and_waitfor_response(self.stream_function(10, 3)({"TID": terminal_id, "TEXT": text}))
[docs]
def are_you_there(self) -> secsgem.common.Message | None:
"""Check if remote is still replying."""
self.logger.info("Requesting 'are you there'")
return self.send_and_waitfor_response(self.stream_function(1, 1)())
[docs]
def stream_function(self, stream: int, function: int) -> type[SecsStreamFunction]:
"""Get class for stream and function.
Args:
stream: stream to get class for
function: function to get class for
Returns:
class for function
"""
klass = self.settings.streams_functions.function(stream, function)
if klass is None:
raise KeyError(f"Undefined function requested: S{stream:02d}F{function:02d}")
return klass
@property
def data_items(self) -> DataItems:
"""Get data item container.
Returns:
data item container
"""
return self.settings.streams_functions.data_items