Skip to content
Snippets Groups Projects
Commit 031daad8 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

massage merge stream iteration bug.

* Ensure streams is a know iterable type (deque)
* Make sure to reuse a stream in the case of a bad packet group. This was
  done with an exception so it was handled the same way as the other 2
  major cases.
parent fd59e81c
Branches
Tags
No related merge requests found
...@@ -15,9 +15,17 @@ from edosl0util.stream import ( ...@@ -15,9 +15,17 @@ from edosl0util.stream import (
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class InvalidPacketGroup(Exception):
"""
If a there exists packets in a group with different apids or if the group
does not end in a last packet.
"""
def _group_packets(stream): def _group_packets(stream):
""" """
Returns a generator that yields all packets between timestamps. Returns a generator that yields all packets until a timestamps, i.e., a
packet group.
""" """
packet = stream.next() packet = stream.next()
while not packet.stamp: while not packet.stamp:
...@@ -27,18 +35,11 @@ def _group_packets(stream): ...@@ -27,18 +35,11 @@ def _group_packets(stream):
stream.push_back(packet) stream.push_back(packet)
def _is_valid_group(packets):
return packets[0].is_first() \
and packets[-1].is_last() \
and packets[0].apid == packets[-1].apid
def merge(streams, output=sys.stdout): def merge(streams, output=sys.stdout):
last_packet = None last_packet = None
# streams are removed as they are exhausted # streams are removed as they are exhausted
streams = list(streams)
while streams: while streams:
if 1:
stream = streams.pop(0) stream = streams.pop(0)
try: try:
if last_packet is not None: if last_packet is not None:
...@@ -48,42 +49,41 @@ def merge(streams, output=sys.stdout): ...@@ -48,42 +49,41 @@ def merge(streams, output=sys.stdout):
# Do until `next` causes StopIteration # Do until `next` causes StopIteration
while True: while True:
packets = deque() packets_to_write = deque()
packet = stream.next() packet = stream.next()
if packet.is_standalone(): if packet.is_standalone():
packets.append(packet) packets_to_write.append(packet)
elif packet.is_first(): # packet group elif packet.is_first(): # packet group
group = deque([packet]) group = deque([packet])
group.extend(_group_packets(stream)) group.extend(_group_packets(stream))
if _is_valid_group(group): if not group[0].is_first() \
packets.extend(group) and group[-1].is_last() \
elif group[0].is_first(): and group[0].apid == group[-1].apid:
last_packet = group[0] raise InvalidPacketGroup()
break packets_to_write.extend(group)
else:
LOG.debug("invalid group, switching streams:%s", group)
break
else: else:
LOG.debug("skipping hanging packet: %s", packet) LOG.debug("skipping hanging packet: %s", packet)
# First packet always has a stamp because it's either # First packet always has a stamp because it's either
# standalone or part of a valid group # standalone or part of a valid group
last_packet = packets[0] last_packet = packets_to_write[0]
while packets_to_write:
while packets: pkt = packets_to_write.popleft()
pkt = packets.popleft()
output.write(pkt.bytes()) output.write(pkt.bytes())
except PacketTooShort as err:
LOG.error("corrupt stream, removing: %s", err)
except NonConsecutiveSeqId: except NonConsecutiveSeqId:
steams.push(stream) streams.append(stream) # stream still usable
LOG.debug('missing sequence id, next stream') LOG.debug('missing sequence id, next stream')
except InvalidPacketGroup:
streams.append(stream) # stream still usable
LOG.debug("invalid group, switching streams:%s", group)
except PacketTooShort as err:
LOG.error("corrupt stream, removing: %s", err)
except StopIteration: except StopIteration:
LOG.debug("end-of-stream %s", stream) LOG.debug("end-of-stream %s", stream)
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment