From 2049cbe1b03cb304006842a355aace713ba1ec63 Mon Sep 17 00:00:00 2001 From: Bruce Flynn <brucef@ssec.wisc.edu> Date: Wed, 14 Oct 2015 18:58:38 +0000 Subject: [PATCH] prevent merge from skipping packet groups on stream switch --- edosl0util/merge.py | 70 ++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/edosl0util/merge.py b/edosl0util/merge.py index 87f6424..698650b 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -1,10 +1,13 @@ import io import sys import logging -from datetime import datetime -from collections import deque, defaultdict +from collections import deque -from edosl0util.stream import make_streams +from edosl0util.stream import ( + make_streams, + NonConsecutiveSeqId, + PacketTooShort +) LOG = logging.getLogger(__name__) @@ -28,67 +31,55 @@ def _is_valid_group(packets): def merge(streams, output=sys.stdout): - last_apid = None - last_stamp = datetime(1900, 1, 1) - # num pkts missing per apid - missing = defaultdict(lambda: 0) + last_packet = None # streams are removed as they are exhausted while streams: for stream in streams: try: - LOG.debug("seeking to %s, %s", last_stamp, last_apid) - stream.seek(last_stamp, last_apid) + if last_packet is not None: + LOG.debug("seeking to %s, %s", last_packet.stamp, last_packet.apid) + stream.seek_to(last_packet.stamp, last_packet.apid) + stream.seek_to_next_stamp() # Do until `next` causes StopIteration while True: packets = deque() packet = stream.next() - # If the num missing is incremented there is a missing - # packet. If missing put the current packet back for - # fetching again and defer to the next stream. - new_missing = stream.info()[packet.apid]['num_missing'] - if new_missing > missing[packet.apid]: - missing[packet.apid] = new_missing - stream.push_back(packet) - LOG.debug("missing packets, switching streams: %s", packet) - break # defer to next stream - if packet.is_standalone(): packets.append(packet) - # Packet Group: Collect all packets between packets with - # timestamps. Missing packets and packets that form an - # invalid group will be skipped. - else: + elif packet.is_first(): # packet group group = deque([packet]) group.extend(_group_packets(stream)) if _is_valid_group(group): packets.extend(group) - elif group[0].is_first(): - last_stamp = group[0].stamp - last_apid = group[0].apid - break # defer to next stream - + last_packet = group[0] + break else: - # yield to next stream, perhaps they have a valid - # group. LOG.debug("invalid group, switching streams:%s", group) - break # defer to next stream + break - # first packet always has a stamp because it's either standalone or - # part of a valid group - last_stamp = packets[0].stamp - last_apid = packets[0].apid + else: + LOG.debug("skipping hanging packet: %s", packet) + + # First packet always has a stamp because it's either + # standalone or part of a valid group + last_packet = packets[0] while packets: pkt = packets.popleft() LOG.debug('writing %s: %s', stream, pkt) - output.write(pkt.primary_header) - output.write(pkt.secondary_header) - output.write(pkt.data) + output.write(pkt.bytes()) + + except PacketTooShort as err: + LOG.error("corrupt stream, removing: %s", err) + streams.remove(stream) + + except NonConsecutiveSeqId: + LOG.debug('missing sequence id, next stream') except StopIteration: streams.remove(stream) @@ -97,5 +88,6 @@ def merge(streams, output=sys.stdout): def merge_files(filepaths, destpath): - streams = make_streams(filepaths) + filepaths, streams = make_streams(filepaths, fail_on_missing=True) + LOG.debug("merge order %s", filepaths) merge(streams, output=io.open(destpath, 'wb')) -- GitLab