# Skip to content
# Snippets Groups Projects
# stream.py 5.76 KiB
import io
import logging
from collections import deque

from edos.ccsds import (
    c,
    PrimaryHeader,
    JpssFirstSecondaryHeader,
    JpssSecondaryHeader,
    GROUP_FIRST,
    GROUP_CONTINUING,
    GROUP_LAST,
    GROUP_STANDALONE
)

from edosl0util import headers

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


class MissingPackets(Exception):
    """
    Raised when the sequence ids seen for an APID are not sequential.

    Sequence ids are expected to increase by one from packet to packet,
    rolling over at SequenceId.MAX.
    """


class Packet(object):
    """
    A single packet, as produced by `PacketStream`.

    :param apid: APID of the packet.
    :param seqid: Sequence id of the packet.
    :param stamp: Timestamp of the packet, or None if none is available.
    :param blob: Raw data of the packet.
    :param pheader: Primary header; its `sequence_grouping` attribute is
        what the `is_first`/`is_continuing`/`is_last`/`is_standalone` checks
        compare against the GROUP_* constants.
    :param sheader: Secondary header, or None if there is none.
    """
    def __init__(self, apid, seqid, stamp, blob, pheader, sheader):
        self.apid = apid
        self.seqid = seqid
        self.stamp = stamp
        self.blob = blob
        self._primary_header = pheader
        self._secondary_header = sheader

    def __str__(self):
        return '<Packet apid=%d seqid=%d stamp=%s>' % \
               (self.apid, self.seqid, self.stamp)
    __repr__ = __str__

    def is_group(self):
        """Always False for a plain Packet."""
        return False

    def is_first(self):
        """True if the primary header marks this packet as GROUP_FIRST."""
        return self._primary_header.sequence_grouping == GROUP_FIRST

    def is_continuing(self):
        """True if the primary header marks this packet as GROUP_CONTINUING."""
        return self._primary_header.sequence_grouping == GROUP_CONTINUING

    # Original, misspelled name; kept so existing callers continue to work.
    is_continuine = is_continuing

    def is_last(self):
        """True if the primary header marks this packet as GROUP_LAST."""
        return self._primary_header.sequence_grouping == GROUP_LAST

    def is_standalone(self):
        """True if the primary header marks this packet as GROUP_STANDALONE."""
        return self._primary_header.sequence_grouping == GROUP_STANDALONE


class SequenceId(object):
    """
    Counter for sequence ids that wraps around to 0 on reaching MAX.

    Instances are their own iterator; each step increments the counter and
    yields the new value. The iteration never terminates.

    :keyword initial: Value the counter starts from. The first call to
        `next` returns ``(initial + 1) % MAX``.
    """
    MAX = 2**14

    def __init__(self, initial=0):
        self._val = initial

    def __iter__(self):
        return self

    @property
    def val(self):
        """Current value of the counter."""
        return self._val

    def next(self):
        """Increment the counter, wrapping at MAX, and return the new value."""
        self._val = (self._val + 1) % self.MAX
        return self.val

    # Python 3 spelling of the iterator protocol; `next` remains for Python 2
    # and for callers that invoke it directly.
    __next__ = next


def default_header_lookup(primary_header):
    """
    Look up the secondary header implementation for `primary_header`.

    The Aqua lookup is tried first; the JPSS lookup is only consulted, and its
    result returned as-is, when the Aqua lookup returns nothing truthy.
    """
    found = headers.aqua_header_lookup(primary_header)
    if found:
        return found
    return headers.jpss_header_lookup(primary_header)


class PacketStream(object):
    """
    Iterates over all CCSDS data producing `Packet` instances. Only as much
    data as is necessary to generate a single packet is read into memory at a
    time.
    """
    def __init__(self, fobj, fail_on_missing=False, header_lookup=default_header_lookup):
        """
        :param fobj: File-like object to read stream from
        :keyword fail_on_missing: If True raise a MissingPackets error when packets
            are missing.
        :keyword header_lookup: A function that takes a PrimaryHeader and returns
            a secondary header implementation that will be used.
        """
        self._input = fobj
        # packets handed back via push_back, returned before reading more data
        self._seek_cache = deque()
        # apid -> SequenceId tracking the last sequence id seen for that apid
        self._seqids = {}
        self._fail_on_missing = fail_on_missing
        self._header_lookup = header_lookup

    def __str__(self):
        return '<PacketStream pos=%d %s>' % (self.tell(), self._input)

    def __iter__(self):
        return self

    def tell(self):
        """
        Return the current position of the underlying file object. Packets
        that were pushed back are not accounted for.
        """
        return self._input.tell()

    def push_back(self, packet):
        """
        Hand `packet` back to the stream so that it is returned by the next
        call to `next`. The most recently pushed back packet is returned first.
        """
        self._seek_cache.append(packet)

    def _read(self, size):
        """
        Read exactly `size` bytes from the input.

        :raises: StopIteration if fewer than `size` bytes could be read, i.e.,
            the input is exhausted or ends in the middle of a packet.
        """
        data = self._input.read(size)
        # A short read cannot be turned into a complete header/packet, so it
        # is treated the same as end of input.
        if not data or len(data) < size:
            raise StopIteration
        return data

    def next(self):
        """
        Return the next packet in the stream.

        Packets that were pushed back are returned first, without any sequence
        id checking.

        :raises: StopIteration when no more data can be serialized into Packets.
        :raises: MissingPackets if `fail_on_missing` was set on this stream
            and the sequence ids for an APID are non-sequential.
        """
        # return any items on the seek cache before reading more
        if self._seek_cache:
            return self._seek_cache.pop()

        buf = self._read(c.sizeof(PrimaryHeader))
        data = buf

        h1 = PrimaryHeader.from_buffer_copy(buf)

        buf = self._read(h1.data_length_minus1 + 1)
        data += buf

        stamp = None
        h2 = None  # only first/standalone have h2 impls
        H2Impl = self._header_lookup(h1)
        if H2Impl:
            h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
            if hasattr(h2, 'timecode') and hasattr(h2.timecode, 'asdatetime'):
                stamp = h2.timecode.asdatetime()

        packet = Packet(h1.apid, h1.source_sequence_count, stamp, data, h1, h2)

        # initialize a new seqid counter
        if packet.apid not in self._seqids:
            self._seqids[packet.apid] = SequenceId(packet.seqid)
        else:
            next_seqid = self._seqids[packet.apid].next()
            if packet.seqid != next_seqid:
                # Resync to what was actually seen, otherwise every later
                # packet for this apid would also be flagged as missing.
                self._seqids[packet.apid] = SequenceId(packet.seqid)
                if self._fail_on_missing:
                    raise MissingPackets("non-sequential sequence ids",
                                         packet.seqid, next_seqid)
        return packet

    # Python 3 spelling of the iterator protocol; `next` remains for Python 2
    # and for callers that invoke it directly.
    __next__ = next

    def seek(self, stamp, apid=None):
        """
        Seek forward such that the next packet provided will be the first packet
        having a timestamp available in the secondary header >= stamp, and
        optionally with apid as well.

        When `apid` is given, the search for a matching apid starts at the
        packet that satisfied the timestamp criteria.

        :raises: StopIteration if the end of the stream is reached before the
                 criteria can be met.
        """
        # skip ahead to the first packet that has a timestamp at all
        packet = self.next()
        while not packet.stamp:
            packet = self.next()
        current_stamp = packet.stamp
        # packets without a stamp inherit the last stamp seen
        while current_stamp < stamp:
            packet = self.next()
            current_stamp = packet.stamp or current_stamp

        if apid is not None:
            while packet.apid != apid:
                packet = self.next()

        # Push back exactly once; pushing back before the apid search as well
        # would deliver the packet twice when it already matches.
        self.push_back(packet)


def _first_stamp(filepath):
    """
    Return the timestamp of the first packet in the file at `filepath` that
    has one, or None if no packet in the file has a timestamp.
    """
    # Binary mode, the same way make_streams opens these files; the context
    # manager closes the file as soon as a stamp has been found.
    with open(filepath, 'rb') as fobj:
        for pkt in PacketStream(fobj):
            if pkt.stamp:
                return pkt.stamp
    return None


def _cmp_by_first_pkt(filea, fileb):
    """
    Compare 2 L0 files by their first available timestamp. Can be used as a
    comparison function to sort L0 files by time.

    :returns: -1, 0 or 1 if the first timestamp of `filea` is respectively
        before, equal to or after that of `fileb`.
    """
    stampa = _first_stamp(filea)
    stampb = _first_stamp(fileb)
    # same result as cmp(stampa, stampb), without needing the cmp builtin
    return (stampa > stampb) - (stampa < stampb)


def make_streams(filepaths):
    """
    Sort `filepaths` in place by the first available timestamp in each file
    and return a list with one PacketStream per file, in that order.

    The files backing the returned streams are opened in binary mode and are
    left open for the streams to read from.

    :param filepaths: List of paths to L0 files; reordered in place.
    """
    # A key-based sort reads the start of each file once, rather than
    # re-opening both files on every comparison.
    filepaths.sort(key=_first_stamp)
    return [PacketStream(io.open(f, 'rb')) for f in filepaths]