Skip to content
Snippets Groups Projects
Commit 37f84fe2 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Add errors PacketTooShort, NonsequentialSeqId.

Reimpl stream functions as classes.
parent 6eb4f69d
No related branches found
No related tags found
No related merge requests found
...@@ -19,57 +19,98 @@ LOG = logging.getLogger(__name__) ...@@ -19,57 +19,98 @@ LOG = logging.getLogger(__name__)
MAX_SEQ_ID = 2**14 - 1 MAX_SEQ_ID = 2**14 - 1
class Error(Exception):
    """Root of the stream exception hierarchy.

    Catch this to handle any stream-related failure raised by this module.
    """
class PacketTooShort(Error):
    """
    The number of bytes read for a packet does not match the expected length.
    After this error occurs the stream is no longer usable because data offsets
    are no longer reliable.
    """
    def __init__(self, header=None, offset=None):
        """
        :param header: PrimaryHeader parsed so far, or None if the primary
            header itself was truncated.
        :param offset: byte offset into the stream where the truncated read
            began.
        """
        # Let Exception.__init__ populate self.args (and thus str()/repr()
        # and pickling) instead of assigning self.args by hand afterwards.
        # The args tuple is (offset, header), matching the original layout.
        super(PacketTooShort, self).__init__(offset, header)
        self.offset = offset
        self.primary_header = header
class NonConsecutiveSeqId(Error):
    """
    Non-consecutive sequence numbers were encountered for an apid.
    """
class SimpleStream(object):
    """
    Generator that yields PrimaryHeaders and data. Files are read using mmap.
    """
    def __init__(self, fobj):
        # File-like object packets are read from; must support read() and
        # tell().
        self.fobj = fobj
        # Byte offset of the next packet; reported via PacketTooShort so a
        # caller can tell where the stream became unreliable.
        self.offset = fobj.tell()

    def __iter__(self):
        return self

    def __next__(self):
        # Python 3 iterator protocol; delegate to the Python 2 style next().
        return self.next()

    def next(self):
        """
        Return the next (PrimaryHeader, user-data bytes) pair.

        :raises StopIteration: clean EOF exactly on a packet boundary.
        :raises PacketTooShort: fewer bytes than expected were read for
            either the primary header or the user data.
        """
        psize = c.sizeof(PrimaryHeader)
        buf = self.fobj.read(psize)
        if not buf:
            # Zero bytes read: clean end-of-stream.
            raise StopIteration()
        if len(buf) < psize:
            # Truncated mid-primary-header; no header available to report.
            raise PacketTooShort(header=None, offset=self.offset)
        h1 = PrimaryHeader.from_buffer_copy(buf)
        # read user data
        size = h1.data_length_minus1 + 1
        buf = self.fobj.read(size)
        if len(buf) < size:
            raise PacketTooShort(header=h1, offset=self.offset)
        # Only advance the offset once a full packet was read successfully.
        self.offset += (psize + size)
        return h1, buf
class FullStream(SimpleStream):
    """
    Iterable returning primary and secondary header and any left over bytes
    of "user" data.
    """
    def __init__(self, header_lookup, fobj):
        """
        :param header_lookup: function that takes a primary header and returns
            a secondary header struct to use, or None.
        """
        self.header_lookup = header_lookup
        super(FullStream, self).__init__(fobj)

    def next(self):
        """
        Return (primary header, secondary header, remaining user data).

        The secondary header is '' when the packet's secondary_header_flag is
        unset or when header_lookup returns a falsy value, in which case the
        user data is returned untouched.
        """
        h1, buf = super(FullStream, self).next()
        h2 = ''
        if h1.secondary_header_flag:
            H2Impl = self.header_lookup(h1)
            if H2Impl:
                hsize = c.sizeof(H2Impl)
                h2 = H2Impl.from_buffer_copy(buf)
                # Strip the secondary-header bytes from the user data.
                return h1, h2, buf[hsize:]
        return h1, h2, buf
def jpss_full_stream(fobj):
    """
    Construct a `FullStream` that resolves JPSS secondary headers.
    """
    lookup = headers.jpss_header_lookup
    return FullStream(lookup, fobj)
def aqua_full_stream(fobj):
    """
    Construct a `FullStream` that resolves Aqua secondary headers.
    """
    lookup = headers.aqua_header_lookup
    return FullStream(lookup, fobj)
class Packet(object): class Packet(object):
...@@ -123,7 +164,7 @@ class PacketStream(object): ...@@ -123,7 +164,7 @@ class PacketStream(object):
""" """
SEQID_NOTSET = -1 SEQID_NOTSET = -1
def __init__(self, fobj): def __init__(self, fobj, fail_on_missing=False):
""" """
:param fobj: File-like object to read stream from :param fobj: File-like object to read stream from
""" """
...@@ -131,6 +172,7 @@ class PacketStream(object): ...@@ -131,6 +172,7 @@ class PacketStream(object):
self._seek_cache = deque() self._seek_cache = deque()
self._apid_info = defaultdict( self._apid_info = defaultdict(
lambda: {'count': 0, 'last_seqid': self.SEQID_NOTSET, 'num_missing': 0}) lambda: {'count': 0, 'last_seqid': self.SEQID_NOTSET, 'num_missing': 0})
self._fail_on_missing = fail_on_missing
    def __iter__(self):
        # The stream is its own iterator.
        return self
...@@ -143,21 +185,15 @@ class PacketStream(object): ...@@ -143,21 +185,15 @@ class PacketStream(object):
self._seek_cache.append(packet) self._seek_cache.append(packet)
def next(self): def next(self):
"""
Return the next packet in the stream.
:keyword fail_on_missing: If True MissingPackets will be raised when
sequence ids are non-sequential.
:raises: StopIteration when no more data can be serialzed into Packets.
:raises: MissingPackets see `fail_on_missing`
"""
# return any items on the seek cache before reading more # return any items on the seek cache before reading more
if len(self._seek_cache): if len(self._seek_cache):
return self._seek_cache.popleft() return self._seek_cache.popleft()
packet = Packet(*self._stream.next()) packet = Packet(*self._stream.next())
self._update_info(packet) have_missing = self._update_info(packet)
if have_missing and self._fail_on_missing:
self.push_back(packet)
raise NonConsecutiveSeqId()
return packet return packet
def _update_info(self, packet): def _update_info(self, packet):
...@@ -177,11 +213,10 @@ class PacketStream(object): ...@@ -177,11 +213,10 @@ class PacketStream(object):
    def info(self):
        # Per-apid stats dict: each apid maps to
        # {'count', 'last_seqid', 'num_missing'} (see _apid_info default).
        return self._apid_info
def seek(self, stamp, apid=None): def seek_to(self, stamp, apid=None):
""" """
Seek forward such that the next packet provided will be the first packet Seek forward such that the next packet provided will be the first packet
having a timestamp available in the secondary header >= stamp, and having a timestamp available in the secondary header >= stamp and apid.
optionally with apid as well.
:raises: StopIteration if the end of the stream is reached before the :raises: StopIteration if the end of the stream is reached before the
criteria can be met. criteria can be met.
...@@ -191,9 +226,9 @@ class PacketStream(object): ...@@ -191,9 +226,9 @@ class PacketStream(object):
while not packet.stamp: while not packet.stamp:
packet = self.next() packet = self.next()
# seek to first packet > `stamp` # seek to first packet => `stamp`
current_stamp = packet.stamp current_stamp = packet.stamp
while current_stamp <= stamp: while current_stamp < stamp:
packet = self.next() packet = self.next()
current_stamp = packet.stamp or current_stamp current_stamp = packet.stamp or current_stamp
...@@ -206,6 +241,18 @@ class PacketStream(object): ...@@ -206,6 +241,18 @@ class PacketStream(object):
packet = self.next() packet = self.next()
self.push_back(packet) self.push_back(packet)
    def seek_to_next_stamp(self):
        """
        Seek to the next packet with a timestamp available in the secondary
        header. For standalone packets this is essentially the same as `next`.
        For packet groups it is essentially a "next group" function.
        """
        packet = self.next()  # get past current packet
        # Advance until a packet carrying a stamp is found.
        packet = self.next()
        while not packet.stamp:
            packet = self.next()
        # Re-queue the stamped packet so the next call to next() returns it.
        self.push_back(packet)
def _cmp_by_first_pkt(filea, fileb): def _cmp_by_first_pkt(filea, fileb):
""" """
...@@ -224,6 +271,6 @@ def _cmp_by_first_pkt(filea, fileb): ...@@ -224,6 +271,6 @@ def _cmp_by_first_pkt(filea, fileb):
return cmp(stampa, stampb) return cmp(stampa, stampb)
def make_streams(filepaths, **kwargs):
    """
    Open one PacketStream per file, with the files sorted in place by their
    first packet stamp. Extra keyword arguments are passed through to
    PacketStream. Returns (sorted filepaths, list of PacketStream).
    """
    filepaths.sort(_cmp_by_first_pkt)
    streams = [PacketStream(io.open(path, 'rb'), **kwargs)
               for path in filepaths]
    return filepaths, streams
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment