# stream.py

import io
from collections import namedtuple, deque, defaultdict
from datetime import datetime

from edos.ccsds import (
    c,
    PrimaryHeader,
    JpssFirstSecondaryHeader,
    JpssSecondaryHeader,
    GROUP_FIRST,
    GROUP_LAST,
    GROUP_STANDALONE
)

from edosl0util.timecode import cds_stamp

# CCSDS source sequence counts are 14 bits wide, so ids wrap at 2**14
MAX_SEQ_ID = 2**14

# Secondary header struct per sequence-grouping flag. Only the first packet
# of a group and standalone packets carry a secondary header (and therefore
# a timestamp); continuation/last packets map to nothing here.
secondary_header_impls = {
    GROUP_FIRST: JpssFirstSecondaryHeader,
    GROUP_STANDALONE: JpssSecondaryHeader
}


class Packet(namedtuple('Packet', ('primary_header', 'secondary_header',
                                   'apid', 'seqid', 'stamp', 'blob'))):
    """
    A single CCSDS packet: parsed primary/secondary headers, convenience
    fields (apid, seqid, stamp), and the raw packet bytes in `blob`.
    `secondary_header` and `stamp` are None for continuation/last packets.
    """
    def __str__(self):
        return '<Packet apid={0.apid} stamp={0.stamp}>'.format(self)
    __repr__ = __str__


class PacketGroup(namedtuple('PacketGroup', ('apid', 'stamp', 'packets', 'blob'))):
    """
    A sequence of related packets (GROUP_FIRST through GROUP_LAST) merged
    into one logical unit; `blob` is the concatenated raw bytes.
    """
    def __str__(self):
        # display the first member's stamp, or empty when there are no packets
        first_stamp = self.packets[0].stamp if self.packets else ''
        return '<Group apid=%s stamp=%s packets=%d>' % (
            self.apid, first_stamp, len(self.packets))
    __repr__ = __str__


class PacketStream(object):
    """
    Iterate a CCSDS file, yielding a `Packet` tuple per packet. Reading is
    lazy: only the bytes for the packet currently being produced are held
    in memory.
    """
    def __init__(self, filepath):
        self._filepath = filepath
        self._input = io.open(filepath, mode='rb')

    def __iter__(self):
        return self

    def _read(self, size):
        # an empty read means end-of-file, which ends iteration
        chunk = self._input.read(size)
        if not chunk:
            raise StopIteration
        return chunk

    def next(self):
        raw = self._read(c.sizeof(PrimaryHeader))
        primary = PrimaryHeader.from_buffer_copy(raw)

        # the CCSDS length field holds (data length - 1)
        body = self._read(primary.data_length_minus1 + 1)
        raw += body

        secondary = None  # only first/standalone packets have h2 impls
        stamp = None
        H2Impl = secondary_header_impls.get(primary.sequence_grouping)
        if H2Impl:
            secondary = H2Impl.from_buffer_copy(body[:c.sizeof(H2Impl)])
            stamp = cds_stamp(secondary.day, secondary.milliseconds,
                              secondary.microseconds)

        return Packet(primary, secondary, primary.apid,
                      primary.source_sequence_count, stamp, raw)


# FUTURE: Use an iter with a peek function so we don't have to keep items that
#         were pushed back in memory.
class _IndecisiveIter(object):
    """
    Iterator wrapper that supports putting items back via `push_back`.
    Warning: pushed-back items are buffered in memory until re-consumed.
    """
    def __init__(self, iterable):
        self.iterable = iterable
        self._cache = deque()

    def push_back(self, item):
        self._cache.append(item)

    def __iter__(self):
        return self

    def next(self):
        # drain pushed-back items (most recently pushed first) before
        # consuming from the underlying iterator
        if self._cache:
            return self._cache.pop()
        return self.iterable.next()


class PacketCollector(object):
    """
    Collects packets from a PacketStream into PacketGroup tuples, if
    applicable. Grouping uses PrimaryHeader.sequence_grouping such that a
    GROUP_FIRST packet and every subsequent packet up to and including the
    next GROUP_LAST form one group. Standalone packets pass through as-is.
    """
    def __init__(self, stream):
        self._stream = stream

    def __iter__(self):
        return self

    def next(self):
        packet = self._stream.next()
        grouping = packet.primary_header.sequence_grouping
        if grouping == GROUP_FIRST:
            blob = bytearray(packet.blob)
            grppkts = deque([packet])
            while grouping != GROUP_LAST:
                packet = self._stream.next()
                # only the first packet of a group carries a secondary
                # header, and therefore a stamp
                assert not packet.stamp, "Expected only first packet in group to have stamp"
                blob.extend(packet.blob)
                grppkts.append(packet)
                grouping = packet.primary_header.sequence_grouping
            # FIX: was packet[0].apid -- the primary header of the *last*
            # packet read; the group is identified by its first packet.
            return PacketGroup(grppkts[0].apid, grppkts[0].stamp, grppkts,
                               str(blob))
        return packet


class PacketWalker(object):
    """
    Provides additional features for a PacketStream, such as seeking and
    reading packets until a missing sequence id is encountered.
    """
    def __init__(self, stream):
        self._stream = _IndecisiveIter(stream)
        # last seqid seen per apid, for locating missing packets
        self._seqcache = {}
        # most recent stamp seen per apid (requires `datetime` import)
        self._stampcache = defaultdict(lambda: datetime(1900, 1, 1))
        self._finished = False

    def _next_packet(self):
        """Fetch the next packet, tracking per-apid stamps and EOF state."""
        try:
            packet = self._stream.next()
            if packet.stamp:
                self._stampcache[packet.apid] = packet.stamp
            return packet
        except StopIteration:
            self._finished = True
            raise

    def end_of_stream(self):
        return self._finished

    def seek(self, stamp=None, apid=None):
        """
        Seek forward to a position in the packet stream where the secondary
        header time is >= `stamp`. If apid is provided additionally seek to
        that apid.
        """
        if self.end_of_stream():
            return
        try:
            if stamp:
                self._seek_to_stamp(stamp)
            if apid is not None:
                self._seek_to_apid(apid)
        except StopIteration:
            # ran off the end while seeking; end_of_stream() is now True
            pass

    def _seek_to_stamp(self, stamp):
        # FIX: previously the `stamp` argument was unused -- the code
        # compared packet.stamp to self._stampcache[apid], which
        # _next_packet had just set to packet.stamp itself, so the stop
        # condition could never fire and seeking only ended at EOF.
        while True:
            packet = self._next_packet()
            if packet.stamp and packet.stamp >= stamp:
                # push back so this packet is the next one read
                self._stream.push_back(packet)
                return

    def _seek_to_apid(self, apid):
        # advance until a packet with the requested apid, pushing it back so
        # it is the next one read
        while True:
            packet = self._next_packet()
            hdr = packet.primary_header
            if hdr.apid == apid:
                self._stream.push_back(packet)
                return

    def _check_if_missing(self, packet):
        """
        Return True if this packet does not follow the previously-seen
        packet for its APID in sequence number (mod MAX_SEQ_ID).
        """
        prev_seqid = self._seqcache.get(packet.apid)
        seqid = packet.seqid
        missing = False

        # FIX: seqid 0 is a valid previous value; a truthiness test
        # (`if prev_seqid:`) silently skipped gap detection after it
        if prev_seqid is not None:
            missing = seqid != ((prev_seqid + 1) % MAX_SEQ_ID)

        self._seqcache[packet.apid] = seqid
        return missing

    def read(self):
        """
        Read all remaining packets from current position.
        """
        while not self.end_of_stream():
            yield self._next_packet()

    def read_until_missing(self):
        """
        Read packets until the first missing sequence number is encountered.
        The next packet read will be the first packet after the missing.
        """
        while not self.end_of_stream():
            packet = self._next_packet()

            # checking for a seqid gap requires reading the packet *after*
            # the gap, so push that packet back to be retrieved next time
            if self._check_if_missing(packet):
                self._stream.push_back(packet)
                break

            yield packet