"""
CCSDS packet stream readers.

`BasicStream` reads raw primary/secondary headers (and optionally the packet
data) from a file-like object, and `PacketStream` wraps it to provide `Packet`
objects while tracking per-APID counts and sequence gaps.
"""
import os
import errno
import logging
import ctypes as c
from collections import deque, defaultdict, namedtuple

from edosl0util.headers import (
    PrimaryHeader,
    GROUP_FIRST,
    GROUP_CONTINUING,
    GROUP_LAST,
    GROUP_STANDALONE
)

from edosl0util import headers

LOG = logging.getLogger(__name__)

# Max CCSDS sequence id
MAX_SEQ_ID = 2**14 - 1


class Error(Exception):
    """
    Base stream error.
    """


class PacketTooShort(Error):
    """
    The number of bytes read for a packet does not match the expected length.
    After this error occurs the stream is no longer usable because data offsets
    are no longer reliable.
    """


class NonConsecutiveSeqId(Error):
    """
    Non-consecutive sequence numbers encountered for apid.
    """


class BasicStream(object):
    """
    Basic packet stream iterator that reads the primary and secondary headers
    and maintains offsets and read sizes.

    Each iteration yields a `Tracker` tuple of
    ``(h1, h2, size, offset, data)`` where `size` is the total packet size
    (primary header included) and `offset` is the stream offset at which the
    packet started.
    """
    Tracker = namedtuple('Tracker', ['h1', 'h2', 'size', 'offset', 'data'])

    def __init__(self, fobj, header_lookup=None, with_data=True):
        """
        :param fobj: File-like object opened for binary reading.
        :param header_lookup: Optional callable taking a primary header and
            returning the ctypes class to use for the secondary header, or
            None when the packet has no secondary header to decode.
        :param with_data: When False the packet data is skipped with a
            relative seek rather than read, and `Tracker.data` is None.
        """
        self.file = fobj
        self.header_lookup = header_lookup
        self.with_data = with_data
        try:
            self._offset = self.file.tell()
        except IOError as err:  # handle illegal seek for pipes
            if err.errno != errno.ESPIPE:
                raise
            self._offset = 0

    def __iter__(self):
        return self

    # python3
    def __next__(self):
        return self.next()

    def _read(self, size):
        """
        Read exactly `size` bytes and advance the tracked offset.

        :raises StopIteration: If nothing could be read (EOF).
        :raises PacketTooShort: If fewer than `size` bytes were available.
        """
        buf = self.file.read(size)
        if not buf:  # EOF
            raise StopIteration()
        if len(buf) != size:
            raise PacketTooShort(
                'expected to read {:d} bytes, got {:d}'
                .format(size, len(buf)))
        self._offset += size
        return buf

    def read_primary_header(self):
        """
        Read the primary header, returning ``(header, size_in_bytes)``.
        """
        buf = self._read(headers.PRIMARY_HEADER_SIZE)
        h1 = PrimaryHeader.from_buffer_copy(buf)
        return h1, headers.PRIMARY_HEADER_SIZE

    def read_secondary_header(self, ph):
        """
        Read the secondary header for primary header `ph`.

        Returns ``(header, size_in_bytes)``. When no header lookup is
        configured, or the lookup returns None, nothing is read and
        ``(bytes(), 0)`` is returned.
        """
        H2Impl = self.header_lookup(ph) if self.header_lookup else None
        if H2Impl is None:
            return bytes(), 0
        h2size = c.sizeof(H2Impl)
        buf = self._read(h2size)
        return H2Impl.from_buffer_copy(buf), h2size

    def next(self):
        """
        Read the next packet and return its `Tracker`.

        :raises StopIteration: At EOF.
        :raises PacketTooShort: If the stream ends part way through a packet.
        """
        offset = self._offset
        h1, h1size = self.read_primary_header()
        h2, h2size = self.read_secondary_header(h1)
        # data length includes h2size
        total_size = h1size + h1.data_length_minus1 + 1
        data_size = h1.data_length_minus1 + 1 - h2size
        if self.with_data:
            data = self._read(data_size)
        else:
            data = None
            # Skip the payload without reading it. The offset has to be
            # advanced by hand here because `_read` is bypassed; otherwise
            # every following packet would report a wrong offset.
            self.file.seek(data_size, os.SEEK_CUR)
            self._offset += data_size
        return self.Tracker(h1, h2, total_size, offset, data)


class Packet(object):
    """
    A single packet: primary header, secondary header and data.
    """

    def __init__(self, h1, h2, data, data_size=None, offset=None):
        """
        :param h1: Primary header.
        :param h2: Secondary header, or an empty value when there is none.
        :param data: Packet data following the headers, or None if the data
            was not read.
        :param data_size: Size recorded for the packet; stored as `size`.
        :param offset: Stream offset at which the packet started.
        """
        self.primary_header = h1
        self.secondary_header = h2
        self.data = data
        self.size = data_size
        self.offset = offset
        self.apid = h1.apid
        self.seqid = self.primary_header.source_sequence_count

    def __str__(self):
        return '<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>' % \
               (self.apid, self.seqid, self.stamp, self.size, self.offset)
    __repr__ = __str__

    def _has_timecode(self):
        # True when a secondary header is present and exposes a `timecode`.
        return (bool(self.secondary_header) and
                hasattr(self.secondary_header, 'timecode'))

    @property
    def stamp(self):
        """
        Secondary header timecode as returned by its `asdatetime()`, or None
        when there is no secondary header timecode (or the value is falsy).
        """
        if not self._has_timecode():
            return None
        return self.secondary_header.timecode.asdatetime() or None

    @property
    def timestamp(self):
        """
        Secondary header timecode as returned by its `astimestamp()`, or None
        when there is no secondary header timecode (or the value is falsy).
        """
        if not self._has_timecode():
            return None
        return self.secondary_header.timecode.astimestamp() or None

    def bytes(self):
        """
        Return the raw bytes of the packet: both headers followed by the data.

        The packet data must have been read; if `data` is None the
        concatenation raises TypeError.
        """
        # `buffer` only exists on Python 2. bytearray() copies any object
        # exposing the buffer protocol (ctypes structures, byte strings) on
        # both Python 2 and 3, and bytes() turns that copy into an immutable
        # byte string. Inside a method body `bytes` is the builtin, not this
        # method.
        return (bytes(bytearray(self.primary_header)) +
                bytes(bytearray(self.secondary_header)) +
                self.data)

    def is_group(self):
        """
        True if the packet is the first, a continuing, or the last packet of
        a group.
        """
        return self.is_first() or self.is_continuing() or self.is_last()

    def is_first(self):
        return self.primary_header.sequence_grouping == GROUP_FIRST

    def is_continuing(self):
        return self.primary_header.sequence_grouping == GROUP_CONTINUING

    # Original (misspelled) name kept so existing callers keep working.
    is_continuine = is_continuing

    def is_last(self):
        return self.primary_header.sequence_grouping == GROUP_LAST

    def is_standalone(self):
        return self.primary_header.sequence_grouping == GROUP_STANDALONE


class PacketStream(object):
    """
    Iterator of `Packet` objects built on top of a basic stream, tracking the
    first/last stamp seen and per-APID packet counts and sequence gaps.
    """
    SEQID_NOTSET = -1

    def __init__(self, data_stream, fail_on_missing=False):
        """
        :param data_stream: Object with a `file` attribute and a `next()`
            method returning ``(h1, h2, size, offset, data)`` tuples.
        :param fail_on_missing: When True, `next()` raises
            `NonConsecutiveSeqId` on a sequence gap instead of returning the
            packet; the packet is pushed back and is returned by the
            following call to `next()`.
        """
        self._stream = data_stream
        self._seek_cache = deque()
        self._first = None
        self._last = None
        self._apid_info = defaultdict(
            lambda: {'count': 0,
                     'last_seqid': self.SEQID_NOTSET,
                     'num_missing': 0})
        self._fail_on_missing = fail_on_missing

    def __repr__(self):
        filepath = getattr(self.file, 'name', None)
        return '<{} file={}>'.format(self.__class__.__name__, filepath)

    def __iter__(self):
        return self

    # python3
    def __next__(self):
        return self.next()

    @property
    def file(self):
        return self._stream.file

    def push_back(self, packet):
        """
        Queue `packet` to be returned by `next()` before any new packet is
        read. Queued packets are returned in the order they were pushed.
        """
        self._seek_cache.append(packet)

    def next(self):
        """
        Return the next packet.

        :raises StopIteration: When the underlying stream is exhausted.
        :raises NonConsecutiveSeqId: If `fail_on_missing` is set and the
            packet's sequence id does not follow the previous one for its
            APID.
        """
        # return any items on the seek cache before reading more
        if self._seek_cache:
            return self._seek_cache.popleft()

        h1, h2, size, offset, data = self._stream.next()
        packet = Packet(h1, h2, data, data_size=size, offset=offset)
        have_missing = self._update_info(packet)
        if have_missing and self._fail_on_missing:
            self.push_back(packet)
            raise NonConsecutiveSeqId(
                'non-consecutive sequence id {} for apid {}'
                .format(packet.seqid, packet.apid))
        return packet

    def _update_info(self, packet):
        """
        Update first/last stamps and the per-APID statistics for `packet`.

        Returns True if one or more sequence ids were skipped between the
        previous packet for this APID and `packet`.
        """
        have_missing = False
        apid = self._apid_info[packet.apid]
        if packet.stamp:
            if self._first is None:
                self._first = packet.stamp
            else:
                self._first = min(packet.stamp, self._first)
            if self._last is None:
                self._last = packet.stamp
            else:
                self._last = max(packet.stamp, self._last)
        apid['count'] += 1
        if apid['last_seqid'] != self.SEQID_NOTSET:
            if packet.seqid > apid['last_seqid']:
                num_missing = packet.seqid - apid['last_seqid'] - 1
            else:
                # sequence counter wrapped around
                num_missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID
            # Only the gap in front of this packet matters. Comparing against
            # the running total would flag every packet after the first gap
            # and hide a gap whose size equals the running total.
            have_missing = num_missing != 0
            apid['num_missing'] += num_missing
        apid['last_seqid'] = packet.seqid
        return have_missing

    def info(self):
        """
        Return ``(first_stamp, last_stamp, apid_info)`` for the packets read
        so far, where `apid_info` maps APID to its ``count``, ``last_seqid``
        and ``num_missing``.
        """
        return self._first, self._last, self._apid_info

    def seek_to(self, stamp, apid=None):
        """
        Seek forward such that the next packet provided will be the first
        packet having a timestamp available in the secondary header >= stamp
        and apid.

        StopIteration from the underlying stream propagates if the stream is
        exhausted before such a packet is found.
        """
        # seek past partial groups
        packet = self.next()
        while not packet.stamp:
            packet = self.next()

        # seek to first packet >= `stamp`
        current_stamp = packet.stamp
        while current_stamp < stamp:
            packet = self.next()
            # packets without a stamp keep the last stamp that was seen
            current_stamp = packet.stamp or current_stamp

        # put back so next packet is the first packet >= `stamp`
        self.push_back(packet)

        if apid is not None:
            packet = self.next()
            while packet.apid != apid:
                packet = self.next()
            self.push_back(packet)


def jpss_packet_stream(fobj, **kwargs):
    """
    Return a `PacketStream` over `fobj` using `headers.jpss_header_lookup`
    for secondary headers. Keyword arguments are passed to `PacketStream`.
    """
    stream = BasicStream(fobj, headers.jpss_header_lookup)
    return PacketStream(stream, **kwargs)


def aqua_packet_stream(fobj, **kwargs):
    """
    Return a `PacketStream` over `fobj` using `headers.aqua_header_lookup`
    for secondary headers. Keyword arguments are passed to `PacketStream`.
    """
    stream = BasicStream(fobj, headers.aqua_header_lookup)
    return PacketStream(stream, **kwargs)