# stream.py -- CCSDS / EDOS Level-0 packet stream reading utilities.
# Author: Bruce Flynn
import os
import errno
import logging
import ctypes as c
from collections import deque, defaultdict, namedtuple
from edosl0util.headers import (
PrimaryHeader,
GROUP_FIRST,
GROUP_CONTINUING,
GROUP_LAST,
GROUP_STANDALONE
)
from edosl0util import headers
# Module-level logger, named after this module per logging convention.
LOG = logging.getLogger(__name__)
# Max CCSDS sequence id: the source sequence count is a 14-bit field, so it
# wraps back to 0 after reaching this value.
MAX_SEQ_ID = 2**14 - 1
class Error(Exception):
    """
    Base stream error. All errors raised by this module derive from it so
    callers can catch a single exception type.
    """
class PacketTooShort(Error):
    """
    The number of bytes read for a packet does not match the expected length.
    After this error occurs the stream is no longer usable because data offsets
    are no longer reliable.
    """
class NonConsecutiveSeqId(Error):
    """
    Non-consecutive sequence numbers encountered for an apid, i.e., a packet
    gap was detected.
    """
class BasicStream(object):
    """
    Basic packet stream iterator that reads the primary and secondary headers
    and maintains offsets and read sizes.
    """
    # Per-packet record: decoded headers, total packet size, byte offset in
    # the stream, and the packet data (None when with_data=False).
    Tracker = namedtuple('Tracker', ['h1', 'h2', 'size', 'offset', 'data'])

    def __init__(self, fobj, header_lookup=None, with_data=True):
        """
        :param fobj: File-like object open for binary reading.
        :keyword header_lookup: Callable taking a decoded ``PrimaryHeader``
            and returning a ctypes secondary header implementation, or None
            to skip secondary header decoding.
        :keyword with_data: When False, packet data is seeked over rather
            than read into memory.
        """
        self.file = fobj
        self.header_lookup = header_lookup
        self.with_data = with_data
        try:
            self._offset = self.file.tell()
        except IOError as err:  # handle illegal seek for pipes
            if err.errno != errno.ESPIPE:
                raise
            self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):  # python3 iteration protocol
        return self.next()

    def _read(self, size):
        """
        Read exactly ``size`` bytes, raising ``StopIteration`` at EOF and
        ``PacketTooShort`` on a partial (truncated) read.
        """
        buf = self.file.read(size)
        if not buf:  # EOF
            raise StopIteration()
        if len(buf) != size:
            raise PacketTooShort(
                'expected to read {:d} bytes, got {:d} at offset {:d}'
                .format(size, len(buf), self._offset), self.file)
        self._offset += size
        return buf

    def read_primary_header(self):
        """Read and decode the primary header; return (header, bytes read)."""
        buf = self._read(headers.PRIMARY_HEADER_SIZE)
        h1 = PrimaryHeader.from_buffer_copy(buf)
        return h1, headers.PRIMARY_HEADER_SIZE

    def read_secondary_header(self, ph):
        """
        Read and decode the secondary header for primary header ``ph``.
        Returns (header, bytes read), or (b'', 0) when no secondary header
        implementation is known for this packet.
        """
        H2Impl = self.header_lookup(ph) if self.header_lookup else None
        if H2Impl is None:
            return bytes(), 0
        h2size = c.sizeof(H2Impl)
        buf = self._read(h2size)
        return H2Impl.from_buffer_copy(buf), h2size

    def next(self):
        offset = self._offset
        h1, h1size = self.read_primary_header()
        h2, h2size = self.read_secondary_header(h1)
        # data length includes h2size
        total_size = h1size + h1.data_length_minus1 + 1
        data_size = h1.data_length_minus1 + 1 - h2size
        if self.with_data:
            data = self._read(data_size)
        else:
            data = None
            # BUG FIX: was ``self.fobj.seek(...)``, but the attribute is
            # ``self.file`` -- the old code raised AttributeError whenever
            # with_data=False. Also advance _offset so subsequent packet
            # offsets remain correct after skipping the data.
            self.file.seek(data_size, os.SEEK_CUR)
            self._offset += data_size
        return self.Tracker(h1, h2, total_size, offset, data)
class Packet(object):
def __init__(self, h1, h2, data, data_size=None, offset=None):
self.primary_header = h1
self.secondary_header = h2
self.data = data
self.size = data_size
self.offset = offset
self.apid = h1.apid
self.seqid = self.primary_header.source_sequence_count
def __str__(self):
return '<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>' % \
(self.apid, self.seqid, self.stamp, self.size, self.offset)
__repr__ = __str__
@property
def stamp(self):
return (
self.secondary_header and
hasattr(self.secondary_header, 'timecode') and
self.secondary_header.timecode.asdatetime() or None)
@property
def timestamp(self):
return (
self.secondary_header and
hasattr(self.secondary_header, 'timecode') and
self.secondary_header.timecode.astimestamp() or None)
def bytes(self):
return buffer(self.primary_header) + \
buffer(self.secondary_header) + self.data
def is_group(self):
return self.is_first() or self.is_continuing() or self.is_last()
def is_first(self):
return self.primary_header.sequence_grouping == GROUP_FIRST
def is_continuine(self):
return self.primary_header.sequence_grouping == GROUP_CONTINUING
def is_last(self):
return self.primary_header.sequence_grouping == GROUP_LAST
def is_standalone(self):
return self.primary_header.sequence_grouping == GROUP_STANDALONE
class PacketStream(object):
    """
    Iterator of ``Packet`` objects built from a stream of ``Tracker``
    records, tracking per-apid packet counts, sequence-id gaps, and the
    first/last timestamps seen.
    """
    # Sentinel meaning no sequence id has been seen yet for an apid.
    SEQID_NOTSET = -1

    def __init__(self, data_stream, fail_on_missing=False, fail_on_tooshort=False):
        """
        :param data_stream: An iterable of ``Tracker`` objects
        :keyword fail_on_missing:
            Raise a ``NonConsecutiveSeqId`` error when sequence id gaps are
            found. If this is False sequence id gaps are ignored.
        :keyword fail_on_tooshort:
            Raise a ``PacketTooShort`` error when a packet is encountered for
            which less bytes can be read than are expected according to the
            packet headers. Most of the time this would be due to a truncated
            file with an incomplete packet at the end. If this is False a
            ``StopIteration`` will be raised which will effectively truncate
            the file.
        """
        self._stream = data_stream
        self._seek_cache = deque()
        self._first = None   # earliest stamp seen across all packets
        self._last = None    # latest stamp seen across all packets
        self._apid_info = defaultdict(
            lambda: {'count': 0,
                     'last_seqid': self.SEQID_NOTSET,
                     'num_missing': 0})
        self._fail_on_missing = fail_on_missing
        self._fail_on_tooshort = fail_on_tooshort

    def __repr__(self):
        filepath = getattr(self.file, 'name', None)
        return '<{} file={}>'.format(self.__class__.__name__, filepath)

    def __iter__(self):
        return self

    # python3
    def __next__(self):
        return self.next()

    @property
    def file(self):
        return self._stream.file

    def push_back(self, packet):
        """
        Put a packet back such that it is the next packet provided when
        ``next`` is called.
        """
        self._seek_cache.append(packet)

    def next(self):
        # return any items on the seek cache before reading more
        if len(self._seek_cache):
            return self._seek_cache.popleft()
        try:
            # use the iterator protocol rather than ``.next()`` so any
            # iterable of Tracker records works, not just BasicStream
            h1, h2, data_size, offset, data = next(self._stream)
        except PacketTooShort as err:
            if self._fail_on_tooshort:
                raise
            LOG.error('Packet too short, aborting stream: %s', err)
            # The result of this is essentially a file truncation
            raise StopIteration()
        packet = Packet(h1, h2, data, data_size=data_size, offset=offset)
        have_missing = self._update_info(packet)
        if have_missing and self._fail_on_missing:
            self.push_back(packet)
            raise NonConsecutiveSeqId()
        return packet

    def _update_info(self, packet):
        """
        Handle gap detection and first/last. Returns whether any missing
        packets were detected.
        """
        have_missing = False
        apid = self._apid_info[packet.apid]
        if packet.stamp:
            if self._first is None:
                self._first = packet.stamp
            else:
                self._first = min(packet.stamp, self._first)
            if self._last is None:
                self._last = packet.stamp
            else:
                self._last = max(packet.stamp, self._last)
        apid['count'] += 1
        if apid['last_seqid'] != self.SEQID_NOTSET:
            if packet.seqid > apid['last_seqid']:
                num_missing = packet.seqid - apid['last_seqid'] - 1
            else:
                # 14-bit sequence counter wrapped around
                num_missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID
            # BUG FIX: was ``num_missing != apid['num_missing']``, which
            # compares this gap's size against the *cumulative* missing count
            # and therefore fails to report a gap whose size happens to equal
            # the running total (e.g. two consecutive gaps of the same size).
            have_missing = num_missing > 0
            apid['num_missing'] += num_missing
        apid['last_seqid'] = packet.seqid
        return have_missing

    def info(self):
        """Return (first stamp, last stamp, per-apid info dict)."""
        return self._first, self._last, self._apid_info

    def seek_to(self, stamp, apid=None):
        """
        Seek forward such that the next packet provided will be the first packet
        having a timestamp available in the secondary header >= stamp and apid.
        """
        # seek past partial groups
        packet = self.next()
        while not packet.stamp:
            packet = self.next()
        # seek to first packet => `stamp`
        current_stamp = packet.stamp
        while current_stamp < stamp:
            packet = self.next()
            current_stamp = packet.stamp or current_stamp
        # put back so next packet is the first packet > `stamp`
        self.push_back(packet)
        if apid is not None:
            packet = self.next()
            while packet.apid != apid:
                packet = self.next()
            self.push_back(packet)
def jpss_packet_stream(fobj, **kwargs):
    """Return a ``PacketStream`` of JPSS packets read from ``fobj``."""
    return PacketStream(
        BasicStream(fobj, headers.jpss_header_lookup), **kwargs)
def aqua_packet_stream(fobj, **kwargs):
    """Return a ``PacketStream`` of Aqua packets read from ``fobj``."""
    return PacketStream(
        BasicStream(fobj, headers.aqua_header_lookup), **kwargs)