stream.py (7.73 KiB)
Bruce Flynn: Reimpl stream functions as classes.
import io
import logging
import ctypes as c
from collections import deque, defaultdict
from edosl0util.headers import (
PrimaryHeader,
GROUP_FIRST,
GROUP_CONTINUING,
GROUP_LAST,
GROUP_STANDALONE
)
from edosl0util import headers
LOG = logging.getLogger(__name__)
# Max CCSDS sequence id
MAX_SEQ_ID = 2**14 - 1
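# Sequence counters are 14 bits, so they wrap: the packet after seqid 16383
# is seqid 0, i.e., next_seqid = (seqid + 1) % (MAX_SEQ_ID + 1)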
class Error(Exception):
"""
Base stream error.
"""
class PacketTooShort(Error):
"""
The number of bytes read for a packet does not match the expected length.
After this error occurs the stream is no longer usable because data offsets
are no longer reliable.
"""
def __init__(self, header=None, offset=None):
self.offset = offset
self.primary_header = header
self.args = (self.offset, self.primary_header)
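# Usage sketch: callers can catch PacketTooShort to detect a truncated file:
#
#   try:
#       for h1, data in SimpleStream(fobj):
#           pass
#   except PacketTooShort as err:
#       LOG.error('short packet at offset %s', err.offset)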
class NonConsecutiveSeqId(Error):
"""
    Non-consecutive sequence numbers encountered for an APID.
"""
class SimpleStream(object):
"""
Generator that yields PrimaryHeaders and data. Files are read using mmap.
"""
def __init__(self, fobj):
self.fobj = fobj
self.offset = fobj.tell()
def __iter__(self):
return self
def __next__(self):
return self.next()
def next(self):
psize = c.sizeof(PrimaryHeader)
buf = self.fobj.read(psize)
if not buf:
raise StopIteration()
if len(buf) < psize:
raise PacketTooShort(header=None, offset=self.offset)
h1 = PrimaryHeader.from_buffer_copy(buf)
# read user data
        # CCSDS stores the user data length as (length - 1)
        size = h1.data_length_minus1 + 1
buf = self.fobj.read(size)
if len(buf) < size:
raise PacketTooShort(header=h1, offset=self.offset)
self.offset += (psize + size)
return h1, buf
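# Usage sketch for SimpleStream ('level0.dat' is a hypothetical file name):
#
#   with io.open('level0.dat', 'rb') as fobj:
#       for h1, data in SimpleStream(fobj):
#           print('%d %d' % (h1.apid, len(data)))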
class FullStream(SimpleStream):
"""
    Iterable yielding the primary header, secondary header, and any left-over
    bytes of "user" data.
"""
def __init__(self, header_lookup, fobj):
"""
:param header_lookup: function that takes a primary header and returns
a secondary header struct to use, or None.
"""
self.header_lookup = header_lookup
super(FullStream, self).__init__(fobj)
def next(self):
h1, buf = super(FullStream, self).next()
h2 = ''
if h1.secondary_header_flag:
H2Impl = self.header_lookup(h1)
if H2Impl:
hsize = c.sizeof(H2Impl)
h2 = H2Impl.from_buffer_copy(buf)
return h1, h2, buf[hsize:]
return h1, h2, buf
def jpss_full_stream(fobj):
"""
`FullStream` impl for JPSS secondary headers.
"""
return FullStream(headers.jpss_header_lookup, fobj)
def aqua_full_stream(fobj):
"""
`FullStream` impl for Aqua secondary headers.
"""
return FullStream(headers.aqua_header_lookup, fobj)
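# Usage sketch for the JPSS stream; yields both headers plus any remaining
# user data ('jpss_level0.dat' is a hypothetical file name):
#
#   with io.open('jpss_level0.dat', 'rb') as fobj:
#       for h1, h2, data in jpss_full_stream(fobj):
#           print('%d %d' % (h1.apid, len(data)))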
class Packet(object):
def __init__(self, h1, h2, data):
self.data = data
self.primary_header = h1
self.secondary_header = h2
def __str__(self):
return '<Packet apid=%d seqid=%d stamp=%s>' % \
(self.apid, self.seqid, self.stamp)
__repr__ = __str__
@property
def apid(self):
return self.primary_header.apid
@property
def seqid(self):
return self.primary_header.source_sequence_count
@property
def stamp(self):
h2 = self.secondary_header
return h2.timecode.asdatetime() if hasattr(h2, 'timecode') else None
    def bytes(self):
        # NOTE: Python 2 only; `buffer` concatenation yields a str
        return buffer(self.primary_header) + \
               buffer(self.secondary_header) + self.data
def is_group(self):
return False
def is_first(self):
return self.primary_header.sequence_grouping == GROUP_FIRST
    def is_continuing(self):
return self.primary_header.sequence_grouping == GROUP_CONTINUING
def is_last(self):
return self.primary_header.sequence_grouping == GROUP_LAST
def is_standalone(self):
return self.primary_header.sequence_grouping == GROUP_STANDALONE
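# Usage sketch: `Packet` wraps the raw structs with convenience accessors
# (APID and seqid values illustrative only):
#
#   pkt = Packet(h1, h2, data)
#   if pkt.is_standalone() and pkt.stamp:
#       print(pkt)  # <Packet apid=826 seqid=100 stamp=...>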
class PacketStream(object):
"""
    Iterates over all CCSDS data producing `Packet` tuples. Only as much data
    as is necessary to generate a single packet is read into memory at a time.
"""
SEQID_NOTSET = -1
def __init__(self, fobj, fail_on_missing=False):
"""
:param fobj: File-like object to read stream from
"""
self._stream = jpss_full_stream(fobj)
self._seek_cache = deque()
self._apid_info = defaultdict(
lambda: {'count': 0, 'last_seqid': self.SEQID_NOTSET, 'num_missing': 0})
self._fail_on_missing = fail_on_missing
def __iter__(self):
return self
# python3
def __next__(self):
return self.next()
def push_back(self, packet):
self._seek_cache.append(packet)
def next(self):
# return any items on the seek cache before reading more
        if self._seek_cache:
return self._seek_cache.popleft()
packet = Packet(*self._stream.next())
have_missing = self._update_info(packet)
if have_missing and self._fail_on_missing:
self.push_back(packet)
raise NonConsecutiveSeqId()
return packet
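    # Usage sketch: with fail_on_missing=True a sequence gap raises, and the
    # packet that revealed the gap is pushed back so iteration can resume:
    #
    #   stream = PacketStream(fobj, fail_on_missing=True)
    #   try:
    #       packet = stream.next()
    #   except NonConsecutiveSeqId:
    #       packet = stream.next()  # same packet, gap already recorded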
def _update_info(self, packet):
have_missing = False
apid = self._apid_info[packet.apid]
apid['count'] += 1
if apid['last_seqid'] != self.SEQID_NOTSET:
if packet.seqid > apid['last_seqid']:
num_missing = packet.seqid - apid['last_seqid'] - 1
else:
num_missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID
            # any gap between consecutive seqids means packets are missing
            have_missing = num_missing > 0
apid['num_missing'] += num_missing
apid['last_seqid'] = packet.seqid
return have_missing
def info(self):
return self._apid_info
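    # `info()` returns the per-APID bookkeeping dict, shaped like
    # (numbers illustrative only):
    #
    #   {826: {'count': 120, 'last_seqid': 5321, 'num_missing': 0}, ...}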
def seek_to(self, stamp, apid=None):
"""
Seek forward such that the next packet provided will be the first packet
having a timestamp available in the secondary header >= stamp and apid.
:raises: StopIteration if the end of the stream is reached before the
criteria can be met.
"""
# seek past partial groups
packet = self.next()
while not packet.stamp:
packet = self.next()
        # seek to first packet with stamp >= `stamp`
current_stamp = packet.stamp
while current_stamp < stamp:
packet = self.next()
current_stamp = packet.stamp or current_stamp
        # put back so the next packet is the first with stamp >= `stamp`
self.push_back(packet)
if apid is not None:
packet = self.next()
while packet.apid != apid:
packet = self.next()
self.push_back(packet)
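    # Usage sketch (assumes the caller imports `datetime`; file name and
    # APID are illustrative only):
    #
    #   stream = PacketStream(io.open('jpss_level0.dat', 'rb'))
    #   stream.seek_to(datetime(2015, 1, 1, 12), apid=826)
    #   packet = stream.next()  # first packet matching the criteria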
def seek_to_next_stamp(self):
"""
Seek to the next packet with a timestamp available in the secondary
header. For standalone packets this is essentially the same as `next`.
For packet groups it is essentially a "next group" function.
"""
packet = self.next() # get past current packet
packet = self.next()
while not packet.stamp:
packet = self.next()
self.push_back(packet)
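    # Usage sketch: skip the remainder of the current packet group:
    #
    #   stream.seek_to_next_stamp()
    #   packet = stream.next()  # first packet of the next group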
def _cmp_by_first_pkt(filea, fileb):
"""
Compare 2 L0 files by their first available timestamp. Used to sort L0 files
by time.
"""
stampa = stampb = None
for pkt in PacketStream(io.open(filea, 'rb')):
if pkt.stamp:
stampa = pkt.stamp
break
for pkt in PacketStream(io.open(fileb, 'rb')):
if pkt.stamp:
stampb = pkt.stamp
break
return cmp(stampa, stampb)
def make_streams(filepaths, **kwargs):
    # Python 2 comparator sort: order files by first packet timestamp
    filepaths.sort(_cmp_by_first_pkt)
return filepaths, [PacketStream(io.open(f, 'rb'), **kwargs) for f in filepaths]
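if __name__ == '__main__':
    # Minimal demo sketch: order any L0 file paths given on the command line
    # by first packet timestamp and dump per-APID packet counts.
    import sys
    paths, streams = make_streams(list(sys.argv[1:]))
    for path, stream in zip(paths, streams):
        for packet in stream:
            pass  # consume the stream to populate the APID info
        print('%s %r' % (path, dict(stream.info())))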