Source code for secsgem.hsms.packets
#####################################################################
# packets.py
#
# (c) Copyright 2015, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Contains objects that encapsulate hsms messages"""
import struct
[docs]class HsmsHeader:
"""Generic HSMS header
Base for different specific headers
:param system: message ID
:type system: integer
:param session_id: device / session ID
:type session_id: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsHeader(3, 100)
secsgem.hsms.packets.HsmsHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 3, 'sessionID': 100, 'requireResponse': False, 'sType': 1})
"""
def __init__(self, system, session_id):
self.sessionID = session_id
self.requireResponse = False
self.stream = 0x00
self.function = 0x00
self.pType = 0x00
self.sType = 0x01
self.system = system
def __str__(self):
return '{sessionID:0x%04x, stream:%02d, function:%02d, pType:0x%02x, sType:0x%02x, system:0x%08x, requireResponse:%01d}' % \
(self.sessionID, self.stream, self.function, self.pType, self.sType, self.system, self.requireResponse)
def __repr__(self):
return "%s(%r)" % (self.__class__, self.__dict__)
[docs]class HsmsSelectReqHeader(HsmsHeader):
"""Header for Select Request
Header for message with SType 1.
:param system: message ID
:type system: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsSelectReqHeader(14)
secsgem.hsms.packets.HsmsSelectReqHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 14, 'sessionID': 65535, 'requireResponse': False, 'sType': 1})
"""
def __init__(self, system):
HsmsHeader.__init__(self, system, 0xFFFF)
self.requireResponse = False
self.stream = 0x00
self.function = 0x00
self.pType = 0x00
self.sType = 0x01
[docs]class HsmsSelectRspHeader(HsmsHeader):
"""Header for Select Response
Header for message with SType 2.
:param system: message ID
:type system: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsSelectRspHeader(24)
secsgem.hsms.packets.HsmsSelectRspHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 24, 'sessionID': 65535, 'requireResponse': False, 'sType': 2})
"""
def __init__(self, system):
HsmsHeader.__init__(self, system, 0xFFFF)
self.requireResponse = False
self.stream = 0x00
self.function = 0x00
self.pType = 0x00
self.sType = 0x02
[docs]class HsmsDeselectReqHeader(HsmsHeader):
"""Header for Deselect Request
Header for message with SType 3.
:param system: message ID
:type system: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsDeselectReqHeader(1)
secsgem.hsms.packets.HsmsDeselectReqHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 1, 'sessionID': 65535, 'requireResponse': False, 'sType': 3})
"""
def __init__(self, system):
HsmsHeader.__init__(self, system, 0xFFFF)
self.requireResponse = False
self.stream = 0x00
self.function = 0x00
self.pType = 0x00
self.sType = 0x03
[docs]class HsmsDeselectRspHeader(HsmsHeader):
"""Header for Deselect Response
Header for message with SType 4.
:param system: message ID
:type system: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsDeselectRspHeader(1)
secsgem.hsms.packets.HsmsDeselectRspHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 1, 'sessionID': 65535, 'requireResponse': False, 'sType': 4})
"""
def __init__(self, system):
HsmsHeader.__init__(self, system, 0xFFFF)
self.requireResponse = False
self.stream = 0x00
self.function = 0x00
self.pType = 0x00
self.sType = 0x04
[docs]class HsmsLinktestReqHeader(HsmsHeader):
"""Header for Linktest Request
Header for message with SType 5.
:param system: message ID
:type system: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsLinktestReqHeader(2)
secsgem.hsms.packets.HsmsLinktestReqHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 2, 'sessionID': 65535, 'requireResponse': False, 'sType': 5})
"""
def __init__(self, system):
HsmsHeader.__init__(self, system, 0xFFFF)
self.requireResponse = False
self.stream = 0x00
self.function = 0x00
self.pType = 0x00
self.sType = 0x05
[docs]class HsmsLinktestRspHeader(HsmsHeader):
"""Header for Linktest Response
Header for message with SType 6.
:param system: message ID
:type system: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsLinktestRspHeader(10)
secsgem.hsms.packets.HsmsLinktestRspHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 10, 'sessionID': 65535, 'requireResponse': False, 'sType': 6})
"""
def __init__(self, system):
HsmsHeader.__init__(self, system, 0xFFFF)
self.requireResponse = False
self.stream = 0x00
self.function = 0x00
self.pType = 0x00
self.sType = 0x06
[docs]class HsmsRejectReqHeader(HsmsHeader):
"""Header for Reject Request
Header for message with SType 7.
:param system: message ID
:type system: integer
:param s_type: sType of rejected message
:type s_type: integer
:param reason: reason for rejection
:type reason: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsRejectReqHeader(17, 3, 4)
secsgem.hsms.packets.HsmsRejectReqHeader({'function': 4, 'stream': 3, 'pType': 0, 'system': 17, 'sessionID': 65535, 'requireResponse': False, 'sType': 7})
"""
def __init__(self, system, s_type, reason):
HsmsHeader.__init__(self, system, 0xFFFF)
self.requireResponse = False
self.stream = s_type
self.function = reason
self.pType = 0x00
self.sType = 0x07
[docs]class HsmsSeparateReqHeader(HsmsHeader):
"""Header for Separate Request
Header for message with SType 9.
:param system: message ID
:type system: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsSeparateReqHeader(17)
secsgem.hsms.packets.HsmsSeparateReqHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 17, 'sessionID': 65535, 'requireResponse': False, 'sType': 9})
"""
def __init__(self, system):
HsmsHeader.__init__(self, system, 0xFFFF)
self.requireResponse = False
self.stream = 0x00
self.function = 0x00
self.pType = 0x00
self.sType = 0x09
[docs]class HsmsStreamFunctionHeader(HsmsHeader):
"""Header for SECS message
Header for message with SType 0.
:param system: message ID
:type system: integer
:param stream: messages stream
:type stream: integer
:param function: messages function
:type function: integer
:param require_response: is response expected from remote
:type require_response: boolean
:param session_id: device / session ID
:type session_id: integer
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsStreamFunctionHeader(22, 1, 1, True, 100)
secsgem.hsms.packets.HsmsStreamFunctionHeader({'function': 1, 'stream': 1, 'pType': 0, 'system': 22, 'sessionID': 100, 'requireResponse': True, 'sType': 0})
"""
def __init__(self, system, stream, function, require_response, session_id):
HsmsHeader.__init__(self, system, session_id)
self.sessionID = session_id
self.requireResponse = require_response
self.stream = stream
self.function = function
self.pType = 0x00
self.sType = 0x00
self.system = system
[docs]class HsmsPacket:
"""Class for hsms packet.
Contains all required data and functions.
:param header: header used for this packet
:type header: :class:`secsgem.hsms.packets.HsmsHeader` and derived
:param data: data part used for streams and functions (SType 0)
:type data: string
**Example**::
>>> import secsgem
>>>
>>> secsgem.hsms.packets.HsmsPacket(secsgem.hsms.packets.HsmsLinktestReqHeader(2))
secsgem.hsms.packets.HsmsPacket({'header': secsgem.hsms.packets.HsmsLinktestReqHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 2, 'sessionID': 65535, 'requireResponse': False, 'sType': 5}), 'data': ''})
"""
def __init__(self, header=None, data=""):
if header is None:
self.header = HsmsHeader(0, 0)
else:
self.header = header
self.data = data
def __str__(self):
data = "header: " + self.header.__str__()
return data
def __repr__(self):
return "%s(%r)" % (self.__class__, self.__dict__)
[docs] def encode(self):
"""Encode packet data to hsms packet
:returns: encoded packet
:rtype: string
**Example**::
>>> import secsgem
>>>
>>> packet = secsgem.hsms.packets.HsmsPacket(secsgem.hsms.packets.HsmsLinktestReqHeader(2))
>>> secsgem.common.format_hex(packet.encode())
'00:00:00:0a:ff:ff:00:00:00:05:00:00:00:02'
"""
length = 10 + len(self.data)
data_length_text = str(len(self.data)) + "s"
header_stream = self.header.stream
if self.header.requireResponse:
header_stream |= 0b10000000
return struct.pack(">LHBBBBL" + data_length_text, length, self.header.sessionID, header_stream, self.header.function, self.header.pType, self.header.sType, self.header.system, self.data)
@staticmethod
[docs] def decode(text):
"""Decode byte array hsms packet to HsmsPacket object
:returns: received packet object
:rtype: :class:`secsgem.hsms.packets.HsmsPacket`
**Example**::
>>> import secsgem
>>>
>>> packetData = "\\x00\\x00\\x00\\x0b\\xff\\xff\\x00\\x00\\x00\\x05\\x00\\x00\\x00\\x02"
>>>
>>> secsgem.format_hex(packetData)
'00:00:00:0b:ff:ff:00:00:00:05:00:00:00:02'
>>>
>>> secsgem.hsms.packets.HsmsPacket.decode(packetData)
secsgem.hsms.packets.HsmsPacket({'header': secsgem.hsms.packets.HsmsHeader({'function': 0, 'stream': 0, 'pType': 0, 'system': 2, 'sessionID': 65535, 'requireResponse': False, 'sType': 5}), 'data': ''})
"""
data_length = len(text) - 14
data_length_text = str(data_length) + "s"
res = struct.unpack(">LHBBBBL" + data_length_text, text)
result = HsmsPacket(HsmsHeader(res[6], res[1]))
result.header.requireResponse = (((res[2] & 0b10000000) >> 7) == 1)
result.header.stream = res[2] & 0b01111111
result.header.function = res[3]
result.header.pType = res[4]
result.header.sType = res[5]
result.data = res[7]
return result