Common functionality

Settings base class.

class secsgem.common.settings.DeviceType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Type of device (equipment or host).

class secsgem.common.settings.Settings(**kwargs)[source]

Settings base class.

These attributes can be initialized in the constructor and accessed as property.

property timeouts: Timeouts

Communication timeouts.

property device_type: DeviceType

Device type.

Default: DeviceType.HOST

property streams_functions: StreamsFunctions

Container with streams/functions.

Default: Global predefined stream function list

property data_items: DataItems

Container with data items.

Default: Global predefined data items list

property device_id: int

Device ID to use for connection.

Default: 0

property establish_communication_timeout: int

Time to wait between CA requests.

Default: 10

abstract create_protocol()[source]

Protocol class for this configuration.

Return type:

Protocol

abstract create_connection()[source]

Connection class for this configuration.

Return type:

Connection

abstract property name: str

Name of this configuration.

abstract generate_thread_name(functionality)[source]

Generate a unique thread name for this configuration and a provided functionality.

Parameters:

functionality (str) – name of the functionality to generate thread name for

Return type:

str

Returns:

generated thread name

Contains helper functions.

class secsgem.common.Block(header, data)[source]

Base class for data block.

property header: BlockHeaderT

Get the header.

property data: bytes

Get the data.

property checksum: int

Get the checksum.

encode()[source]

Encode block data.

Return type:

bytes

Returns:

byte-encoded block

classmethod decode(data)[source]

Decode a byte array to Block object.

Parameters:

data (bytes) – byte-encoded packet data

Return type:

Optional[TypeVar(BlockT, bound= Block)]

Returns:

received packet object

class secsgem.common.BlockSendInfo(data)[source]

Container for sending block and waiting for result.

property data: bytes

Get the data for sending.

resolve(result)[source]

Resolve the send data with a result.

Parameters:

result (bool) – result to resolve with

wait()[source]

Wait until the message is sent and a result is available.

Return type:

bool
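The resolve/wait pairing can be pictured with a `threading.Event`. The sketch below is a stand-in written for illustration, not the library's implementation; the class name `SendInfoSketch` and its internals are assumptions.

```python
import threading


class SendInfoSketch:
    """Hypothetical stand-in mirroring the documented data/resolve/wait interface."""

    def __init__(self, data: bytes) -> None:
        self._data = data
        self._result = False
        self._resolved = threading.Event()

    @property
    def data(self) -> bytes:
        """Bytes that are waiting to be sent."""
        return self._data

    def resolve(self, result: bool) -> None:
        """Store the send result and wake up any waiter."""
        self._result = result
        self._resolved.set()

    def wait(self) -> bool:
        """Block until resolve() was called, then return its result."""
        self._resolved.wait()
        return self._result


info = SendInfoSketch(b"\x01\x02")

# A second thread plays the role of the sender reporting success.
sender = threading.Thread(target=info.resolve, args=(True,))
sender.start()

outcome = info.wait()
sender.join()
```

The caller blocks in `wait()` while another thread performs the send and reports the outcome through `resolve()`.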

class secsgem.common.ByteQueue[source]

FIFO class for queuing and retrieving bytes.

append(data)[source]

Add bytes to the end of the queue.

Parameters:

data (bytes) – bytes to add

pop(size=1)[source]

Remove and return bytes from the beginning of queue.

Parameters:

size (int) – number of bytes to remove

Return type:

bytes

Returns:

removed bytes

pop_byte()[source]

Remove and return single byte from the beginning of queue.

Return type:

int

Returns:

removed byte

peek(size=1)[source]

Get bytes from beginning of the queue without removing them.

Parameters:

size (int) – number of bytes to peek

Return type:

bytes

Returns:

peeked bytes

peek_byte(position=0)[source]

Get single byte in the buffer without removing.

Parameters:

position (int) – byte position to peek at

Return type:

int

Returns:

peeked byte

clear()[source]

Clear the bytes in the queue.

wait_for(size=1, peek=False)[source]

Wait until the requested number of bytes is available in the receive queue.

Parameters:
  • size (int) – number of bytes

  • peek (bool) – only look, don’t remove the item from the queue.

Return type:

bytes

Returns:

Found bytes

wait_for_byte(peek=False)[source]

Wait until one byte is available in the receive queue.

Parameters:

peek (bool) – only look, don’t remove the item from the queue.

Return type:

int

Returns:

Found byte
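The append/pop/peek semantics can be illustrated with a small `bytearray`-backed queue. This is a minimal sketch under assumptions, not the library's code: `ByteQueueSketch` is a hypothetical name, and the blocking `wait_for` variants and any thread safety are deliberately left out.

```python
class ByteQueueSketch:
    """Hypothetical FIFO of bytes mirroring the documented non-blocking methods."""

    def __init__(self) -> None:
        self._buffer = bytearray()

    def append(self, data: bytes) -> None:
        """Add bytes to the end of the queue."""
        self._buffer.extend(data)

    def peek(self, size: int = 1) -> bytes:
        """Return up to `size` bytes from the front without removing them."""
        return bytes(self._buffer[:size])

    def pop(self, size: int = 1) -> bytes:
        """Remove and return up to `size` bytes from the front."""
        chunk = self.peek(size)
        del self._buffer[:size]
        return chunk

    def peek_byte(self, position: int = 0) -> int:
        """Return the byte at `position` as an integer without removing it."""
        return self._buffer[position]

    def pop_byte(self) -> int:
        """Remove and return the first byte as an integer."""
        return self.pop(1)[0]

    def clear(self) -> None:
        """Drop all queued bytes."""
        self._buffer.clear()


queue = ByteQueueSketch()
queue.append(b"\x05\x10\x20")

first = queue.peek_byte()  # queue is unchanged
head = queue.pop(2)        # removes the first two bytes
last = queue.pop_byte()    # removes the remaining byte
```

Note the return types: the `*_byte` methods yield an `int`, while `pop` and `peek` yield `bytes`.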

class secsgem.common.CallbackHandler[source]

Handler for callbacks for HSMS/SECS/GEM events.

This handler manages callbacks for events that can happen on a handler for a connection.

class secsgem.common.Connection(settings)[source]

Abstract base class for a connection.

property on_connected: Event

Get the connected event.

Callbacks to this event are called when a connection is established.

property on_data: Event

Get the data received event.

Callbacks to this event are called when data is received.

property on_disconnecting: Event

Get the disconnecting event.

Callbacks to this event are called before the connection is separated.

property on_disconnected: Event

Get the disconnect event.

Callbacks to this event are called after the connection is separated.

property connected: bool

Get the connected flag.

This flag is True when a connection is established.

property disconnecting: bool

Get the disconnecting flag.

This flag is True when the connection is about to be separated.

abstract enable()[source]

Enable the connection.

Open port and start receiver thread.

abstract disable()[source]

Disable the connection.

Close port and stop receiver thread.

abstract send_data(data)[source]

Send data to the remote host.

Parameters:

data (bytes) – encoded data.

Return type:

bool

Returns:

True if succeeded, False if failed

class secsgem.common.DeviceType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Type of device (equipment or host).

class secsgem.common.EventProducer[source]

Manages the consumers for the events and handles firing events.

fire(event, data)[source]

Fire an event.

Calls all the available handlers for a specific event.

Parameters:
  • event (str) – name of the event

  • data (dict[str, Any]) – data connected to this event

property targets: Targets

Targets used as consumer for this producer.
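The idea behind `fire` (look up the handlers registered for an event name and call each with the event data) can be sketched generically. The registry below is an assumption made for illustration: `EventProducerSketch` and its `subscribe` method are hypothetical and do not describe how the library's targets are actually resolved.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], None]


class EventProducerSketch:
    """Hypothetical event dispatcher keyed by event name."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event: str, handler: Handler) -> None:
        """Register a handler for one event name (hypothetical helper)."""
        self._handlers[event].append(handler)

    def fire(self, event: str, data: dict[str, Any]) -> None:
        """Call every handler registered for `event` with `data`."""
        # Iterate over a copy so handlers may subscribe during dispatch.
        for handler in list(self._handlers.get(event, [])):
            handler(data)


received: list[dict[str, Any]] = []
producer = EventProducerSketch()
producer.subscribe("connected", received.append)

producer.fire("connected", {"peer": "equipment"})
producer.fire("unknown", {})  # no handlers registered, nothing happens
```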

class secsgem.common.Header(system, device_id, stream, function, require_response)[source]

Abstract base class for a message header.

property device_id: int

Get device id.

property stream: int

Get stream.

property function: int

Get function.

property system: int

Get system.

property require_response: bool

Get require response flag.

abstract encode()[source]

Encode header to message.

Return type:

bytes

Returns:

encoded header

abstract classmethod decode(data)[source]

Decode byte array header to Header object.

Parameters:

data (bytes) – byte-encoded header data

Return type:

Header

Returns:

Header object

updated_with(**kwargs)[source]

Get a new header with updated fields.

Parameters:

kwargs – parameter name will update constructor field

Return type:

TypeVar(HeaderT, bound= Header)

Returns:

new header with modified data
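The copy-with-changes behaviour of `updated_with` resembles `dataclasses.replace` on an immutable record. The following is a sketch under that assumption; `HeaderSketch` is a hypothetical class that only borrows the documented constructor field names, and it is not the library's `Header`.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class HeaderSketch:
    """Hypothetical immutable header using the documented constructor fields."""

    system: int
    device_id: int
    stream: int
    function: int
    require_response: bool

    def updated_with(self, **kwargs) -> "HeaderSketch":
        """Return a new header; each keyword overrides the field of the same name."""
        return replace(self, **kwargs)


request = HeaderSketch(system=1, device_id=0, stream=1, function=13, require_response=True)

# Derive a reply header: same system and stream, different function.
reply = request.updated_with(function=14, require_response=False)
```

The original object is left untouched, so a request header can safely serve as the template for its reply.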

class secsgem.common.Message(header, data, complete=True)[source]

Abstract base class for a message.

classmethod from_block(block)[source]

Initialize Message object from Block object.

Parameters:

block (Block) – block to initialize from

Return type:

TypeVar(MessageT, bound= Message)

Returns:

Message object

abstract property header: Header

Get the header.

abstract property data: bytes

Get the data.

abstract property complete: bool

Check if the message is complete.

property blocks: list[BlockT]

Get the blocks.

class secsgem.common.Protocol(settings)[source]

Abstract base class for a protocol.

property events

Property for event handling.

get_next_system_counter()[source]

Return the next system counter value.

Return type:

int

Returns:

System for the next command
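A system counter of this kind is commonly an incrementing integer that wraps around at an upper bound. The sketch below assumes a 32-bit bound and a lock for thread safety; both choices and the name `SystemCounterSketch` are illustrative assumptions, not a description of the library's behaviour.

```python
import threading


class SystemCounterSketch:
    """Hypothetical wrapping counter for message system values."""

    def __init__(self, start: int = 0, maximum: int = 0xFFFFFFFF) -> None:
        self._value = start
        self._maximum = maximum  # assumed 32-bit upper bound
        self._lock = threading.Lock()

    def next(self) -> int:
        """Increment the counter, wrapping to 0 after the maximum."""
        with self._lock:
            if self._value >= self._maximum:
                self._value = 0
            else:
                self._value += 1
            return self._value


counter = SystemCounterSketch(start=0xFFFFFFFE)
values = [counter.next() for _ in range(3)]
```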

enable()[source]

Enable the connection.

disable()[source]

Disable the connection.

abstract serialize_data()[source]

Get protocol serialized data for debugging.

Return type:

dict[str, Any]

Returns:

data to serialize for this object

send_message(message)[source]

Send a message to the remote host.

Parameters:

message (Message) – message to be transmitted

Return type:

bool

Returns:

True if sending was successful

send_and_waitfor_response(function)[source]

Send the message and wait for the response.

Parameters:

function (SecsStreamFunction) – message to be sent

Return type:

Message | None

Returns:

Message that was received

send_response(function, system)[source]

Send response function for system.

Parameters:
  • function (SecsStreamFunction) – function to be sent

  • system (int) – system to reply to

Return type:

bool

Returns:

True if sending was successful

send_stream_function(function)[source]

Send a stream/function message.

Parameters:

function (SecsStreamFunction) – message to be sent

Return type:

bool

Returns:

True if successful

class secsgem.common.ProtocolDispatcher(receiver_target, dispatcher_target, settings)[source]

Thread that calls a target function when a trigger was raised.

start()[source]

Start the thread.

stop()[source]

Stop the thread.

trigger_receiver()[source]

Trigger the thread to call target function.

queue_block(source, block)[source]

Add a block to the dispatch queue and trigger dispatch thread.

Parameters:
  • source (object) – source of the block

  • block (Block) – new block

class secsgem.common.SerialConnection(settings)[source]

Connection class used for serial connections.

enable()[source]

Enable the connection.

Open port and start receiver thread.

disable()[source]

Disable the connection.

Close port and stop receiver thread.

send_data(data)[source]

Send data to the remote host.

Parameters:

data (bytes) – encoded data.

Return type:

bool

Returns:

True if succeeded, False if failed

class secsgem.common.Settings(**kwargs)[source]

Settings base class.

These attributes can be initialized in the constructor and accessed as property.

property timeouts: Timeouts

Communication timeouts.

property device_type: DeviceType

Device type.

Default: DeviceType.HOST

property streams_functions: StreamsFunctions

Container with streams/functions.

Default: Global predefined stream function list

property data_items: DataItems

Container with data items.

Default: Global predefined data items list

property device_id: int

Device ID to use for connection.

Default: 0

property establish_communication_timeout: int

Time to wait between CA requests.

Default: 10

abstract create_protocol()[source]

Protocol class for this configuration.

Return type:

Protocol

abstract create_connection()[source]

Connection class for this configuration.

Return type:

Connection

abstract property name: str

Name of this configuration.

abstract generate_thread_name(functionality)[source]

Generate a unique thread name for this configuration and a provided functionality.

Parameters:

functionality (str) – name of the functionality to generate thread name for

Return type:

str

Returns:

generated thread name

class secsgem.common.State(state, name, parent=None, initial=False)[source]

State machine state class.

property state: Enum

Get the connection state for this state.

property name: str

Get the name for this state.

property events: EventProducer

Property for event handling.

property parent: State | None

Get parent state if available.

property active: bool

Get if the state is active.

enter(source)[source]

Enter the state.

Parameters:

source (State | None) – state to enter from.

leave(destination)[source]

Leave the state.

Parameters:

destination (State | None) – state to leave to.

class secsgem.common.StateMachine[source]

Base state machine.

property current: Enum

Get the current state enum.

property current_state: State

Get the current state.

transition(name)[source]

Get the object for a specific transition.

Parameters:

name (str) – transition name

Return type:

Transition

class secsgem.common.TcpClientConnection(settings)[source]

Client class for a single TCP client connection.

enable()[source]

Enable the connection.

Starts the client connection process to the remote.

disable()[source]

Disable the connection.

Stops all connection attempts and closes the connection.

class secsgem.common.TcpServerConnection(settings)[source]

Server class for TCP server connection.

Creates a listening socket and waits for one incoming connection on this socket. After the connection is established the listening socket is closed.

enable()[source]

Enable the connection.

Starts the connection server process.

disable()[source]

Disable the connection.

Stops all connection attempts and closes the connection.

class secsgem.common.Timeouts(**kwargs)[source]

Timeouts.

Example

>>> import secsgem.common
>>> timeouts = secsgem.common.Timeouts(t3=60.0)
>>> timeouts.t3
60.0
>>> timeouts.t4
10.0

t1
Type:

float

Value:

5.0

Inter-Character Timeout

t2
Type:

float

Value:

100.0

Protocol Timeout

t3
Type:

float

Value:

45.0

Reply Timeout

t4
Type:

float

Value:

10.0

Inter-Block Timeout

t5
Type:

float

Value:

10.0

Connect Separation Time

t6
Type:

float

Value:

5.0

Control Transaction Timeout

t7
Type:

float

Value:

8.0

Not Selected Timeout

t8
Type:

float

Value:

5.0

Network Intercharacter Timeout

classmethod timeouts()[source]

Get a list of default timeouts.

Return type:

list[_Timeout]

classmethod args()[source]

Get a list of available arguments.

Return type:

list[str]
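The keyword-override behaviour from the example above (pass `t3`, keep the default for `t4`) can be sketched with a defaults table. This is an illustration only: `TimeoutsSketch` is a hypothetical class, the defaults are copied from the values listed above, and rejecting unknown keywords is an assumption.

```python
# Default values as listed in the attribute documentation above.
DEFAULTS = {
    "t1": 5.0, "t2": 100.0, "t3": 45.0, "t4": 10.0,
    "t5": 10.0, "t6": 5.0, "t7": 8.0, "t8": 5.0,
}


class TimeoutsSketch:
    """Hypothetical container: keyword arguments override per-timeout defaults."""

    def __init__(self, **kwargs: float) -> None:
        unknown = set(kwargs) - set(DEFAULTS)
        if unknown:
            # Assumption: unknown names are treated as an error.
            raise ValueError(f"unknown timeouts: {sorted(unknown)}")
        for name, default in DEFAULTS.items():
            setattr(self, name, float(kwargs.get(name, default)))

    @classmethod
    def args(cls) -> list[str]:
        """Names that may be passed to the constructor."""
        return list(DEFAULTS)


timeouts = TimeoutsSketch(t3=60.0)
```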

class secsgem.common.Transition(name, sources, destination)[source]

State machine transition class.

property name: str

Get transition name.

property events: EventProducer

Property for event handling.

property sources: list[State]

Get the allowed source states for the transition.

property destination: State

Get the destination state for the transition.

exception secsgem.common.UnknownTransitionError(transition)[source]

Exception for unknown transition.

exception secsgem.common.WrongSourceStateError(transition, expected, actual)[source]

Exception for wrong source state for transition.
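How states, transitions and the two exceptions relate can be shown with a toy state machine. Everything in this sketch is hypothetical (class names, the `fire` method, string states); it illustrates the general pattern of validating a transition name and its allowed source states, not the library's API.

```python
class UnknownTransitionSketchError(Exception):
    """Raised when no transition with the requested name exists."""


class WrongSourceStateSketchError(Exception):
    """Raised when the current state is not an allowed source."""


class StateMachineSketch:
    """Hypothetical state machine with named transitions."""

    def __init__(self, initial: str, transitions: dict[str, tuple[set[str], str]]) -> None:
        self.current = initial
        # Maps transition name -> (allowed source states, destination state).
        self._transitions = transitions

    def fire(self, name: str) -> None:
        """Run a transition after validating its name and source state."""
        if name not in self._transitions:
            raise UnknownTransitionSketchError(name)
        sources, destination = self._transitions[name]
        if self.current not in sources:
            raise WrongSourceStateSketchError(
                f"{name}: expected one of {sorted(sources)}, got {self.current}"
            )
        self.current = destination


machine = StateMachineSketch(
    initial="NOT_CONNECTED",
    transitions={
        "connect": ({"NOT_CONNECTED"}, "CONNECTED"),
        "disconnect": ({"CONNECTED"}, "NOT_CONNECTED"),
    },
)
machine.fire("connect")
```

Calling `machine.fire("connect")` a second time would raise the wrong-source error, and an unregistered name would raise the unknown-transition error.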

secsgem.common.format_hex(text)[source]

Return a byte array formatted as hex numbers.

Parameters:

text (bytes) – byte array

Return type:

str

Returns:

formatted string

Example

>>> import secsgem.common
>>>
>>> data = b"asdfg"
>>> secsgem.common.format_hex(data)
'61:73:64:66:67'

secsgem.common.function_name(function)[source]

Get name of function or method.

Return type:

str

Returns:

function/method name

secsgem.common.indent_block(block, spaces=2)[source]

Indent a multiline string by a number of spaces.

Parameters:
  • block (str) – input text

  • spaces (int) – number of spaces to prepend to each line

Return type:

str

Returns:

indented text
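A helper with this signature could be written as follows. This is a sketch of the described behaviour, not the library's source; the name `indent_block_sketch` is hypothetical, and prefixing empty lines as well is an assumption.

```python
def indent_block_sketch(block: str, spaces: int = 2) -> str:
    """Prepend `spaces` spaces to every line of `block`."""
    prefix = " " * spaces
    return "\n".join(prefix + line for line in block.split("\n"))


indented = indent_block_sketch("first\nsecond", spaces=4)
```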

secsgem.common.is_errorcode_ewouldblock(errorcode)[source]

Check if the errorcode is a would-block error.

Parameters:

errorcode (int) – Code of the error

Return type:

bool

Returns:

True if blocking error code
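A check like this usually compares the code against the platform's would-block constants from the standard `errno` module. The sketch below is an assumption about how such a test might look: `is_would_block_sketch` is a hypothetical name, and treating `EAGAIN` as equivalent is a choice made here, not something the documentation states.

```python
import errno


def is_would_block_sketch(errorcode: int) -> bool:
    """Return True if `errorcode` signals that a non-blocking call would block."""
    candidates = {errno.EWOULDBLOCK, errno.EAGAIN}
    # WSAEWOULDBLOCK is only defined on Windows builds of Python.
    wsa_code = getattr(errno, "WSAEWOULDBLOCK", None)
    if wsa_code is not None:
        candidates.add(wsa_code)
    return errorcode in candidates


would_block = is_would_block_sketch(errno.EWOULDBLOCK)
reset = is_would_block_sketch(errno.ECONNRESET)
```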

secsgem.common.is_windows()[source]

Return True if running on Windows.

Return type:

bool

Returns:

True when running on a Windows system