import ctypes as c
import errno
import logging
import os
from collections import defaultdict, deque, namedtuple

from edosl0util import headers
from edosl0util.headers import (GROUP_CONTINUING, GROUP_FIRST, GROUP_LAST,
                                GROUP_STANDALONE, PrimaryHeader)
from edosl0util.timecode import dt_to_cds

LOG = logging.getLogger(__name__)
# Max CCSDS sequence id
MAX_SEQ_ID = 2 ** 14 - 1


class Error(Exception):
    """
    Base stream error.
    """


class PacketTooShort(Error):
    """
    The number of bytes read for a packet does not match the expected length.
    After this error occurs the stream is no longer usable because data offsets
    are no longer reliable.
    """


class NonConsecutiveSeqId(Error):
    """
    Non-consecutive sequence numbers encountered for an APID, i.e., a packet gap.
    """


class BasicStream(object):
    """
    Basic packet stream iterator that reads the primary and secondary headers
    and maintains offsets and read sizes.
    """

    Tracker = namedtuple("Tracker", ["h1", "h2", "size", "offset", "data"])

    def __init__(self, fobj, header_lookup=None, with_data=True):
        self.file = fobj
        self.header_lookup = header_lookup
        self.with_data = with_data
        try:
            self._offset = self.file.tell()
        except IOError as err:  # handle illegal seek for pipes
            if err.errno != errno.ESPIPE:
                raise
            self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def _read(self, size):
        buf = self.file.read(size)
        if not buf:
            raise StopIteration()
        if len(buf) != size:
            raise PacketTooShort(
                "expected to read {:d} bytes, got {:d} at offset {:d}".format(
                    size, len(buf), self._offset
                ),
                self.file,
            )
        self._offset += size
        return buf

    def read_primary_header(self):
        buf = self._read(headers.PRIMARY_HEADER_SIZE)
        h1 = PrimaryHeader.from_buffer_copy(buf)
        return h1, headers.PRIMARY_HEADER_SIZE

    def read_secondary_header(self, ph):
        H2Impl = self.header_lookup(ph) if self.header_lookup else None
        if H2Impl is None:
            return bytes(), 0
        h2size = c.sizeof(H2Impl)
        buf = self._read(h2size)
        return H2Impl.from_buffer_copy(buf), h2size

    def next(self):
        offset = self._offset
        h1, h1size = self.read_primary_header()
        h2, h2size = self.read_secondary_header(h1)
        # data length includes h2size
        total_size = h1size + h1.data_length_minus1 + 1
        data_size = h1.data_length_minus1 + 1 - h2size
        data = None
        if self.with_data and data_size:
            data = self._read(data_size)
        else:
            self.file.seek(data_size, os.SEEK_CUR)
            self._offset += data_size
        return self.Tracker(h1, h2, total_size, offset, data)
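

# Usage sketch (illustrative only, not part of the original module): iterating
# a BasicStream directly to inspect raw headers and offsets. It assumes `fobj`
# is an open binary file of CCSDS packets with JPSS-style secondary headers.
def _example_basic_stream(fobj):
    stream = BasicStream(fobj, headers.jpss_header_lookup)
    for h1, h2, size, offset, data in stream:
        # h1 is a PrimaryHeader, h2 the mission-specific secondary header (or
        # b"" when no lookup matches), and size is the full packet size in bytes.
        LOG.debug("apid=%d size=%d offset=%d", h1.apid, size, offset)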


class Packet(object):
    def __init__(self, h1, h2, data, data_size=None, offset=None):
        self.primary_header = h1
        self.secondary_header = h2
        self.data = data
        self.size = data_size
        self.offset = offset
        self.apid = h1.apid
        self.seqid = self.primary_header.source_sequence_count
    def __str__(self):
        return "<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>" % (
            self.apid,
            self.seqid,
            self.stamp,
            self.size,
            self.offset,
        )

    __repr__ = __str__

    @property
    def cds_timecode(self):
        return (
            self.secondary_header
            and hasattr(self.secondary_header, "timecode")
            and self.secondary_header.timecode.day_segmented_timecode()
            or None
        )

    @property
    def stamp(self):
        return (
            self.secondary_header
            and hasattr(self.secondary_header, "timecode")
            and self.secondary_header.timecode.asdatetime()
            or None
        )

    def bytes(self):
        return (
            bytearray(self.primary_header)
            + bytearray(self.secondary_header)
            + self.data
        )

    def is_group(self):
        return self.is_first() or self.is_continuing() or self.is_last()

    def is_first(self):
        return self.primary_header.sequence_grouping == GROUP_FIRST
    def is_continuing(self):
        return self.primary_header.sequence_grouping == GROUP_CONTINUING
    # compatibility
    is_continuine = is_continuing

    def is_last(self):
        return self.primary_header.sequence_grouping == GROUP_LAST

    def is_standalone(self):
        return self.primary_header.sequence_grouping == GROUP_STANDALONE


class PacketStream(object):
    SEQID_NOTSET = -1

    def __init__(self, data_stream, fail_on_missing=False, fail_on_tooshort=False):
        """
        :param data_stream: An iterable of ``Tracker`` objects
        :keyword fail_on_missing:
            Raise a ``NonConsecutiveSeqId`` error when sequence id gaps are
            found. If this is False sequence id gaps are ignored.
        :keyword fail_on_tooshort:
            Raise a ``PacketTooShort`` error when a packet is encountered for
            which fewer bytes can be read than the packet headers indicate.
            Most of the time this is due to a truncated file with an
            incomplete packet at the end. If this is False a ``StopIteration``
            is raised instead, which effectively truncates the file.
        """
        self._stream = data_stream
        self._seek_cache = deque()
        self._first = None
        self._last = None
        self._apid_info = defaultdict(
            lambda: {"count": 0, "last_seqid": self.SEQID_NOTSET, "num_missing": 0}
        )
        self._fail_on_missing = fail_on_missing
        self._fail_on_tooshort = fail_on_tooshort
    def __repr__(self):
        filepath = getattr(self.file, "name", None)
        return "<{} file={}>".format(self.__class__.__name__, filepath)
    def __iter__(self):
        return self
    # python3
    def __next__(self):
        return self.next()

    @property
    def file(self):
        return self._stream.file

    def push_back(self, packet):
        """
        Put a packet back such that it is the next packet provided when
        ``next`` is called.
        """
        self._seek_cache.append(packet)

    def next(self):
        # return any items on the seek cache before reading more
        if len(self._seek_cache):
            return self._seek_cache.popleft()

        try:
            h1, h2, data_size, offset, data = next(self._stream)
        except PacketTooShort as err:
            if self._fail_on_tooshort:
                raise
            LOG.error("Packet too short, aborting stream: %s", err)
            # The result of this is essentially a file truncation
            raise StopIteration()
        packet = Packet(h1, h2, data, data_size=data_size, offset=offset)
        have_missing = self._update_info(packet)
        if have_missing and self._fail_on_missing:
            self.push_back(packet)
            raise NonConsecutiveSeqId()
        return packet

    def _update_info(self, packet):
        """
        Handle gap detection and first/last. Returns whether any missing
        packets were detected.
        """
        have_missing = False
        apid = self._apid_info[packet.apid]
        if packet.stamp:
            if self._first is None:
                self._first = packet.stamp
            else:
                self._first = min(packet.stamp, self._first)
            if self._last is None:
                self._last = packet.stamp
            else:
                self._last = max(packet.stamp, self._last)
        apid["count"] += 1
        if apid["last_seqid"] != self.SEQID_NOTSET:
            if packet.seqid > apid["last_seqid"]:
                num_missing = packet.seqid - apid["last_seqid"] - 1
            else:
                num_missing = packet.seqid - apid["last_seqid"] + MAX_SEQ_ID
            # FIXME: shouldn't this just be num_missing != 0?
            have_missing = num_missing != apid["num_missing"]
            apid["num_missing"] += num_missing
        apid["last_seqid"] = packet.seqid
        return have_missing

    def info(self):
        return self._first, self._last, self._apid_info

    def seek_to(self, stamp, apid=None):
        """
        Seek forward such that the next packet provided will be the first
        packet having a secondary header timestamp >= ``stamp``, optionally
        restricted to ``apid``.
        """
        target_tc = dt_to_cds(stamp)
        # seek past partial groups
        packet = self.next()
        while not packet.cds_timecode:
            packet = self.next()

        # seek to first packet >= `stamp`
        current_tc = packet.cds_timecode
        while current_tc < target_tc:
            packet = self.next()
            current_tc = packet.cds_timecode or current_tc

        # put back so next packet is the first packet >= `stamp`
        self.push_back(packet)

        if apid is not None:
            packet = self.next()
            while packet.apid != apid:
                packet = self.next()
            self.push_back(packet)
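

# Usage sketch (illustrative only, not part of the original module): typical
# use of a PacketStream built by jpss_packet_stream (defined below). The
# `filepath` and `stamp` arguments are hypothetical placeholders; `stamp` is a
# datetime marking where to start reading.
def _example_packet_stream(filepath, stamp):
    with open(filepath, "rb") as fobj:
        stream = jpss_packet_stream(
            fobj, fail_on_missing=False, fail_on_tooshort=False
        )
        stream.seek_to(stamp)
        for packet in stream:
            LOG.debug("%s", packet)
        # (first stamp, last stamp, per-APID counts/gaps) seen so far
        return stream.info()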


def collect_groups(stream):
    """
    Collects packets into lists of grouped packets. If a packet is standalone
    it will be a list of just that single packet. If a packet is part of a
    group it will be a list of the packets in that group. There is no
    guarantee the group will be complete. The first packet in a group will
    always have a timestamp.
    """
    done = object()  # sentinel for stopping iter
    group = deque()
    while True:
        # drop leading packets without timestamp
        p = next(stream, done)
        if p is done:
            return
        # just pass through any non-grouped packets
        if p.is_standalone():
            yield [p]
            continue

        # Yield and start a new group if either we get a first packet or the
        # current packet's APID does not match the current group.
        if p.is_first() or (group and group[-1].apid != p.apid):
            if group:
                yield list(group)
            group = deque([p])
            continue

        group.append(p)
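

# Usage sketch (illustrative only, not part of the original module): using
# collect_groups to reassemble segmented packets. `stream` is assumed to be a
# PacketStream, e.g. one returned by jpss_packet_stream (defined below).
def _example_collect_groups(stream):
    for group in collect_groups(stream):
        first = group[0]
        LOG.debug(
            "group apid=%d len=%d stamp=%s", first.apid, len(group), first.stamp
        )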


def jpss_packet_stream(fobj, **kwargs):
    stream = BasicStream(fobj, headers.jpss_header_lookup)
    return PacketStream(stream, **kwargs)


def aqua_packet_stream(fobj, **kwargs):
    stream = BasicStream(fobj, headers.aqua_header_lookup)
    return PacketStream(stream, **kwargs)