Skip to content
Snippets Groups Projects
Commit 2049cbe1 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

prevent merge from skipping packet groups on stream switch

parent 37f84fe2
No related branches found
No related tags found
No related merge requests found
import io import io
import sys import sys
import logging import logging
from datetime import datetime from collections import deque
from collections import deque, defaultdict
from edosl0util.stream import make_streams from edosl0util.stream import (
make_streams,
NonConsecutiveSeqId,
PacketTooShort
)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
...@@ -28,67 +31,55 @@ def _is_valid_group(packets): ...@@ -28,67 +31,55 @@ def _is_valid_group(packets):
def merge(streams, output=sys.stdout): def merge(streams, output=sys.stdout):
last_apid = None last_packet = None
last_stamp = datetime(1900, 1, 1)
# num pkts missing per apid
missing = defaultdict(lambda: 0)
# streams are removed as they are exhausted # streams are removed as they are exhausted
while streams: while streams:
for stream in streams: for stream in streams:
try: try:
LOG.debug("seeking to %s, %s", last_stamp, last_apid) if last_packet is not None:
stream.seek(last_stamp, last_apid) LOG.debug("seeking to %s, %s", last_packet.stamp, last_packet.apid)
stream.seek_to(last_packet.stamp, last_packet.apid)
stream.seek_to_next_stamp()
# Do until `next` causes StopIteration # Do until `next` causes StopIteration
while True: while True:
packets = deque() packets = deque()
packet = stream.next() packet = stream.next()
# If the num missing is incremented there is a missing
# packet. If missing put the current packet back for
# fetching again and defer to the next stream.
new_missing = stream.info()[packet.apid]['num_missing']
if new_missing > missing[packet.apid]:
missing[packet.apid] = new_missing
stream.push_back(packet)
LOG.debug("missing packets, switching streams: %s", packet)
break # defer to next stream
if packet.is_standalone(): if packet.is_standalone():
packets.append(packet) packets.append(packet)
# Packet Group: Collect all packets between packets with elif packet.is_first(): # packet group
# timestamps. Missing packets and packets that form an
# invalid group will be skipped.
else:
group = deque([packet]) group = deque([packet])
group.extend(_group_packets(stream)) group.extend(_group_packets(stream))
if _is_valid_group(group): if _is_valid_group(group):
packets.extend(group) packets.extend(group)
elif group[0].is_first(): elif group[0].is_first():
last_stamp = group[0].stamp last_packet = group[0]
last_apid = group[0].apid break
break # defer to next stream
else: else:
# yield to next stream, perhaps they have a valid
# group.
LOG.debug("invalid group, switching streams:%s", group) LOG.debug("invalid group, switching streams:%s", group)
break # defer to next stream break
# first packet always has a stamp because it's either standalone or else:
# part of a valid group LOG.debug("skipping hanging packet: %s", packet)
last_stamp = packets[0].stamp
last_apid = packets[0].apid # First packet always has a stamp because it's either
# standalone or part of a valid group
last_packet = packets[0]
while packets: while packets:
pkt = packets.popleft() pkt = packets.popleft()
LOG.debug('writing %s: %s', stream, pkt) LOG.debug('writing %s: %s', stream, pkt)
output.write(pkt.primary_header) output.write(pkt.bytes())
output.write(pkt.secondary_header)
output.write(pkt.data) except PacketTooShort as err:
LOG.error("corrupt stream, removing: %s", err)
streams.remove(stream)
except NonConsecutiveSeqId:
LOG.debug('missing sequence id, next stream')
except StopIteration: except StopIteration:
streams.remove(stream) streams.remove(stream)
...@@ -97,5 +88,6 @@ def merge(streams, output=sys.stdout): ...@@ -97,5 +88,6 @@ def merge(streams, output=sys.stdout):
def merge_files(filepaths, destpath): def merge_files(filepaths, destpath):
streams = make_streams(filepaths) filepaths, streams = make_streams(filepaths, fail_on_missing=True)
LOG.debug("merge order %s", filepaths)
merge(streams, output=io.open(destpath, 'wb')) merge(streams, output=io.open(destpath, 'wb'))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment