Skip to content
Snippets Groups Projects
Commit 974a003f authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Initial merge code is working

parent ca5fe382
No related branches found
No related tags found
No related merge requests found
......@@ -23,7 +23,7 @@ def dump(filepath, handler=None):
apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
for pkt in PacketStream(filepath):
packets += 1
h1 = pkt.primary_header
h1 = pkt._primary_header
if h1.sequence_grouping == GROUP_FIRST:
groups += 1
apids[h1.apid]['count'] += 1
......
import sys
import logging
from datetime import datetime
from collections import defaultdict, deque
from edosl0util.stream import PacketStream, MissingPackets
# date used as a flag value for comparisons
_FLAG_DATE = datetime(1900, 1, 1)
LOG = logging.getLogger(__name__)
def _cmp_by_first_pkt(filea, fileb):
    """
    Compare 2 L0 files by their first available timestamp. Used to sort L0 files
    by time.

    Returns a ``cmp``-style -1/0/1 result suitable for ``list.sort`` (Python 2).
    """
    def _first_stamp(filepath):
        # The first stamped packet determines the file's start time; files
        # with no stamped packets compare as None.
        for pkt in PacketStream(filepath):
            if pkt.stamp:
                return pkt.stamp
        return None

    return cmp(_first_stamp(filea), _first_stamp(fileb))
def _group_packets(stream):
    """
    Yield packets from *stream* until one carrying a timestamp is reached.

    The stamped packet that terminates the run is pushed back onto the
    stream so it will be the next packet read.
    """
    pkt = stream.next(fail_on_missing=True)
    while True:
        if pkt.stamp:
            break
        yield pkt
        pkt = stream.next(fail_on_missing=True)
    # put the stamped packet back so it is returned by the next read
    stream.push_back(pkt)
def _is_valid_group(packets):
    """
    True when *packets* forms a complete group: it begins with a
    first-in-group packet, ends with a last-in-group packet, and both
    endpoints share the same APID.
    """
    head = packets[0]
    tail = packets[-1]
    return head.is_first() and tail.is_last() and head.apid == tail.apid
def merge(filepaths, output=sys.stdout):
    """
    Merge the packets from multiple L0 files, writing the raw packet bytes
    to *output* in stamp order.

    Files are first sorted by their first stamped packet. Streams are read
    round-robin: each stream is seeked past the last written (stamp, apid)
    position, then standalone packets and valid groups are copied out until
    the stream reports missing packets, an invalid group, or end of data.
    """
    filepaths.sort(_cmp_by_first_pkt)
    LOG.debug("sorted inputs %s", filepaths)
    streams = [PacketStream(fn) for fn in filepaths]
    last_apid = None
    # flag date sorts before any real stamp, so the first seek is a no-op
    last_stamp = datetime(1900, 1, 1)
    while streams:
        for stream in streams:
            LOG.debug(stream)
            try:
                LOG.debug("seeking to %s, %s", last_stamp, last_apid)
                # skip anything at/before the position already written
                stream.seek(last_stamp, last_apid)
                while True:
                    packets = deque()
                    packet = stream.next(fail_on_missing=True)
                    if packet.is_standalone():
                        packets.append(packet)
                    else:
                        # Collect all packets between packets with timestamps.
                        # Missing packets and packets that form an invalid group
                        # will be skipped.
                        group = deque([packet])
                        group.extend(_group_packets(stream))
                        if _is_valid_group(group):
                            packets.extend(group)
                        elif group[0].is_first():
                            # incomplete group: remember its position so other
                            # streams resume from here, then drop it
                            last_stamp = group[0].stamp
                            last_apid = group[0].apid
                            break
                        else:
                            # yield to next stream, perhaps they have a valid
                            # group.
                            LOG.debug("invalid group, switching streams:%s", group)
                            break
                    if not packets:
                        continue
                    # first packet always has a stamp because it's either standalone or
                    # part of a valid group
                    last_stamp = packets[0].stamp
                    last_apid = packets[0].apid
                    while packets:
                        output.write(packets.popleft().blob)
            except MissingPackets as err:
                LOG.debug("missing packets, switching streams: %s", err.args)
            except StopIteration:
                # NOTE(review): removing from `streams` while the for-loop
                # iterates it can skip a stream this pass; the outer while
                # retries, but confirm this is intended.
                streams.remove(stream)
                LOG.debug("end-of-stream %s", stream)
                continue
import logging
import io
from collections import namedtuple, deque, defaultdict
from collections import deque, defaultdict
from edos.ccsds import (
c,
......@@ -8,13 +8,14 @@ from edos.ccsds import (
JpssFirstSecondaryHeader,
JpssSecondaryHeader,
GROUP_FIRST,
GROUP_CONTINUING,
GROUP_LAST,
GROUP_STANDALONE
)
from edosl0util.timecode import cds_stamp
MAX_SEQ_ID = 2**14
LOG = logging.getLogger(__name__)
secondary_header_impls = {
GROUP_FIRST: JpssFirstSecondaryHeader,
......@@ -22,20 +23,59 @@ secondary_header_impls = {
}
class Packet(namedtuple('Packet', ('primary_header', 'secondary_header',
'apid', 'seqid', 'stamp', 'blob'))):
def __str__(self):
return '<Packet apid=%s stamp=%s>' % (self.apid, self.stamp)
__repr__ = __str__
class MissingPackets(Exception):
    """
    Raised when sequence ids for an APID are not sequential. It is expected
    that sequence ids are sequential, rolling over at SequenceId.MAX.
    """
class PacketGroup(namedtuple('PacketGroup', ('apid', 'stamp', 'packets', 'blob'))):
class Packet(object):
    """
    A single CCSDS packet: parsed header fields plus the raw packet bytes.

    :param apid: application process identifier from the primary header
    :param seqid: source sequence count from the primary header
    :param stamp: secondary-header timestamp, or a falsy value when the
        packet carries no secondary header stamp
    :param blob: the raw packet bytes as read from the file
    :param pheader: decoded primary header struct
    :param sheader: decoded secondary header struct, if any
    """

    def __init__(self, apid, seqid, stamp, blob, pheader, sheader):
        self.apid = apid
        self.seqid = seqid
        self.stamp = stamp
        self.blob = blob
        self._primary_header = pheader
        self._secondary_header = sheader

    def __str__(self):
        # BUG FIX: a stale __str__ body left over from the removed
        # PacketGroup class referenced self.packets, which Packet does not
        # have, raising AttributeError on any str()/repr().
        return '<Packet apid=%d seqid=%d stamp=%s>' % \
            (self.apid, self.seqid, self.stamp)
    __repr__ = __str__

    def is_group(self):
        # plain packets are never groups
        return False

    def is_first(self):
        return self._primary_header.sequence_grouping == GROUP_FIRST

    def is_continuing(self):
        return self._primary_header.sequence_grouping == GROUP_CONTINUING

    # backward-compatible alias for the original (misspelled) method name
    is_continuine = is_continuing

    def is_last(self):
        return self._primary_header.sequence_grouping == GROUP_LAST

    def is_standalone(self):
        return self._primary_header.sequence_grouping == GROUP_STANDALONE
class SequenceId(object):
    """
    Counter for CCSDS source sequence counts, wrapping around at MAX (2**14).
    """
    MAX = 2**14

    def __init__(self, initial=0):
        # current counter value; exposed read-only via the `val` property
        self._val = initial

    def __iter__(self):
        return self

    @property
    def val(self):
        """Current counter value."""
        return self._val

    def next(self):
        """Advance the counter by one, wrapping at MAX, and return it."""
        advanced = (self.val + 1) % self.MAX
        self._val = advanced
        return advanced
class PacketStream(object):
"""
......@@ -45,17 +85,46 @@ class PacketStream(object):
def __init__(self, filepath):
    # path retained for __str__ / debugging
    self._filepath = filepath
    # packets are read sequentially from the file in binary mode
    self._input = io.open(filepath, mode='rb')
    # packets push_back()'d for re-reading; next() pops from the right (LIFO)
    self._seek_cache = deque()
    # per-APID SequenceId counters used by next() to detect gaps
    self._seqids = {}
    # NOTE(review): this boolean attribute shadows the end_of_stream()
    # method defined below on instances — confirm which one is intended.
    self.end_of_stream = False
def __str__(self):
    # show the current byte offset and source path for debugging
    return '<PacketStream pos=%d %s>' % (self.tell(), self._filepath)
def __iter__(self):
    # Python 2 iterator protocol; packets are produced by next()
    return self
def end_of_stream(self):
    # NOTE(review): self._end_of_stream is never assigned anywhere visible;
    # __init__ sets self.end_of_stream to a bool, which also shadows this
    # method on instances. Looks like stale code — confirm intent.
    return self._end_of_stream
def push_back(self, packet):
    # packet will be returned by the next call to next() (LIFO order)
    self._seek_cache.append(packet)
def _read(self, size):
    """
    Read up to *size* bytes from the underlying file.

    On end of file, flag the stream as finished and raise StopIteration
    so iteration over the stream terminates.
    """
    chunk = self._input.read(size)
    if chunk:
        return chunk
    self.end_of_stream = True
    raise StopIteration
def next(self):
def tell(self):
    # current byte offset in the underlying file
    return self._input.tell()
def next(self, fail_on_missing=False):
"""
Return the next packet in the stream.
:keyword fail_on_missing: If True MissingPackets will be raised when
sequence ids are non-sequential.
:raises: StopIteration when no more data can be serialzed into Packets.
:raises: MissingPackets see `fail_on_missing`
"""
# return any items on the seek cache before reading more
if len(self._seek_cache):
return self._seek_cache.pop()
buf = self._read(c.sizeof(PrimaryHeader))
data = buf
......@@ -71,153 +140,37 @@ class PacketStream(object):
h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
return Packet(h1, h2, h1.apid, h1.source_sequence_count, stamp, data)
# FUTURE: Use an iter with a peek function so we don't have to keep items that
# were pushed back in memory.
class _IndecisiveIter(object):
    """
    Iterator which allows you to put an item back. Warning, items that are
    pushed back are kept in memory until re-read.
    """

    def __init__(self, iterable):
        self.iterable = iterable
        # pushed-back items; next() pops from the right, so re-reads are LIFO
        self._cache = deque()

    def push_back(self, item):
        self._cache.append(item)

    def __iter__(self):
        return self

    def next(self):
        # drain pushed-back items before touching the wrapped iterator
        if self._cache:
            return self._cache.pop()
        return self.iterable.next()
packet = Packet(h1.apid, h1.source_sequence_count, stamp, data, h1, h2)
class PacketCollector(object):
    """
    Collects packets from a PacketStream into PacketGroup tuples, if applicable.
    Grouping uses PrimaryHeader.sequence_grouping flag such that any packet
    between GROUP_FIRST and GROUP_LAST are considered part of the same group.
    Packets that do not start a group are passed through unchanged.
    """

    def __init__(self, stream):
        self._stream = stream

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next packet from the stream, or a PacketGroup when the
        next packet begins a group.

        :raises: StopIteration when the underlying stream is exhausted.
        """
        packet = self._stream.next()
        grouping = packet.primary_header.sequence_grouping
        if grouping == GROUP_FIRST:
            blob = bytearray(packet.blob)
            grppkts = deque([packet])
            while grouping != GROUP_LAST:
                packet = self._stream.next()
                # only the first packet in a group carries a timestamp
                assert not packet.stamp, "Expected only first packet in group to have stamp"
                blob.extend(packet.blob)
                grppkts.append(packet)
                grouping = packet.primary_header.sequence_grouping
            # BUG FIX: was `packet[0].apid`, indexing the *last* packet's
            # primary header; the group's apid/stamp come from its first packet.
            return PacketGroup(grppkts[0].apid, grppkts[0].stamp, grppkts, str(blob))
        # BUG FIX: non-grouped packets previously fell off the end of the
        # method and were returned as None; pass them through instead.
        return packet
# initialize a new seqid counter
if packet.apid not in self._seqids:
self._seqids[packet.apid] = SequenceId(packet.seqid)
else:
next_seqid = self._seqids[packet.apid].next()
if fail_on_missing and packet.seqid != next_seqid:
raise MissingPackets("non-sequential sequence ids",
packet.seqid, next_seqid)
return packet
class PacketWalker(object):
"""
Provides additional features for a PacketStream, such as seeking and reading
packets until a missing sequence id is encountered.
"""
def __init__(self, stream):
    # wrap the stream so packets can be pushed back during seeks
    self._stream = _IndecisiveIter(stream)
    # seqnums for each apid for locating missing pkts
    self._seqcache = {}
    # stamps for each apid so we can seek; the 1900-01-01 default is a
    # flag value that sorts before any real stamp
    self._stampcache = defaultdict(lambda: datetime(1900, 1, 1))
    # set True once the underlying stream raises StopIteration
    self._finished = False
def _next_packet(self):
    """
    Return the next packet from the wrapped stream, recording its stamp
    per-APID. Marks the walker finished and re-raises when the stream is
    exhausted.
    """
    try:
        pkt = self._stream.next()
    except StopIteration:
        self._finished = True
        raise
    if pkt.stamp:
        # remember the most recent stamp seen for this apid
        self._stampcache[pkt.apid] = pkt.stamp
    return pkt
def end_of_stream(self):
    # True once _next_packet() has hit StopIteration
    return self._finished
def seek(self, stamp=None, apid=None):
    """
    Seek forward to a position in the packet stream where stamp is >= the
    secondary header time. If apid is provided additionally seek to that
    apid.
    """
    # no-op once the stream is exhausted
    if self.end_of_stream():
        return
    try:
        if stamp:
            self._seek_to_stamp(stamp)
        if apid is not None:
            self._seek_to_apid(apid)
    except StopIteration:
        # seeking past the end is not an error; the walker is simply left
        # positioned at end-of-stream
        pass
def _seek_to_stamp(self, stamp):
    """
    Advance until the next packet is the first one whose secondary-header
    stamp is >= *stamp*; that packet is pushed back so it is read next.

    BUG FIX: the previous code ignored the *stamp* argument entirely and
    compared each packet's stamp against self._stampcache[apid] — a value
    _next_packet() had just updated with that very stamp, so the condition
    could never be true and the walker always ran to end-of-stream.
    """
    while True:
        packet = self._next_packet()
        if packet.stamp and packet.stamp >= stamp:
            self._stream.push_back(packet)
            return
def _seek_to_apid(self, apid):
    """
    Advance until the next packet's APID equals *apid*; that packet is
    pushed back so it is read next. StopIteration from the underlying
    stream propagates to the caller.
    """
    packet = self._next_packet()
    while packet.primary_header.apid != apid:
        packet = self._next_packet()
    self._stream.push_back(packet)
def _check_if_missing(self, packet):
"""
Return True if this packet does not follow in the current packet in
sequence number where sequence number is per APID.
def seek(self, stamp, apid=None):
"""
prev_seqid = self._seqcache.get(packet.apid)
seqid = packet.seqid
missing = False
Seek forward such that the next packet provided will be the first packet
having a timestamp available in the secondary header >= stamp, and
optionally with apid as well.
if prev_seqid:
missing = seqid != ((prev_seqid + 1) % MAX_SEQ_ID)
self._seqcache[packet.apid] = seqid
return missing
def read(self):
    """
    Read all remaining packets from current position.

    Generator; ends when the stream is exhausted. NOTE: relies on Python 2
    generator semantics where StopIteration raised inside the body simply
    ends the generator (PEP 479 makes this a RuntimeError in Python 3.7+).
    """
    while not self._finished:
        yield self._next_packet()
def read_until_missing(self):
    """
    Read packets until the first missing sequence number is encountered. The
    next packet read will be the first packet after the missing.

    :raises: StopIteration if the end of the stream is reached before the
        criteria can be met.
    """
    while not self.end_of_stream():
        packet = self._next_packet()
        # checking for a seqid gap requires getting the packet after the
        # gap, therefore we have to put the packet back to be retrieved next
        # time if a missing packet was detected
        if self._check_if_missing(packet):
            self._stream.push_back(packet)
            break
        yield packet
packet = self.next()
while not packet.stamp:
packet = self.next()
current_stamp = packet.stamp
while current_stamp < stamp:
packet = self.next()
current_stamp = packet.stamp or current_stamp
self.push_back(packet)
if apid is not None:
while packet.apid != apid:
packet = self.next()
self.push_back(packet)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment