diff --git a/edosl0util/dump.py b/edosl0util/dump.py index d3331a30411611a39dfa419015e9029c5b16d86b..53cb61184860233e9800d54db1b7a5b19b4c61ca 100644 --- a/edosl0util/dump.py +++ b/edosl0util/dump.py @@ -21,7 +21,7 @@ def dump(filepath, handler=None): missing = 0 # sequence nums are per APID apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0}) - for pkt in PacketStream(filepath): + for pkt in PacketStream(open(filepath)): packets += 1 h1 = pkt._primary_header if h1.sequence_grouping == GROUP_FIRST: diff --git a/edosl0util/merge.py b/edosl0util/merge.py index b62596b2d137d0b5b15698b692300e4ce21037ec..05ac42da2fe041b2aaff9ea972bb921e3685bfc0 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -1,7 +1,8 @@ +import io import sys import logging from datetime import datetime -from collections import defaultdict, deque +from collections import deque from edosl0util.stream import PacketStream, MissingPackets @@ -12,31 +13,14 @@ _FLAG_DATE = datetime(1900, 1, 1) LOG = logging.getLogger(__name__) -def _cmp_by_first_pkt(filea, fileb): - """ - Compare 2 L0 files by their first available timestamp. Used to sort L0 files - by time. - """ - stampa = stampb = None - for pkt in PacketStream(filea): - if pkt.stamp: - stampa = pkt.stamp - break - for pkt in PacketStream(fileb): - if pkt.stamp: - stampb = pkt.stamp - break - return cmp(stampa, stampb) - - def _group_packets(stream): """ Returns a generator that yields all packets between timestamps. """ - packet = stream.next(fail_on_missing=True) + packet = stream.next() while not packet.stamp: yield packet - packet = stream.next(fail_on_missing=True) + packet = stream.next() # put the next packet with a stamp so it will be next stream.push_back(packet) @@ -47,15 +31,33 @@ def _is_valid_group(packets): and packets[0].apid == packets[-1].apid -def merge(filepaths, output=sys.stdout): +def _cmp_by_first_pkt(filea, fileb): + """ + Compare 2 L0 files by their first available timestamp. Used to sort L0 files + by time. + """ + stampa = stampb = None + for pkt in PacketStream(open(filea)): + if pkt.stamp: + stampa = pkt.stamp + break + for pkt in PacketStream(open(fileb)): + if pkt.stamp: + stampb = pkt.stamp + break + return cmp(stampa, stampb) + +def make_streams(filepaths): filepaths.sort(_cmp_by_first_pkt) - LOG.debug("sorted inputs %s", filepaths) + return [PacketStream(io.open(f, 'rb')) for f in filepaths] + - streams = [PacketStream(fn) for fn in filepaths] +def merge(streams, output=sys.stdout): last_apid = None last_stamp = datetime(1900, 1, 1) + # streams are removed as they are exhausted while streams: for stream in streams: LOG.debug(stream) @@ -66,7 +68,7 @@ def merge(filepaths, output=sys.stdout): while True: packets = deque() - packet = stream.next(fail_on_missing=True) + packet = stream.next() if packet.is_standalone(): packets.append(packet) diff --git a/edosl0util/split.py b/edosl0util/split.py index 6629fb68c52906dce236892238c8815885a5a2cf..4d44e52d386adbbe30a88270aaeca75eda826e2d 100644 --- a/edosl0util/split.py +++ b/edosl0util/split.py @@ -13,7 +13,7 @@ def split(filepath, minutes): buf = array.array('c') # buffer for a single data file until it is written cur_bucket = 0 # cur time bucket of size 'minutes' - for pkt in PacketStream(filepath): + for pkt in PacketStream(open(filepath)): # do the bucketing based on secondary header timestamps if pkt.stamp: hdrtime = unixtime(pkt.stamp) diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 17cabe0a7afe95ec9f5d87d99983b16d98d748c3..f7fe3c079d6d19cf7ae406bc00e3d72e9bb4d375 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -1,6 +1,5 @@ import logging -import io -from collections import deque, defaultdict +from collections import deque from edos.ccsds import ( c, @@ -41,7 +40,7 @@ class Packet(object): def __str__(self): return '<Packet apid=%d seqid=%d stamp=%s>' % \ - (self.apid, self.seqid, self.stamp) + (self.apid, self.seqid, self.stamp) __repr__ = __str__ def is_group(self): @@ -62,6 +61,7 @@ class Packet(object): class SequenceId(object): MAX = 2**14 + def __init__(self, initial=0): self._val = initial @@ -82,36 +82,31 @@ class PacketStream(object): Iterates over all CCSDS data producing `Packet` tuples. Only as much data is necessary to generate a single packet is read into memory at a time. """ - def __init__(self, filepath): - self._filepath = filepath - self._input = io.open(filepath, mode='rb') + def __init__(self, fobj, fail_on_missing=False): + self._input = fobj self._seek_cache = deque() self._seqids = {} - self.end_of_stream = False + self._fail_on_missing = fail_on_missing def __str__(self): - return '<PacketStream pos=%d %s>' % (self.tell(), self._filepath) + return '<PacketStream pos=%d %s>' % (self.tell(), self._input) def __iter__(self): return self - def end_of_stream(self): - return self._end_of_stream - def push_back(self, packet): self._seek_cache.append(packet) def _read(self, size): data = self._input.read(size) if not data: - self.end_of_stream = True raise StopIteration return data def tell(self): return self._input.tell() - def next(self, fail_on_missing=False): + def next(self): """ Return the next packet in the stream. @@ -147,7 +142,7 @@ class PacketStream(object): self._seqids[packet.apid] = SequenceId(packet.seqid) else: next_seqid = self._seqids[packet.apid].next() - if fail_on_missing and packet.seqid != next_seqid: + if self._fail_on_missing and packet.seqid != next_seqid: raise MissingPackets("non-sequential sequence ids", packet.seqid, next_seqid) return packet