Something went wrong on our end
-
Bruce Flynn authored
stream.py 5.76 KiB
import functools
import io
import logging
from collections import deque

from edos.ccsds import (
    c,
    PrimaryHeader,
    JpssFirstSecondaryHeader,
    JpssSecondaryHeader,
    GROUP_FIRST,
    GROUP_CONTINUING,
    GROUP_LAST,
    GROUP_STANDALONE
)
from edosl0util import headers
LOG = logging.getLogger(__name__)
class MissingPackets(Exception):
    """
    Raised when the sequence ids seen for an APID are not sequential.

    Sequence ids for an APID are expected to be sequential, rolling over at
    SequenceId.MAX.
    """
class Packet(object):
    """
    A single packet plus the values that were decoded from its headers.
    """

    def __init__(self, apid, seqid, stamp, blob, pheader, sheader):
        """
        :param apid: APID the packet belongs to.
        :param seqid: Sequence id of the packet.
        :param stamp: Timestamp for the packet, or None if it has none.
        :param blob: Raw data the packet was created from.
        :param pheader: Primary header; its `sequence_grouping` attribute is
            what the `is_first`/`is_continuing`/`is_last`/`is_standalone`
            predicates compare against the GROUP_* constants.
        :param sheader: Secondary header, or None if there is none.
        """
        self.apid = apid
        self.seqid = seqid
        self.stamp = stamp
        self.blob = blob
        self._primary_header = pheader
        self._secondary_header = sheader

    def __str__(self):
        return '<Packet apid=%d seqid=%d stamp=%s>' % \
            (self.apid, self.seqid, self.stamp)

    __repr__ = __str__

    def is_group(self):
        """Always False for a plain packet."""
        return False

    def is_first(self):
        """True if the primary header marks this as the first of a group."""
        return self._primary_header.sequence_grouping == GROUP_FIRST

    def is_continuing(self):
        """True if the primary header marks this as a continuation packet."""
        return self._primary_header.sequence_grouping == GROUP_CONTINUING

    # The method was originally published under this misspelled name; keep it
    # as an alias so existing callers continue to work.
    is_continuine = is_continuing

    def is_last(self):
        """True if the primary header marks this as the last of a group."""
        return self._primary_header.sequence_grouping == GROUP_LAST

    def is_standalone(self):
        """True if the primary header marks this as a standalone packet."""
        return self._primary_header.sequence_grouping == GROUP_STANDALONE
class SequenceId(object):
    """
    Endless iterator over sequence ids.

    Each step advances the current value by one, wrapping back to 0 once MAX
    is reached, so the values produced are always in the range 0 .. MAX - 1.
    """
    # Modulus used for roll-over; the largest id ever produced is MAX - 1.
    MAX = 2**14

    def __init__(self, initial=0):
        """
        :param initial: Value to start from. It is the current value (`val`)
            until the first call to `next`; it is not itself returned by
            `next`.
        """
        self._val = initial

    def __iter__(self):
        return self

    @property
    def val(self):
        """The current sequence id."""
        return self._val

    def next(self):
        """Advance to the following sequence id and return it."""
        self._val = (self._val + 1) % self.MAX
        return self.val

    # Python 3 spelling of the iterator protocol; `next` is kept for Python 2
    # and for callers that invoke it directly.
    __next__ = next
def default_header_lookup(primary_header):
    """
    Look up a secondary header implementation for `primary_header`.

    The Aqua lookup is consulted first; the JPSS lookup is only used when the
    Aqua lookup returns a false value.
    """
    aqua_impl = headers.aqua_header_lookup(primary_header)
    if aqua_impl:
        return aqua_impl
    return headers.jpss_header_lookup(primary_header)
class PacketStream(object):
    """
    Iterates over all CCSDS data producing `Packet` objects. Only as much data
    as is necessary to generate a single packet is read into memory at a time.
    """

    def __init__(self, fobj, fail_on_missing=False, header_lookup=default_header_lookup):
        """
        :param fobj: File-like object to read stream from
        :keyword fail_on_missing: If True raise a MissingPackets error when packets
            are missing.
        :keyword header_lookup: A function that takes a PrimaryHeader and returns
            a secondary header implementation that will be used.
        """
        self._input = fobj
        # Packets handed back via push_back(); the most recently pushed packet
        # is the first one returned by next().
        self._seek_cache = deque()
        # apid -> SequenceId holding the last sequence id seen for that apid
        self._seqids = {}
        self._fail_on_missing = fail_on_missing
        self._header_lookup = header_lookup

    def __str__(self):
        return '<PacketStream pos=%d %s>' % (self.tell(), self._input)

    def __iter__(self):
        return self

    def tell(self):
        """
        Return the current position of the underlying input as reported by its
        own `tell()`. Packets waiting in the push-back cache are not accounted
        for.
        """
        return self._input.tell()

    def push_back(self, packet):
        """
        Hand `packet` back to the stream so that it is the next one returned
        by `next`.
        """
        self._seek_cache.append(packet)

    def _read(self, size):
        """
        Read up to `size` bytes from the input.

        :raises: StopIteration if nothing at all could be read. A short,
            non-empty read is returned to the caller as-is.
        """
        data = self._input.read(size)
        if not data:
            raise StopIteration
        return data

    def next(self):
        """
        Return the next packet in the stream.

        Whether a gap in sequence ids is an error is controlled by the
        `fail_on_missing` option given to the constructor.

        :raises: StopIteration when no more data can be serialized into Packets.
        :raises: MissingPackets if `fail_on_missing` is set and the sequence
            id of the packet does not directly follow the previous one seen
            for the same apid.
        """
        # return any items on the seek cache before reading more
        if self._seek_cache:
            return self._seek_cache.pop()

        buf = self._read(c.sizeof(PrimaryHeader))
        data = buf
        h1 = PrimaryHeader.from_buffer_copy(buf)

        buf = self._read(h1.data_length_minus1 + 1)
        data += buf

        stamp = None
        h2 = None  # only first/standalone have h2 impls
        H2Impl = self._header_lookup(h1)
        if H2Impl:
            h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
            if hasattr(h2, 'timecode') and hasattr(h2.timecode, 'asdatetime'):
                stamp = h2.timecode.asdatetime()

        packet = Packet(h1.apid, h1.source_sequence_count, stamp, data, h1, h2)

        if packet.apid not in self._seqids:
            # initialize a new seqid counter
            self._seqids[packet.apid] = SequenceId(packet.seqid)
        else:
            expected_seqid = self._seqids[packet.apid].next()
            if packet.seqid != expected_seqid:
                # Re-sync the counter to what was actually seen; otherwise a
                # single gap would make every following packet for this apid
                # look out of sequence as well.
                self._seqids[packet.apid] = SequenceId(packet.seqid)
                if self._fail_on_missing:
                    raise MissingPackets("non-sequential sequence ids",
                                         packet.seqid, expected_seqid)
        return packet

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def seek(self, stamp, apid=None):
        """
        Seek forward such that the next packet provided will be the first packet
        having a timestamp available in the secondary header >= stamp, and
        optionally with apid as well.

        Packets without a timestamp of their own are treated as having the
        timestamp of the last stamped packet before them.

        :raises: StopIteration if the end of the stream is reached before the
            criteria can be met.
        """
        # skip ahead to the first packet that carries a timestamp
        packet = self.next()
        while not packet.stamp:
            packet = self.next()

        current_stamp = packet.stamp
        while current_stamp < stamp:
            packet = self.next()
            current_stamp = packet.stamp or current_stamp

        if apid is not None:
            while packet.apid != apid:
                packet = self.next()

        # Push the match back exactly once so it is returned exactly once by
        # the following call to next().
        self.push_back(packet)
def _first_stamp(filepath):
    """
    Return the first available packet timestamp in the file at `filepath`, or
    None if no packet in the file provides one.
    """
    # Packet data is binary, so open in binary mode; the `with` block makes
    # sure the file is closed again once the first timestamp has been found.
    with open(filepath, 'rb') as fobj:
        for pkt in PacketStream(fobj):
            if pkt.stamp:
                return pkt.stamp
    return None


def _cmp_by_first_pkt(filea, fileb):
    """
    Compare 2 L0 files by their first available timestamp. Used to sort L0 files
    by time.

    :returns: -1, 0 or 1 if the first timestamp of `filea` is respectively
        earlier than, equal to or later than that of `fileb`. A file without
        any timestamp sorts before a file that has one.
    """
    stampa = _first_stamp(filea)
    stampb = _first_stamp(fileb)
    if stampa is None or stampb is None:
        # Keep the ordering Python 2's cmp() gave: None sorts first.
        return (stampa is not None) - (stampb is not None)
    return (stampa > stampb) - (stampa < stampb)
def make_streams(filepaths):
    """
    Create a `PacketStream` for each file, ordered by first available
    timestamp.

    :param filepaths: List of L0 file paths. The list is sorted in place.
    :returns: List of PacketStream objects, one per path, in sorted order.
        Each stream reads from a file opened here in binary mode; the files
        are left open for the streams to read from.
    """
    # cmp_to_key lets the comparison function be used on interpreters where
    # list.sort() no longer accepts a comparison function directly.
    filepaths.sort(key=functools.cmp_to_key(_cmp_by_first_pkt))
    return [PacketStream(io.open(f, 'rb')) for f in filepaths]