Something went wrong on our end
-
Bruce Flynn authoredBruce Flynn authored
stream.py 6.68 KiB
import io
from collections import namedtuple, deque, defaultdict
from edos.ccsds import (
c,
PrimaryHeader,
JpssFirstSecondaryHeader,
JpssSecondaryHeader,
GROUP_FIRST,
GROUP_LAST,
GROUP_STANDALONE
)
from edosl0util.timecode import cds_stamp
MAX_SEQ_ID = 2**14
secondary_header_impls = {
GROUP_FIRST: JpssFirstSecondaryHeader,
GROUP_STANDALONE: JpssSecondaryHeader
}
class Packet(namedtuple('Packet', ('primary_header', 'secondary_header',
'apid', 'seqid', 'stamp', 'blob'))):
def __str__(self):
return '<Packet apid=%s stamp=%s>' % (self.apid, self.stamp)
__repr__ = __str__
class PacketGroup(namedtuple('PacketGroup', ('apid', 'stamp', 'packets', 'blob'))):
def __str__(self):
return '<Group apid=%s stamp=%s packets=%d>' % (
self.apid, self.packets[0].stamp if self.packets else '', len(self.packets)
)
__repr__ = __str__
class PacketStream(object):
"""
Iterates over all CCSDS data producing `Packet` tuples. Only as much data is
necessary to generate a single packet is read into memory at a time.
"""
def __init__(self, filepath):
self._filepath = filepath
self._input = io.open(filepath, mode='rb')
def __iter__(self):
return self
def _read(self, size):
data = self._input.read(size)
if not data:
raise StopIteration
return data
def next(self):
buf = self._read(c.sizeof(PrimaryHeader))
data = buf
h1 = PrimaryHeader.from_buffer_copy(buf)
buf = self._read(h1.data_length_minus1 + 1)
data += buf
stamp = None
h2 = None # only first/standalone have h2 impls
H2Impl = secondary_header_impls.get(h1.sequence_grouping)
if H2Impl:
h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
return Packet(h1, h2, h1.apid, h1.source_sequence_count, stamp, data)
# FUTURE: Use an iter with a peek function so we don't have to keep items that
# were pushed back in memory.
class _IndecisiveIter(object):
"""
Iterator which allows you to put an item back. Warning, items that are pushed
back are kept in memory.
"""
def __init__(self, iterable):
self.iterable = iterable
self._cache = deque()
def push_back(self, item):
self._cache.append(item)
def __iter__(self):
return self
def next(self):
if len(self._cache):
return self._cache.pop()
return self.iterable.next()
class PacketCollector(object):
"""
Collects packets from a PacketStream into PacketGroup tuples, if applicable.
Grouping uses PrimaryHeader.sequence_grouping flag such that any packet
between GROUP_FIRST and GROUP_LAST are considered part of the same group.
"""
def __init__(self, stream):
self._stream = stream
def __iter__(self):
return self
def next(self):
packet = self._stream.next()
grouping = packet.primary_header.sequence_grouping
if grouping == GROUP_FIRST:
blob = bytearray(packet.blob)
grppkts = deque([packet])
while grouping != GROUP_LAST:
packet = self._stream.next()
assert not packet.stamp, "Expected only first packet in group to have stamp"
blob.extend(packet.blob)
grppkts.append(packet)
grouping = packet.primary_header.sequence_grouping
return PacketGroup(packet[0].apid, grppkts[0].stamp, grppkts, str(blob))
return packet
class PacketWalker(object):
"""
Provides additional features for a PacketStream, such as seeking and reading
packets until a missing sequence id is encountered.
"""
def __init__(self, stream):
self._stream = _IndecisiveIter(stream)
# seqnums for each apid for locating missing pkts
self._seqcache = {}
# stamps for each apid so we can seek
self._stampcache = defaultdict(lambda: datetime(1900, 1, 1))
self._finished = False
def _next_packet(self):
try:
packet = self._stream.next()
if packet.stamp:
self._stampcache[packet.apid] = packet.stamp
return packet
except StopIteration:
self._finished = True
raise
def end_of_stream(self):
return self._finished
def seek(self, stamp=None, apid=None):
"""
Seek forward to a position in the packet stream where stamp is >= the
secondary header time. If apid is provided additionally seek to that
apid.
"""
if self.end_of_stream():
return
try:
if stamp:
self._seek_to_stamp(stamp)
if apid is not None:
self._seek_to_apid(apid)
except StopIteration:
pass
def _seek_to_stamp(self, stamp):
while True:
packet = self._next_packet()
hdr = packet.primary_header
if packet.stamp and packet.stamp > self._stampcache[hdr.apid]:
self._stream.push_back(packet)
return
def _seek_to_apid(self, apid):
while True:
packet = self._next_packet()
hdr = packet.primary_header
if hdr.apid == apid:
self._stream.push_back(packet)
return
def _check_if_missing(self, packet):
"""
Return True if this packet does not follow in the current packet in
sequence number where sequence number is per APID.
"""
prev_seqid = self._seqcache.get(packet.apid)
seqid = packet.seqid
missing = False
if prev_seqid:
missing = seqid != ((prev_seqid + 1) % MAX_SEQ_ID)
self._seqcache[packet.apid] = seqid
return missing
def read(self):
"""
Read all remaining packets from current position.
"""
while not self._finished:
yield self._next_packet()
def read_until_missing(self):
"""
Read packets until the first missing sequence number is encountered. The
next packet read will be the first packet after the missing.
"""
while not self.end_of_stream():
packet = self._next_packet()
# checking for a seqid gap requires getting the packet after the
# gap, therefore we have to put the packet back to be retreived next
# time if a missing packet was detected
if self._check_if_missing(packet):
self._stream.push_back(packet)
break
yield packet