Skip to content
Snippets Groups Projects
stream.py 7.73 KiB
import io
import logging
import ctypes as c
from collections import deque, defaultdict

from edosl0util.headers import (
    PrimaryHeader,
    GROUP_FIRST,
    GROUP_CONTINUING,
    GROUP_LAST,
    GROUP_STANDALONE
)

from edosl0util import headers

# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

# Max CCSDS sequence id (2**14 - 1 == 16383). Used when computing the number
# of missing packets for a sequence id that is not greater than the previous
# one, i.e., when the counter is assumed to have wrapped around.
MAX_SEQ_ID = 2**14 - 1


class Error(Exception):
    """
    Root of the exception hierarchy raised by the stream classes in this
    module.
    """


class PacketTooShort(Error):
    """
    Fewer bytes were available than a packet's header or declared length
    requires.

    Once this has been raised the stream should be considered unusable since
    the data offsets can no longer be trusted.

    Attributes:
        offset: stream offset supplied by the raiser.
        primary_header: primary header supplied by the raiser, or None.
    """
    def __init__(self, header=None, offset=None):
        # The base initializer stores its positional arguments as ``args``,
        # giving ``(offset, header)``.
        super(PacketTooShort, self).__init__(offset, header)
        self.primary_header = header
        self.offset = offset


class NonConsecutiveSeqId(Error):
    """
    Raised when the sequence ids encountered for an apid are not consecutive.
    """


class SimpleStream(object):
    """
    Iterator producing ``(PrimaryHeader, data)`` pairs read sequentially from
    a file-like object using its ``read`` method.

    ``offset`` starts at ``fobj.tell()`` and is advanced by the size of every
    packet that is read successfully.
    """
    def __init__(self, fobj):
        self.fobj = fobj
        self.offset = fobj.tell()

    def __iter__(self):
        return self

    def __next__(self):
        # Python 3 iterator protocol; dispatches to `next` so subclasses only
        # need to override one method.
        return self.next()

    def next(self):
        """
        Read one packet.

        :returns: tuple of the primary header and the bytes that follow it.
        :raises StopIteration: if no more bytes are available.
        :raises PacketTooShort: if the header or the data is truncated.
        """
        header_size = c.sizeof(PrimaryHeader)
        raw_header = self.fobj.read(header_size)
        if not raw_header:
            # clean end of stream
            raise StopIteration()
        if len(raw_header) < header_size:
            raise PacketTooShort(header=None, offset=self.offset)
        primary = PrimaryHeader.from_buffer_copy(raw_header)

        # the header stores the length of the data that follows, minus one
        data_size = primary.data_length_minus1 + 1
        payload = self.fobj.read(data_size)
        if len(payload) < data_size:
            raise PacketTooShort(header=primary, offset=self.offset)

        self.offset += header_size + data_size
        return primary, payload


class FullStream(SimpleStream):
    """
    Iterable producing the primary header, the secondary header and whatever
    "user" data bytes remain after the secondary header.
    """
    def __init__(self, header_lookup, fobj):
        """
        :param header_lookup: function that takes a primary header and returns
            a secondary header struct to use, or None.
        :param fobj: file-like object to read packets from.
        """
        self.header_lookup = header_lookup
        super(FullStream, self).__init__(fobj)

    def next(self):
        """
        :returns: tuple ``(primary header, secondary header, data)``. The
            secondary header is ``''`` when the packet has none or when the
            lookup provides no struct for it; in that case ``data`` is all of
            the bytes following the primary header.
        """
        primary, payload = super(FullStream, self).next()
        if not primary.secondary_header_flag:
            return primary, '', payload

        header_cls = self.header_lookup(primary)
        if not header_cls:
            return primary, '', payload

        # decode the secondary header from the front of the payload and
        # hand back only the bytes that come after it
        header_size = c.sizeof(header_cls)
        secondary = header_cls.from_buffer_copy(payload)
        return primary, secondary, payload[header_size:]


def jpss_full_stream(fobj):
    """
    Create a `FullStream` over ``fobj`` that resolves secondary headers using
    the JPSS header lookup.
    """
    lookup = headers.jpss_header_lookup
    return FullStream(lookup, fobj)


def aqua_full_stream(fobj):
    """
    Create a `FullStream` over ``fobj`` that resolves secondary headers using
    the Aqua header lookup.
    """
    lookup = headers.aqua_header_lookup
    return FullStream(lookup, fobj)


class Packet(object):
    """
    A single packet: primary header, optional secondary header and the
    remaining data bytes.
    """
    def __init__(self, h1, h2, data):
        """
        :param h1: primary header.
        :param h2: secondary header, or a false value (e.g. ``''``) if the
            packet has none.
        :param data: bytes following the header(s).
        """
        self.data = data
        self.primary_header = h1
        self.secondary_header = h2

    def __str__(self):
        return '<Packet apid=%d seqid=%d stamp=%s>' % \
               (self.apid, self.seqid, self.stamp)
    __repr__ = __str__

    @property
    def apid(self):
        """The ``apid`` of the primary header."""
        return self.primary_header.apid

    @property
    def seqid(self):
        """The ``source_sequence_count`` of the primary header."""
        return self.primary_header.source_sequence_count

    @property
    def stamp(self):
        """
        Timestamp from the secondary header's ``timecode``, or None when the
        secondary header has no ``timecode`` attribute.
        """
        h2 = self.secondary_header
        return h2.timecode.asdatetime() if hasattr(h2, 'timecode') else None

    def bytes(self):
        """
        Return the raw packet: primary header, secondary header (if present)
        and data, concatenated into a single byte string.
        """
        # ``bytearray`` copies anything exposing the buffer interface, which
        # works on Python 2 and 3; the ``buffer`` builtin is Python 2 only.
        raw = bytearray(self.primary_header)
        h2 = self.secondary_header
        if h2:
            # a false value ('' or None) means there is no secondary header
            raw += bytearray(h2)
        raw += self.data
        # NOTE: inside a method ``bytes`` is the builtin, not this method.
        return bytes(raw)

    def is_group(self):
        return False

    def is_first(self):
        return self.primary_header.sequence_grouping == GROUP_FIRST

    def is_continuing(self):
        return self.primary_header.sequence_grouping == GROUP_CONTINUING

    # Original (misspelled) name kept so existing callers keep working.
    is_continuine = is_continuing

    def is_last(self):
        return self.primary_header.sequence_grouping == GROUP_LAST

    def is_standalone(self):
        return self.primary_header.sequence_grouping == GROUP_STANDALONE


class PacketStream(object):
    """
    Iterates over all CCSDS data producing `Packet` objects. Only as much data
    as is necessary to generate a single packet is read into memory at a time.

    While packets are read, per-apid statistics are collected: the number of
    packets seen (``count``), the last sequence id seen (``last_seqid``) and
    the running total of missing packets (``num_missing``). See `info`.
    """
    SEQID_NOTSET = -1

    def __init__(self, fobj, fail_on_missing=False):
        """
        :param fobj: File-like object to read stream from
        :param fail_on_missing: when true, `next` raises `NonConsecutiveSeqId`
            if a gap in an apid's sequence ids is detected.
        """
        self._stream = jpss_full_stream(fobj)
        # packets handed back via `push_back`; served before reading more
        self._seek_cache = deque()
        self._apid_info = defaultdict(
                lambda: {'count': 0, 'last_seqid': self.SEQID_NOTSET, 'num_missing': 0})
        self._fail_on_missing = fail_on_missing

    def __iter__(self):
        return self

    # python3
    def __next__(self):
        return self.next()

    def push_back(self, packet):
        """
        Queue ``packet`` so it is returned by `next` before any new data is
        read. Queued packets are returned in the order they were pushed.
        """
        self._seek_cache.append(packet)

    def next(self):
        """
        Return the next `Packet`.

        :raises NonConsecutiveSeqId: if ``fail_on_missing`` was set and the
            packet just read is not consecutive with the previous packet for
            its apid. The packet is pushed back first, so the following call
            returns it.
        """
        # return any items on the seek cache before reading more
        if self._seek_cache:
            return self._seek_cache.popleft()

        packet = Packet(*self._stream.next())
        have_missing = self._update_info(packet)
        if have_missing and self._fail_on_missing:
            self.push_back(packet)
            raise NonConsecutiveSeqId()
        return packet

    def _update_info(self, packet):
        """
        Update the per-apid statistics with ``packet``.

        :returns: True if one or more packets are missing between the
            previous packet seen for this apid and ``packet``.
        """
        have_missing = False
        apid = self._apid_info[packet.apid]
        apid['count'] += 1
        if apid['last_seqid'] != self.SEQID_NOTSET:
            if packet.seqid > apid['last_seqid']:
                num_missing = packet.seqid - apid['last_seqid'] - 1
            else:
                # sequence id did not increase: treat it as a counter wrap
                num_missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID
            # Only the gap in front of *this* packet matters. Comparing it to
            # the running total would flag every packet after the first gap
            # and miss a gap that happens to equal the total.
            have_missing = num_missing > 0
            apid['num_missing'] += num_missing
        apid['last_seqid'] = packet.seqid
        return have_missing

    def info(self):
        """
        Return the mapping of apid to its statistics dict with keys
        ``count``, ``last_seqid`` and ``num_missing``.
        """
        return self._apid_info

    def seek_to(self, stamp, apid=None):
        """
        Seek forward such that the next packet provided will be the first packet
        having a timestamp available in the secondary header >= stamp and apid.

        :raises: StopIteration if the end of the stream is reached before the
                 criteria can be met.
        """
        # seek past partial groups
        packet = self.next()
        while not packet.stamp:
            packet = self.next()

        # seek to first packet >= `stamp`; packets without a stamp inherit
        # the most recent stamp seen
        current_stamp = packet.stamp
        while current_stamp < stamp:
            packet = self.next()
            current_stamp = packet.stamp or current_stamp

        # put back so next packet is the first packet >= `stamp`
        self.push_back(packet)

        if apid is not None:
            packet = self.next()
            while packet.apid != apid:
                packet = self.next()
            self.push_back(packet)

    def seek_to_next_stamp(self):
        """
        Seek to the next packet with a timestamp available in the secondary
        header. For standalone packets this is essentially the same as `next`.
        For packet groups it is essentially a "next group" function.
        """
        self.next()  # get past current packet
        packet = self.next()
        while not packet.stamp:
            packet = self.next()
        self.push_back(packet)


def _first_stamp(filepath):
    """
    Return the first timestamp available in the L0 file at ``filepath``, or
    None if no packet provides one. The file is closed before returning.
    """
    with io.open(filepath, 'rb') as fobj:
        for pkt in PacketStream(fobj):
            if pkt.stamp:
                return pkt.stamp
    return None


def _stamp_sort_key(stamp):
    """
    Sort key for a possibly-missing timestamp: a missing (None) stamp orders
    before any real stamp, and None is never compared against a stamp.
    """
    return (stamp is not None, stamp)


def _cmp_by_first_pkt(filea, fileb):
    """
    Compare 2 L0 files by their first available timestamp. Used to sort L0 files
    by time.

    :returns: -1, 0 or 1, as the ``cmp`` builtin of Python 2 would.
    """
    keya = _stamp_sort_key(_first_stamp(filea))
    keyb = _stamp_sort_key(_first_stamp(fileb))
    return (keya > keyb) - (keya < keyb)


def make_streams(filepaths, **kwargs):
    """
    Sort ``filepaths`` in place by the first available timestamp of each file
    and open a `PacketStream` for every file.

    :param filepaths: list of L0 file paths; sorted in place.
    :param kwargs: passed on to `PacketStream`.
    :returns: tuple of the sorted ``filepaths`` list and the list of
        corresponding `PacketStream` instances. The files backing the streams
        are left open for the caller to read from.
    """
    # A key function reads each file once, instead of twice per comparison,
    # and works on both Python 2 and 3.
    filepaths.sort(key=lambda path: _stamp_sort_key(_first_stamp(path)))
    return filepaths, [PacketStream(io.open(f, 'rb'), **kwargs) for f in filepaths]