Commit 8323413e authored by Bruce Flynn

Track info in PacketStream

parent 3f6e3f87
@@ -33,6 +33,7 @@ def _is_valid_group(packets):
 def merge(streams, output=sys.stdout):
     last_apid = None
     last_stamp = datetime(1900, 1, 1)
+    apid_cache = {}
     # streams are removed as they are exhausted
     while streams:
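
Note: this hunk only shows apid_cache being initialized; its use falls outside the visible context. One plausible shape, consistent with the surrounding last_apid/last_stamp dedup state, is sketched below. This is a hypothetical illustration, not the actual merge body:

    # Hypothetical sketch only -- the real use of apid_cache is not shown in this diff.
    # Idea: remember the last timestamp emitted per APID so duplicate packets
    # arriving from overlapping input streams can be skipped.
    def _should_write(apid_cache, packet):
        last = apid_cache.get(packet.apid)
        if last is not None and packet.stamp and packet.stamp <= last:
            return False  # already emitted an equal-or-newer packet for this APID
        if packet.stamp:
            apid_cache[packet.apid] = packet.stamp
        return True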
+import io
 import logging
-from collections import deque
+from collections import deque, defaultdict

 from edos.ccsds import (
     c,
     PrimaryHeader,
     GROUP_FIRST,
     GROUP_CONTINUING,
     GROUP_LAST,
     GROUP_STANDALONE
 )

 from edosl0util import headers

 LOG = logging.getLogger(__name__)

+# Max CCSDS sequence id
+MAX_SEQ_ID = 2**14 - 1

-def simple_stream(fobj=None, data=None):
+def simple_stream(fobj):
     """
     Generator that yields PrimaryHeaders and data. Files are read using mmap.
     """
@@ -35,7 +35,7 @@ def simple_stream(fobj=None, data=None):
         yield h1, buf

-def _secondary_stream(header_lookup, fobj=None, data=None):
+def _full_stream(header_lookup, fobj):
     """
     Generator that yields primary and secondary header and any left over bytes
     of "user" data.
@@ -43,7 +43,7 @@ def _secondary_stream(header_lookup, fobj=None, data=None):
     :param header_lookup: function that takes a primary header and returns
         a secondary header struct to use, or None.
     """
-    stream = simple_stream(fobj, data)
+    stream = simple_stream(fobj)
     for h1, buf in stream:
         h2 = ''
         if h1.secondary_header_flag:
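
The header_lookup contract is exactly what the docstring states: take a primary header, return a secondary-header struct or None. A hypothetical lookup might look like this (the struct class is invented for illustration):

    def my_header_lookup(h1):
        # Return the secondary-header struct class for packets that carry one,
        # or None to leave the bytes undecoded.
        if not h1.secondary_header_flag:
            return None
        return MySecondaryHeader  # hypothetical struct for this APID's layout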
@@ -56,18 +56,18 @@ def _secondary_stream(header_lookup, fobj=None, data=None):
         yield h1, h2, buf

-def jpss_secondary_stream(fobj=None, data=None):
+def jpss_full_stream(fobj):
     """
-    `_secondary_stream` impl for JPSS secondary headers.
+    `_full_stream` impl for JPSS secondary headers.
     """
-    return _secondary_stream(headers.jpss_header_lookup, fobj, data)
+    return _full_stream(headers.jpss_header_lookup, fobj)


-def aqua_secondary_stream(fobj=None, data=None):
+def aqua_full_stream(fobj):
     """
-    `_secondary_stream` impl for Aqua secondary headers.
+    `_full_stream` impl for Aqua secondary headers.
     """
-    return _secondary_stream(headers.aqua_header_lookup, fobj, data)
+    return _full_stream(headers.aqua_header_lookup, fobj)


 class Packet(object):
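
Call-site impact of the rename: consumers previously using jpss_secondary_stream now call jpss_full_stream and pass a file object directly. A brief sketch (module name and path assumed):

    import io
    from edosl0util import stream  # module name assumed

    with io.open('jpss.pds', 'rb') as fobj:  # hypothetical path
        for h1, h2, data in stream.jpss_full_stream(fobj):
            pass  # primary header, secondary header (or ''), trailing user data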
@@ -87,7 +87,7 @@ class Packet(object):
     @property
     def seqid(self):
-        return self.primary_header.sequence_grouping
+        return self.primary_header.source_sequence_count

     @property
     def stamp(self):
@@ -102,16 +102,16 @@ class Packet(object):
         return False

     def is_first(self):
-        return self.primary_header.sequence_grouping == GROUP_FIRST
+        return self.primary_header.sequence_grouping == 0b01

     def is_continuine(self):
-        return self.primary_header.sequence_grouping == GROUP_CONTINUING
+        return self.primary_header.sequence_grouping == 0b00

     def is_last(self):
-        return self.primary_header.sequence_grouping == GROUP_LAST
+        return self.primary_header.sequence_grouping == 0b10

     def is_standalone(self):
-        return self.primary_header.sequence_grouping == GROUP_STANDALONE
+        return self.primary_header.sequence_grouping == 0b11


 class PacketStream(object):
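
For reference, the two-bit CCSDS sequence-grouping values the new literals encode; these match the GROUP_* constants still imported from edos.ccsds above:

    GROUP_CONTINUING = 0b00  # continuation segment of a segmented group
    GROUP_FIRST      = 0b01  # first segment of a group
    GROUP_LAST       = 0b10  # last segment of a group
    GROUP_STANDALONE = 0b11  # unsegmented (standalone) packet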
@@ -119,19 +119,23 @@ class PacketStream(object):
     Iterates over all CCSDS data producing `Packet` tuples. Only as much data
     as is necessary to generate a single packet is read into memory at a time.
     """
-    def __init__(self, fobj, fail_on_missing=False):
+    def __init__(self, fobj):
         """
         :param fobj: File-like object to read stream from
-        :keyword fail_on_missing: If True raise a MissingPackets error when packets
-            are missing.
         """
-        self._stream = jpss_secondary_stream(fobj)
+        self._stream = jpss_full_stream(fobj)
         self._seek_cache = deque()
-        self._fail_on_missing = fail_on_missing
+        self._apid_info = defaultdict(lambda: {'count': 0, 'last_seqid': -1, 'missing': 0})

     def __iter__(self):
         return self

+    # python3
+    def __next__(self):
+        return self.next()
+
     def push_back(self, packet):
         self._seek_cache.append(packet)
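
With tracking now built in, a stream can be drained and then queried for per-APID statistics. A sketch (module name and file path assumed):

    import io
    from edosl0util import stream  # module name assumed

    packets = stream.PacketStream(io.open('input.pds', 'rb'))  # hypothetical path
    for packet in packets:
        pass  # consume the stream
    for apid, info in packets.info().items():
        print('%d: %r' % (apid, info))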
@@ -149,7 +153,23 @@ class PacketStream(object):
         if len(self._seek_cache):
             return self._seek_cache.popleft()
-        return Packet(*self._stream.next())
+        packet = Packet(*self._stream.next())
+        self._update_info(packet)
+        return packet
+
+    def _update_info(self, packet):
+        apid = self._apid_info[packet.apid]
+        apid['count'] += 1
+        if apid['last_seqid'] != -1:
+            if packet.seqid > apid['last_seqid']:
+                missing = packet.seqid - apid['last_seqid'] - 1
+            else:
+                missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID
+            apid['missing'] += missing
+        apid['last_seqid'] = packet.seqid
+
+    def info(self):
+        return self._apid_info

     def seek(self, stamp, apid=None):
         """
@@ -187,11 +207,11 @@ def _cmp_by_first_pkt(filea, fileb):
     by time.
     """
     stampa = stampb = None
-    for pkt in PacketStream(open(filea, 'rb')):
+    for pkt in PacketStream(io.open(filea, 'rb')):
         if pkt.stamp:
             stampa = pkt.stamp
             break
-    for pkt in PacketStream(open(fileb, 'rb')):
+    for pkt in PacketStream(io.open(fileb, 'rb')):
         if pkt.stamp:
             stampb = pkt.stamp
             break
@@ -200,4 +220,4 @@ def _cmp_by_first_pkt(filea, fileb):

 def make_streams(filepaths):
     filepaths.sort(_cmp_by_first_pkt)
-    return [PacketStream(open(f, 'rb')) for f in filepaths]
+    return [PacketStream(io.open(f, 'rb')) for f in filepaths]
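
Putting the pieces together, a typical call sequence might look like this (file names hypothetical; this relies on the Python 2 cmp-style sort shown above):

    files = ['a.pds', 'b.pds']        # hypothetical input paths
    streams = make_streams(files)     # opens each file, ordered by first-packet time
    merge(streams, output=io.open('merged.pds', 'wb'))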