From 8323413e8a78643547adaba1e7f4b907d1382e7f Mon Sep 17 00:00:00 2001
From: Bruce Flynn <brucef@ssec.wisc.edu>
Date: Tue, 29 Sep 2015 16:34:12 +0000
Subject: [PATCH] Track info in PacketStream

---
 edosl0util/merge.py  |  1 +
 edosl0util/stream.py | 72 ++++++++++++++++++++++++++++----------------
 2 files changed, 47 insertions(+), 26 deletions(-)

diff --git a/edosl0util/merge.py b/edosl0util/merge.py
index 17b2a57..90746b9 100644
--- a/edosl0util/merge.py
+++ b/edosl0util/merge.py
@@ -33,6 +33,7 @@ def _is_valid_group(packets):
 def merge(streams, output=sys.stdout):
     last_apid = None
     last_stamp = datetime(1900, 1, 1)
+    apid_cache = {}
 
     # streams are removed as they are exhausted
     while streams:
diff --git a/edosl0util/stream.py b/edosl0util/stream.py
index 4e148ca..a3eab8e 100644
--- a/edosl0util/stream.py
+++ b/edosl0util/stream.py
@@ -1,21 +1,21 @@
+import io
 import logging
-from collections import deque
+from collections import deque, defaultdict
 
 from edos.ccsds import (
     c,
     PrimaryHeader,
-    GROUP_FIRST,
-    GROUP_CONTINUING,
-    GROUP_LAST,
-    GROUP_STANDALONE
 )
 
 from edosl0util import headers
 
 LOG = logging.getLogger(__name__)
 
+# Max CCSDS sequence id
+MAX_SEQ_ID = 2**14 - 1
 
-def simple_stream(fobj=None, data=None):
+
+def simple_stream(fobj):
     """
     Generator that yields PrimaryHeaders and data. Files are read using mmap.
     """
@@ -35,7 +35,7 @@ def simple_stream(fobj=None, data=None):
         yield h1, buf
 
 
-def _secondary_stream(header_lookup, fobj=None, data=None):
+def _full_stream(header_lookup, fobj):
     """
     Generator that yields primary and secondary header and any left over
     bytes of "user" data.
@@ -43,7 +43,7 @@
     :param header_lookup: function that takes a primary header and returns
         a secondary header struct to use, or None.
     """
-    stream = simple_stream(fobj, data)
+    stream = simple_stream(fobj)
     for h1, buf in stream:
         h2 = ''
         if h1.secondary_header_flag:
@@ -56,18 +56,18 @@
         yield h1, h2, buf
 
 
-def jpss_secondary_stream(fobj=None, data=None):
+def jpss_full_stream(fobj):
     """
-    `_secondary_stream` impl for JPSS secondary headers.
+    `_full_stream` impl for JPSS secondary headers.
     """
-    return _secondary_stream(headers.jpss_header_lookup, fobj, data)
+    return _full_stream(headers.jpss_header_lookup, fobj)
 
 
-def aqua_secondary_stream(fobj=None, data=None):
+def aqua_full_stream(fobj):
     """
-    `_secondary_stream` impl for Aqua secondary headers.
""" - return _secondary_stream(headers.aqua_header_lookup, fobj, data) + return _full_stream(headers.aqua_header_lookup, fobj) class Packet(object): @@ -87,7 +87,7 @@ class Packet(object): @property def seqid(self): - return self.primary_header.sequence_grouping + return self.primary_header.source_sequence_count @property def stamp(self): @@ -102,16 +102,16 @@ class Packet(object): return False def is_first(self): - return self.primary_header.sequence_grouping == GROUP_FIRST + return self.primary_header.sequence_grouping == 0b01 def is_continuine(self): - return self.primary_header.sequence_grouping == GROUP_CONTINUING + return self.primary_header.sequence_grouping == 0b00 def is_last(self): - return self.primary_header.sequence_grouping == GROUP_LAST + return self.primary_header.sequence_grouping == 0b10 def is_standalone(self): - return self.primary_header.sequence_grouping == GROUP_STANDALONE + return self.primary_header.sequence_grouping == 0b11 class PacketStream(object): @@ -119,19 +119,23 @@ class PacketStream(object): Iterates over all CCSDS data producing `Packet` tuples. Only as much data is necessary to generate a single packet is read into memory at a time. """ - def __init__(self, fobj, fail_on_missing=False): + def __init__(self, fobj): """ :param fobj: File-like object to read stream from :keyword fail_on_missing: If True raise a MissingPackets error when packets are missing. """ - self._stream = jpss_secondary_stream(fobj) + self._stream = jpss_full_stream(fobj) self._seek_cache = deque() - self._fail_on_missing = fail_on_missing + self._apid_info = defaultdict(lambda: {'count': 0, 'last_seqid': -1, 'missing': 0}) def __iter__(self): return self + # python3 + def __next__(self): + return self.next() + def push_back(self, packet): self._seek_cache.append(packet) @@ -149,7 +153,23 @@ class PacketStream(object): if len(self._seek_cache): return self._seek_cache.popleft() - return Packet(*self._stream.next()) + packet = Packet(*self._stream.next()) + self._update_info(packet) + return packet + + def _update_info(self, packet): + apid = self._apid_info[packet.apid] + apid['count'] += 1 + if apid['last_seqid'] != -1: + if packet.seqid > apid['last_seqid']: + missing = packet.seqid - apid['last_seqid'] - 1 + else: + missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID + apid['missing'] += missing + apid['last_seqid'] = packet.seqid + + def info(self): + return self._apid_info def seek(self, stamp, apid=None): """ @@ -187,11 +207,11 @@ def _cmp_by_first_pkt(filea, fileb): by time. """ stampa = stampb = None - for pkt in PacketStream(open(filea, 'rb')): + for pkt in PacketStream(io.open(filea, 'rb')): if pkt.stamp: stampa = pkt.stamp break - for pkt in PacketStream(open(fileb, 'rb')): + for pkt in PacketStream(io.open(fileb, 'rb')): if pkt.stamp: stampb = pkt.stamp break @@ -200,4 +220,4 @@ def _cmp_by_first_pkt(filea, fileb): def make_streams(filepaths): filepaths.sort(_cmp_by_first_pkt) - return [PacketStream(open(f, 'rb')) for f in filepaths] + return [PacketStream(io.open(f, 'rb')) for f in filepaths] -- GitLab