#####################################################################
# byte_queue.py
#
# (c) Copyright 2023, Benjamin Parzella. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#####################################################################
"""Queue for bytes."""
from __future__ import annotations
import threading
[docs]class ByteQueue:
"""FIFO class for queuing and retrieving bytes."""
def __init__(self) -> None:
"""Initialize the queue."""
self._buffer = bytearray()
self._buffer_lock = threading.Condition()
[docs] def append(self, data: bytes):
"""Add bytes to the end of the queue.
Args:
data: bytes to add
"""
with self._buffer_lock:
self._buffer.extend(data)
self._buffer_lock.notify_all()
[docs] def pop(self, size: int = 1) -> bytes:
"""Remove and return bytes from the beginning of queue.
Args:
size: number of bytes to remove
Returns:
removed bytes
"""
with self._buffer_lock:
data = self._buffer[:size]
del self._buffer[:size]
return data
[docs] def pop_byte(self) -> int:
"""Remove and return single byte from the beginning of queue.
Returns:
removed byte
"""
with self._buffer_lock:
data = self._buffer[0]
del self._buffer[0]
return data
[docs] def peek(self, size: int = 1) -> bytes:
"""Get bytes from beginning of the queue without removing them.
Args:
size: number of bytes to peek
Returns:
peek bytes
"""
return self._buffer[:size]
[docs] def peek_byte(self, position: int = 0) -> int:
"""Get single byte in the buffer without removing.
Args:
position: byte position to peek at
Returns:
peek bytes
"""
return self._buffer[position]
[docs] def clear(self):
"""Clear the bytes in the queue."""
with self._buffer_lock:
self._buffer.clear()
def __len__(self) -> int:
"""Get the length of the queue.
Returns:
queue length
"""
return len(self._buffer)
[docs] def wait_for(self, size: int = 1, peek: bool = False) -> bytes:
"""Wait until the requested number of bytes is available in the receive queue.
Args:
size: number of bytes
peek: only look, don't remove the item from the queue.
Returns:
Found bytes
"""
def min_size() -> bool:
return len(self._buffer) >= size
if len(self._buffer) < size:
with self._buffer_lock:
self._buffer_lock.wait_for(min_size)
if peek:
return self.peek(size)
return self.pop(size)
[docs] def wait_for_byte(self, peek: bool = False) -> int:
"""Wait until one byte is available in the receive queue.
Args:
peek: only look, don't remove the item from the queue.
Returns:
Found byte
"""
return self.wait_for(peek=peek)[0]