import os
import errno
import ctypes as c
from collections import defaultdict, deque, namedtuple

from edosl0util import headers
from edosl0util.headers import (
    GROUP_CONTINUING,
    GROUP_FIRST,
    GROUP_LAST,
    GROUP_STANDALONE,
    PrimaryHeader,
)

# CCSDS source sequence counts are 14-bit, so they wrap at this value
MAX_SEQ_ID = 2 ** 14 - 1

class Error(Exception):
    """
    Base stream error.
    """


class PacketTooShort(Error):
    """
    The number of bytes read for a packet does not match the expected length.
    After this error occurs the stream is no longer usable because data offsets
    are no longer reliable.
    """


class NonConsecutiveSeqId(Error):
    """
    Non-consecutive sequence numbers encountered for an apid, i.e., a packet gap.
    """


class BasicStream(object):
    """
    Basic packet stream iterator that reads the primary and secondary headers
    and maintains offsets and read sizes.
    """

    Tracker = namedtuple("Tracker", ["h1", "h2", "size", "offset", "data"])

    def __init__(self, fobj, header_lookup=None, with_data=True):
        self.file = fobj
        self.header_lookup = header_lookup
        self.with_data = with_data
        try:
            self._offset = self.file.tell()
        except IOError as err:  # handle illegal seek for pipes
            if err.errno != errno.ESPIPE:
                raise
            self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def _read(self, size):
        buf = self.file.read(size)
        if len(buf) != size:
            raise PacketTooShort(
                "expected to read {:d} bytes, got {:d} at offset {:d}".format(
                    size, len(buf), self._offset
                ),
                self.file,
            )
        self._offset += size
        return buf

    def read_primary_header(self):
        buf = self._read(headers.PRIMARY_HEADER_SIZE)
        h1 = PrimaryHeader.from_buffer_copy(buf)
        return h1, headers.PRIMARY_HEADER_SIZE

    def read_secondary_header(self, ph):
        H2Impl = self.header_lookup(ph) if self.header_lookup else None
        if H2Impl is None:
            return bytes(), 0
        h2size = c.sizeof(H2Impl)
        buf = self._read(h2size)
        return H2Impl.from_buffer_copy(buf), h2size

    def next(self):
        offset = self._offset
        h1, h1size = self.read_primary_header()
        h2, h2size = self.read_secondary_header(h1)
        # data length includes h2size
        total_size = h1size + h1.data_length_minus1 + 1
        data_size = h1.data_length_minus1 + 1 - h2size
        data = None
        if self.with_data and data_size:
            data = self._read(data_size)
        else:
            # skip over the packet data without buffering it
            self.file.seek(data_size, os.SEEK_CUR)
            self._offset += data_size
        return self.Tracker(h1, h2, total_size, offset, data)
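

# A minimal sketch of driving BasicStream directly, assuming `fobj` is a
# seekable binary file object; the helper name is illustrative, not part of
# this module's API.
def iter_packet_offsets(fobj):
    """Yield (offset, total_size) for each packet without buffering data."""
    stream = BasicStream(fobj, with_data=False)
    try:
        for trk in stream:  # trk is a BasicStream.Tracker
            yield trk.offset, trk.size
    except PacketTooShort:
        pass  # raised at EOF as well as for a truncated final packet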


class Packet(object):
    def __init__(self, h1, h2, data, data_size=None, offset=None):
        self.primary_header = h1
        self.secondary_header = h2
        self.data = data
        self.size = data_size
        self.offset = offset
        self.apid = h1.apid
        self.seqid = self.primary_header.source_sequence_count
return "<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>" % (
self.apid,
self.seqid,
self.stamp,
self.size,
self.offset,
)

    @property
    def cds_timecode(self):
        return (
            self.secondary_header
            and hasattr(self.secondary_header, "timecode")
            and self.secondary_header.timecode.day_segmented_timecode()
            or None
        )

    @property
    def stamp(self):
        return (
            self.secondary_header
            and hasattr(self.secondary_header, "timecode")
            and self.secondary_header.timecode.asdatetime()
            or None
        )

    def bytes(self):
        return (
            bytearray(self.primary_header)
            + bytearray(self.secondary_header)
            + self.data
        )

    def is_group(self):
        return self.is_first() or self.is_continuing() or self.is_last()

    def is_first(self):
        return self.primary_header.sequence_grouping == GROUP_FIRST

    def is_continuing(self):
        return self.primary_header.sequence_grouping == GROUP_CONTINUING

    # compatibility
    is_continuine = is_continuing

    def is_last(self):
        return self.primary_header.sequence_grouping == GROUP_LAST

    def is_standalone(self):
        return self.primary_header.sequence_grouping == GROUP_STANDALONE
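

# A minimal sketch using the grouping predicates above; the helper name is
# illustrative, not part of this module's API.
def describe_grouping(packet):
    """Return a short label for a packet's sequence grouping flags."""
    if packet.is_standalone():
        return "standalone"
    if packet.is_first():
        return "first"
    if packet.is_last():
        return "last"
    return "continuing"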


class PacketStream(object):
    """
    Iterates over ``Tracker`` items from a ``BasicStream``, producing
    ``Packet`` objects.
    """

    SEQID_NOTSET = -1

    def __init__(self, data_stream, fail_on_missing=False, fail_on_tooshort=False):
        """
        :param data_stream: An iterable of ``Tracker`` objects
        :keyword fail_on_missing:
            Raise a ``NonConsecutiveSeqId`` error when sequence id gaps are
            found. If this is False sequence id gaps are ignored.
        :keyword fail_on_tooshort:
            Raise a ``PacketTooShort`` error when a packet is encountered for
            which fewer bytes can be read than are expected according to the
            packet headers. Most of the time this would be due to a truncated
            file with an incomplete packet at the end. If this is False a
            ``StopIteration`` will be raised, which effectively truncates
            the file.
        """
        self._stream = data_stream
        self._seek_cache = deque()
        self._first = None
        self._last = None
        self._apid_info = defaultdict(
            lambda: {"count": 0, "last_seqid": self.SEQID_NOTSET, "num_missing": 0}
        )
        self._fail_on_missing = fail_on_missing
        self._fail_on_tooshort = fail_on_tooshort

    def __repr__(self):
        filepath = getattr(self.file, "name", None)
        return "<{} file={}>".format(self.__class__.__name__, filepath)

    def __iter__(self):
        return self

    # python3
    def __next__(self):
        return self.next()

    @property
    def file(self):
        return self._stream.file
"""
Put a packet back such that it is the next file provided when
``next`` is called.
"""

    def next(self):
        # return any items on the seek cache before reading more
        if len(self._seek_cache):
            return self._seek_cache.popleft()
        try:
            h1, h2, data_size, offset, data = next(self._stream)
        except PacketTooShort:
            if self._fail_on_tooshort:
                raise
            # The result of this is essentially a file truncation
            raise StopIteration()
        packet = Packet(h1, h2, data, data_size=data_size, offset=offset)
        have_missing = self._update_info(packet)
        if have_missing and self._fail_on_missing:
            self.push_back(packet)
            raise NonConsecutiveSeqId()
        return packet

    def _update_info(self, packet):
        """
        Handle gap detection and first/last. Returns whether any missing
        packets were detected.
        """
        have_missing = False
        if packet.stamp:
            if self._first is None:
                self._first = packet.stamp
            else:
                self._first = min(packet.stamp, self._first)
            if self._last is None:
                self._last = packet.stamp
            else:
                self._last = max(packet.stamp, self._last)
        apid = self._apid_info[packet.apid]
        apid["count"] += 1
        if apid["last_seqid"] != self.SEQID_NOTSET:
            if packet.seqid > apid["last_seqid"]:
                num_missing = packet.seqid - apid["last_seqid"] - 1
            else:  # sequence counter wrapped
                num_missing = packet.seqid - apid["last_seqid"] + MAX_SEQ_ID
            have_missing = num_missing != apid["num_missing"]
            apid["num_missing"] += num_missing
        apid["last_seqid"] = packet.seqid
        return have_missing

    def info(self):
        return self._first, self._last, self._apid_info

    def seek_to(self, stamp, apid=None):
        """
        Seek forward such that the next packet provided will be the first
        packet with a secondary header timestamp >= ``stamp``, optionally
        limited to the given ``apid``.
        """
        # seek past leading packets that have no timecode
        packet = self.next()
        while not packet.cds_timecode:
            packet = self.next()

        # seek to first packet >= `stamp`; `stamp` must be comparable to the
        # value returned by Packet.cds_timecode
        current_tc = packet.cds_timecode
        while current_tc < stamp:
            packet = self.next()
            current_tc = packet.cds_timecode or current_tc

        # put back so next packet is the first packet >= `stamp`
        self.push_back(packet)

        if apid is not None:
            packet = self.next()
            while packet.apid != apid:
                packet = self.next()
            self.push_back(packet)
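

# A minimal usage sketch for PacketStream.seek_to, assuming `stream` is a
# PacketStream and `start` is comparable to Packet.cds_timecode; the helper
# name is illustrative.
def packets_at_or_after(stream, start, apid=None):
    """Yield packets from `stream` starting at `start` (illustrative)."""
    stream.seek_to(start, apid=apid)
    for packet in stream:
        yield packet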


def collect_groups(stream):
    """
    Collects packets into lists of grouped packets. If a packet is standalone
    it will be a list of just a single packet. If a packet is grouped it will
    be a list of the packets in the group. There is no guarantee the group
    will be complete. The first packet in a group will always have a
    timestamp.
    """
    done = object()  # sentinel for stopping iter
    group = deque()
    while True:
        p = next(stream, done)
        if p is done:
            # yield any group collected before the stream ended
            if group:
                yield list(group)
            return
        # just pass through any non-grouped packets
        if p.is_standalone():
            yield [p]
            continue
        # drop leading packets without timestamp, i.e., group members arriving
        # before any first packet
        if not group and not p.is_first():
            continue
        # Yield and start a new group if either we get a first packet or the
        # current packet's APID does not match the current group.
        if p.is_first() or (group and group[-1].apid != p.apid):
            if group:
                yield list(group)
            group = deque([p])
            continue
        group.append(p)
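

# A minimal sketch of collect_groups in use: pass standalone packets through
# and keep only groups that are complete, i.e., that begin with a first packet
# and end with a last packet. The helper name is illustrative, not part of
# this module's API.
def complete_groups(stream):
    for group in collect_groups(stream):
        if group[0].is_standalone():
            yield group
        elif group[0].is_first() and group[-1].is_last():
            yield group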


def jpss_packet_stream(fobj, **kwargs):
    stream = BasicStream(fobj, headers.jpss_header_lookup)
    return PacketStream(stream, **kwargs)


def aqua_packet_stream(fobj, **kwargs):
    stream = BasicStream(fobj, headers.aqua_header_lookup)
    return PacketStream(stream, **kwargs)
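

# A minimal end-to-end sketch, assuming a JPSS CCSDS level 0 file at the
# hypothetical path "level0.pds".
if __name__ == "__main__":
    with open("level0.pds", "rb") as fobj:
        stream = jpss_packet_stream(fobj)
        for packet in stream:
            print(packet)
        first, last, apid_info = stream.info()
        print("first stamp:", first, "last stamp:", last)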