diff --git a/edosl0util/dump.py b/edosl0util/dump.py
index c2e6332c2b699377b53cbcb907245d572c711402..d3331a30411611a39dfa419015e9029c5b16d86b 100644
--- a/edosl0util/dump.py
+++ b/edosl0util/dump.py
@@ -23,7 +23,7 @@ def dump(filepath, handler=None):
     apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
     for pkt in PacketStream(filepath):
         packets += 1
-        h1 = pkt.primary_header
+        h1 = pkt._primary_header
         if h1.sequence_grouping == GROUP_FIRST:
             groups += 1
             apids[h1.apid]['count'] += 1
diff --git a/edosl0util/merge.py b/edosl0util/merge.py
new file mode 100644
index 0000000000000000000000000000000000000000..b62596b2d137d0b5b15698b692300e4ce21037ec
--- /dev/null
+++ b/edosl0util/merge.py
@@ -0,0 +1,110 @@
+import sys
+import logging
+from datetime import datetime
+from collections import defaultdict, deque
+
+from edosl0util.stream import PacketStream, MissingPackets
+
+# date used as a flag value for comparisons
+_FLAG_DATE = datetime(1900, 1, 1)
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _cmp_by_first_pkt(filea, fileb):
+    """
+    Compare two L0 files by their first available timestamp. Used to sort L0
+    files by time.
+    """
+    stampa = stampb = None
+    for pkt in PacketStream(filea):
+        if pkt.stamp:
+            stampa = pkt.stamp
+            break
+    for pkt in PacketStream(fileb):
+        if pkt.stamp:
+            stampb = pkt.stamp
+            break
+    return cmp(stampa, stampb)
+
+
+def _group_packets(stream):
+    """
+    Returns a generator that yields all packets up to the next timestamped packet.
+    """
+    packet = stream.next(fail_on_missing=True)
+    while not packet.stamp:
+        yield packet
+        packet = stream.next(fail_on_missing=True)
+    # push the stamped packet back so it is the next packet read
+    stream.push_back(packet)
+
+
+def _is_valid_group(packets):
+    return packets[0].is_first() \
+        and packets[-1].is_last() \
+        and packets[0].apid == packets[-1].apid
+
+
+def merge(filepaths, output=sys.stdout):
+
+    filepaths.sort(_cmp_by_first_pkt)
+    LOG.debug("sorted inputs %s", filepaths)
+
+    streams = [PacketStream(fn) for fn in filepaths]
+    last_apid = None
+    last_stamp = _FLAG_DATE
+
+    while streams:
+        for stream in list(streams):
+            LOG.debug(stream)
+            try:
+                LOG.debug("seeking to %s, %s", last_stamp, last_apid)
+                stream.seek(last_stamp, last_apid)
+
+                while True:
+                    packets = deque()
+
+                    packet = stream.next(fail_on_missing=True)
+                    if packet.is_standalone():
+                        packets.append(packet)
+
+                    else:
+                        # Collect all packets between packets with timestamps.
+                        # Missing packets and packets that form an invalid group
+                        # will be skipped.
+                        group = deque([packet])
+                        group.extend(_group_packets(stream))
+                        if _is_valid_group(group):
+                            packets.extend(group)
+
+                        elif group[0].is_first():
+                            last_stamp = group[0].stamp
+                            last_apid = group[0].apid
+                            break
+
+                        else:
+                            # yield to the next stream; perhaps it has a valid
+                            # group.
+ LOG.debug("invalid group, switching streams:%s", group) + break + + if not packets: + continue + + # first packet always has a stamp because it's either standalone or + # part of a valid group + last_stamp = packets[0].stamp + last_apid = packets[0].apid + + while packets: + output.write(packets.popleft().blob) + + except MissingPackets as err: + LOG.debug("missing packets, switching streams: %s", err.args) + + except StopIteration: + streams.remove(stream) + LOG.debug("end-of-stream %s", stream) + continue diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 5fa9109edb7a5ddaee716840d16d7fd32d314d80..17cabe0a7afe95ec9f5d87d99983b16d98d748c3 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -1,6 +1,6 @@ - +import logging import io -from collections import namedtuple, deque, defaultdict +from collections import deque, defaultdict from edos.ccsds import ( c, @@ -8,13 +8,14 @@ from edos.ccsds import ( JpssFirstSecondaryHeader, JpssSecondaryHeader, GROUP_FIRST, + GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE ) from edosl0util.timecode import cds_stamp -MAX_SEQ_ID = 2**14 +LOG = logging.getLogger(__name__) secondary_header_impls = { GROUP_FIRST: JpssFirstSecondaryHeader, @@ -22,20 +23,59 @@ secondary_header_impls = { } -class Packet(namedtuple('Packet', ('primary_header', 'secondary_header', - 'apid', 'seqid', 'stamp', 'blob'))): - def __str__(self): - return '<Packet apid=%s stamp=%s>' % (self.apid, self.stamp) - __repr__ = __str__ +class MissingPackets(Exception): + """ + Sequence ID for an APID are not sequential. It is expected that sequence ids + are sequential, rolling over at SequenceId.MAX. + """ -class PacketGroup(namedtuple('PacketGroup', ('apid', 'stamp', 'packets', 'blob'))): +class Packet(object): + def __init__(self, apid, seqid, stamp, blob, pheader, sheader): + self.apid = apid + self.seqid = seqid + self.stamp = stamp + self.blob = blob + self._primary_header = pheader + self._secondary_header = sheader + def __str__(self): - return '<Group apid=%s stamp=%s packets=%d>' % ( - self.apid, self.packets[0].stamp if self.packets else '', len(self.packets) - ) + return '<Packet apid=%d seqid=%d stamp=%s>' % \ + (self.apid, self.seqid, self.stamp) __repr__ = __str__ + def is_group(self): + return False + + def is_first(self): + return self._primary_header.sequence_grouping == GROUP_FIRST + + def is_continuine(self): + return self._primary_header.sequence_grouping == GROUP_CONTINUING + + def is_last(self): + return self._primary_header.sequence_grouping == GROUP_LAST + + def is_standalone(self): + return self._primary_header.sequence_grouping == GROUP_STANDALONE + + +class SequenceId(object): + MAX = 2**14 + def __init__(self, initial=0): + self._val = initial + + def __iter__(self): + return self + + @property + def val(self): + return self._val + + def next(self): + self._val = (self._val + 1) % self.MAX + return self.val + class PacketStream(object): """ @@ -45,17 +85,46 @@ class PacketStream(object): def __init__(self, filepath): self._filepath = filepath self._input = io.open(filepath, mode='rb') + self._seek_cache = deque() + self._seqids = {} + self.end_of_stream = False + + def __str__(self): + return '<PacketStream pos=%d %s>' % (self.tell(), self._filepath) def __iter__(self): return self + def end_of_stream(self): + return self._end_of_stream + + def push_back(self, packet): + self._seek_cache.append(packet) + def _read(self, size): data = self._input.read(size) if not data: + self.end_of_stream = True raise StopIteration return data - def 
-    def next(self):
+    def tell(self):
+        return self._input.tell()
+
+    def next(self, fail_on_missing=False):
+        """
+        Return the next packet in the stream.
+
+        :keyword fail_on_missing: If True MissingPackets will be raised when
+            sequence ids are non-sequential.
+
+        :raises: StopIteration when no more data can be serialized into Packets.
+        :raises: MissingPackets see `fail_on_missing`
+        """
+        # return any packets in the seek cache before reading more
+        if len(self._seek_cache):
+            return self._seek_cache.pop()
+
         buf = self._read(c.sizeof(PrimaryHeader))
         data = buf
@@ -71,153 +140,37 @@ class PacketStream(object):
         h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
         stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
-        return Packet(h1, h2, h1.apid, h1.source_sequence_count, stamp, data)
-
-
-# FUTURE: Use an iter with a peek function so we don't have to keep items that
-# were pushed back in memory.
-class _IndecisiveIter(object):
-    """
-    Iterator which allows you to put an item back. Warning, items that are pushed
-    back are kept in memory.
-    """
-    def __init__(self, iterable):
-        self.iterable = iterable
-        self._cache = deque()
-
-    def push_back(self, item):
-        self._cache.append(item)
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        if len(self._cache):
-            return self._cache.pop()
-        return self.iterable.next()
-
+        packet = Packet(h1.apid, h1.source_sequence_count, stamp, data, h1, h2)
 
-class PacketCollector(object):
-    """
-    Collects packets from a PacketStream into PacketGroup tuples, if applicable.
-    Grouping uses PrimaryHeader.sequence_grouping flag such that any packet
-    between GROUP_FIRST and GROUP_LAST are considered part of the same group.
-    """
-    def __init__(self, stream):
-        self._stream = stream
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        packet = self._stream.next()
-        grouping = packet.primary_header.sequence_grouping
-        if grouping == GROUP_FIRST:
-            blob = bytearray(packet.blob)
-            grppkts = deque([packet])
-            while grouping != GROUP_LAST:
-                packet = self._stream.next()
-                assert not packet.stamp, "Expected only first packet in group to have stamp"
-                blob.extend(packet.blob)
-                grppkts.append(packet)
-                grouping = packet.primary_header.sequence_grouping
-            return PacketGroup(packet[0].apid, grppkts[0].stamp, grppkts, str(blob))
+        # initialize a new seqid counter
+        if packet.apid not in self._seqids:
+            self._seqids[packet.apid] = SequenceId(packet.seqid)
+        else:
+            next_seqid = self._seqids[packet.apid].next()
+            if fail_on_missing and packet.seqid != next_seqid:
+                raise MissingPackets("non-sequential sequence ids",
+                                     packet.seqid, next_seqid)
         return packet
 
-
-class PacketWalker(object):
-    """
-    Provides additional features for a PacketStream, such as seeking and reading
-    packets until a missing sequence id is encountered.
-    """
-    def __init__(self, stream):
-        self._stream = _IndecisiveIter(stream)
-        # seqnums for each apid for locating missing pkts
-        self._seqcache = {}
-        # stamps for each apid so we can seek
-        self._stampcache = defaultdict(lambda: datetime(1900, 1, 1))
-        self._finished = False
-
-    def _next_packet(self):
-        try:
-            packet = self._stream.next()
-            if packet.stamp:
-                self._stampcache[packet.apid] = packet.stamp
-            return packet
-        except StopIteration:
-            self._finished = True
-            raise
-
-    def end_of_stream(self):
-        return self._finished
-
-    def seek(self, stamp=None, apid=None):
-        """
-        Seek forward to a position in the packet stream where stamp is >= the
-        secondary header time. If apid is provided additionally seek to that
-        apid.
- """ - if self.end_of_stream(): - return - try: - if stamp: - self._seek_to_stamp(stamp) - if apid is not None: - self._seek_to_apid(apid) - except StopIteration: - pass - - def _seek_to_stamp(self, stamp): - while True: - packet = self._next_packet() - hdr = packet.primary_header - if packet.stamp and packet.stamp > self._stampcache[hdr.apid]: - self._stream.push_back(packet) - return - - def _seek_to_apid(self, apid): - while True: - packet = self._next_packet() - hdr = packet.primary_header - if hdr.apid == apid: - self._stream.push_back(packet) - return - - def _check_if_missing(self, packet): - """ - Return True if this packet does not follow in the current packet in - sequence number where sequence number is per APID. + def seek(self, stamp, apid=None): """ - prev_seqid = self._seqcache.get(packet.apid) - seqid = packet.seqid - missing = False + Seek forward such that the next packet provided will be the first packet + having a timestamp available in the secondary header >= stamp, and + optionally with apid as well. - if prev_seqid: - missing = seqid != ((prev_seqid + 1) % MAX_SEQ_ID) - - self._seqcache[packet.apid] = seqid - return missing - - def read(self): - """ - Read all remaining packets from current position. - """ - while not self._finished: - yield self._next_packet() - - def read_until_missing(self): - """ - Read packets until the first missing sequence number is encountered. The - next packet read will be the first packet after the missing. + :raises: StopIteration if the end of the stream is reached before the + criteria can be met. """ - while not self.end_of_stream(): - packet = self._next_packet() - - # checking for a seqid gap requires getting the packet after the - # gap, therefore we have to put the packet back to be retreived next - # time if a missing packet was detected - if self._check_if_missing(packet): - self._stream.push_back(packet) - break - - yield packet + packet = self.next() + while not packet.stamp: + packet = self.next() + current_stamp = packet.stamp + while current_stamp < stamp: + packet = self.next() + current_stamp = packet.stamp or current_stamp + self.push_back(packet) + + if apid is not None: + while packet.apid != apid: + packet = self.next() + self.push_back(packet)