Skip to content
Snippets Groups Projects
Commit 1c6eac48 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Have PacketStream take a file-like object rather than filename

parent 974a003f
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,7 @@ def dump(filepath, handler=None):
missing = 0
# sequence nums are per APID
apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
for pkt in PacketStream(filepath):
for pkt in PacketStream(open(filepath)):
packets += 1
h1 = pkt._primary_header
if h1.sequence_grouping == GROUP_FIRST:
......
import io
import sys
import logging
from datetime import datetime
from collections import defaultdict, deque
from collections import deque
from edosl0util.stream import PacketStream, MissingPackets
......@@ -12,31 +13,14 @@ _FLAG_DATE = datetime(1900, 1, 1)
LOG = logging.getLogger(__name__)
def _cmp_by_first_pkt(filea, fileb):
"""
Compare 2 L0 files by their first available timestamp. Used to sort L0 files
by time.
"""
stampa = stampb = None
for pkt in PacketStream(filea):
if pkt.stamp:
stampa = pkt.stamp
break
for pkt in PacketStream(fileb):
if pkt.stamp:
stampb = pkt.stamp
break
return cmp(stampa, stampb)
def _group_packets(stream):
"""
Returns a generator that yields all packets between timestamps.
"""
packet = stream.next(fail_on_missing=True)
packet = stream.next()
while not packet.stamp:
yield packet
packet = stream.next(fail_on_missing=True)
packet = stream.next()
# put the next packet with a stamp so it will be next
stream.push_back(packet)
......@@ -47,15 +31,33 @@ def _is_valid_group(packets):
and packets[0].apid == packets[-1].apid
def merge(filepaths, output=sys.stdout):
def _cmp_by_first_pkt(filea, fileb):
"""
Compare 2 L0 files by their first available timestamp. Used to sort L0 files
by time.
"""
stampa = stampb = None
for pkt in PacketStream(open(filea)):
if pkt.stamp:
stampa = pkt.stamp
break
for pkt in PacketStream(open(fileb)):
if pkt.stamp:
stampb = pkt.stamp
break
return cmp(stampa, stampb)
def make_streams(filepaths):
filepaths.sort(_cmp_by_first_pkt)
LOG.debug("sorted inputs %s", filepaths)
return [PacketStream(io.open(f, 'rb')) for f in filepaths]
streams = [PacketStream(fn) for fn in filepaths]
def merge(streams, output=sys.stdout):
last_apid = None
last_stamp = datetime(1900, 1, 1)
# streams are removed as they are exhausted
while streams:
for stream in streams:
LOG.debug(stream)
......@@ -66,7 +68,7 @@ def merge(filepaths, output=sys.stdout):
while True:
packets = deque()
packet = stream.next(fail_on_missing=True)
packet = stream.next()
if packet.is_standalone():
packets.append(packet)
......
......@@ -13,7 +13,7 @@ def split(filepath, minutes):
buf = array.array('c') # buffer for a single data file until it is written
cur_bucket = 0 # cur time bucket of size 'minutes'
for pkt in PacketStream(filepath):
for pkt in PacketStream(open(filepath)):
# do the bucketing based on secondary header timestamps
if pkt.stamp:
hdrtime = unixtime(pkt.stamp)
......
import logging
import io
from collections import deque, defaultdict
from collections import deque
from edos.ccsds import (
c,
......@@ -41,7 +40,7 @@ class Packet(object):
def __str__(self):
return '<Packet apid=%d seqid=%d stamp=%s>' % \
(self.apid, self.seqid, self.stamp)
(self.apid, self.seqid, self.stamp)
__repr__ = __str__
def is_group(self):
......@@ -62,6 +61,7 @@ class Packet(object):
class SequenceId(object):
MAX = 2**14
def __init__(self, initial=0):
self._val = initial
......@@ -82,36 +82,31 @@ class PacketStream(object):
Iterates over all CCSDS data producing `Packet` tuples. Only as much data is
necessary to generate a single packet is read into memory at a time.
"""
def __init__(self, filepath):
self._filepath = filepath
self._input = io.open(filepath, mode='rb')
def __init__(self, fobj, fail_on_missing=False):
self._input = fobj
self._seek_cache = deque()
self._seqids = {}
self.end_of_stream = False
self._fail_on_missing = fail_on_missing
def __str__(self):
return '<PacketStream pos=%d %s>' % (self.tell(), self._filepath)
return '<PacketStream pos=%d %s>' % (self.tell(), self._input)
def __iter__(self):
return self
def end_of_stream(self):
return self._end_of_stream
def push_back(self, packet):
self._seek_cache.append(packet)
def _read(self, size):
data = self._input.read(size)
if not data:
self.end_of_stream = True
raise StopIteration
return data
def tell(self):
return self._input.tell()
def next(self, fail_on_missing=False):
def next(self):
"""
Return the next packet in the stream.
......@@ -147,7 +142,7 @@ class PacketStream(object):
self._seqids[packet.apid] = SequenceId(packet.seqid)
else:
next_seqid = self._seqids[packet.apid].next()
if fail_on_missing and packet.seqid != next_seqid:
if self._fail_on_missing and packet.seqid != next_seqid:
raise MissingPackets("non-sequential sequence ids",
packet.seqid, next_seqid)
return packet
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment