diff --git a/edosl0util/merge.py b/edosl0util/merge.py index bb987b036c672b6873e1ed7220ee63f5443ef04f..2478fe874d4326789130a395f71000474feab493 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -15,9 +15,17 @@ from edosl0util.stream import ( LOG = logging.getLogger(__name__) +class InvalidPacketGroup(Exception): + """ + If a there exists packets in a group with different apids or if the group + does not end in a last packet. + """ + + def _group_packets(stream): """ - Returns a generator that yields all packets between timestamps. + Returns a generator that yields all packets until a timestamps, i.e., a + packet group. """ packet = stream.next() while not packet.stamp: @@ -27,65 +35,57 @@ def _group_packets(stream): stream.push_back(packet) -def _is_valid_group(packets): - return packets[0].is_first() \ - and packets[-1].is_last() \ - and packets[0].apid == packets[-1].apid - - def merge(streams, output=sys.stdout): last_packet = None - # streams are removed as they are exhausted + streams = list(streams) while streams: - if 1: - stream = streams.pop(0) - try: - if last_packet is not None: - LOG.debug("seeking to %s, %s", last_packet.stamp, last_packet.apid) - stream.seek_to(last_packet.stamp, last_packet.apid) - stream.seek_to_next_stamp() - - # Do until `next` causes StopIteration - while True: - packets = deque() - packet = stream.next() - - if packet.is_standalone(): - packets.append(packet) - - elif packet.is_first(): # packet group - group = deque([packet]) - group.extend(_group_packets(stream)) - if _is_valid_group(group): - packets.extend(group) - elif group[0].is_first(): - last_packet = group[0] - break - else: - LOG.debug("invalid group, switching streams:%s", group) - break - - else: - LOG.debug("skipping hanging packet: %s", packet) - - # First packet always has a stamp because it's either - # standalone or part of a valid group - last_packet = packets[0] - - while packets: - pkt = packets.popleft() - output.write(pkt.bytes()) - - except PacketTooShort as err: - LOG.error("corrupt stream, removing: %s", err) - - except NonConsecutiveSeqId: - steams.push(stream) - LOG.debug('missing sequence id, next stream') - - except StopIteration: - LOG.debug("end-of-stream %s", stream) + stream = streams.pop(0) + try: + if last_packet is not None: + LOG.debug("seeking to %s, %s", last_packet.stamp, last_packet.apid) + stream.seek_to(last_packet.stamp, last_packet.apid) + stream.seek_to_next_stamp() + + # Do until `next` causes StopIteration + while True: + packets_to_write = deque() + packet = stream.next() + if packet.is_standalone(): + packets_to_write.append(packet) + + elif packet.is_first(): # packet group + group = deque([packet]) + group.extend(_group_packets(stream)) + if not group[0].is_first() \ + and group[-1].is_last() \ + and group[0].apid == group[-1].apid: + raise InvalidPacketGroup() + packets_to_write.extend(group) + + else: + LOG.debug("skipping hanging packet: %s", packet) + + # First packet always has a stamp because it's either + # standalone or part of a valid group + last_packet = packets_to_write[0] + while packets_to_write: + pkt = packets_to_write.popleft() + output.write(pkt.bytes()) + + except NonConsecutiveSeqId: + streams.append(stream) # stream still usable + LOG.debug('missing sequence id, next stream') + + except InvalidPacketGroup: + streams.append(stream) # stream still usable + LOG.debug("invalid group, switching streams:%s", group) + + except PacketTooShort as err: + LOG.error("corrupt stream, removing: %s", err) + + except StopIteration: + LOG.debug("end-of-stream %s", stream) def merge_files(filepaths, destpath):