diff --git a/edosl0util/cli.py b/edosl0util/cli.py index a4456139966884222aa80116e9033a6a1ed0f048..270851685bd5927a6141149304ef37099c72bb2d 100644 --- a/edosl0util/cli.py +++ b/edosl0util/cli.py @@ -29,9 +29,9 @@ def cmd_dump(): args = parser.parse_args() def handler(pkt): - print(pkt.h1) - if pkt.h2: - print("\t%s : %s" % (pkt.stamp, pkt.h2)) + print(pkt.primary_header) + if pkt.secondary_header: + print("\t%s : %s" % (pkt.stamp, pkt.secondary_header)) stats = dump.dump(args.pds, handler=(handler if args.verbose else None)) print("First Packet: {}".format(stats['first_pkt'])) diff --git a/edosl0util/dump.py b/edosl0util/dump.py index b1dbc9717009498f3245646c64bc42a46ad1415f..c2e6332c2b699377b53cbcb907245d572c711402 100644 --- a/edosl0util/dump.py +++ b/edosl0util/dump.py @@ -3,7 +3,7 @@ from datetime import datetime from edos.ccsds import GROUP_FIRST -from edosl0util.stream import packetstream +from edosl0util.stream import PacketStream MAX_SEQ_ID = 2**14 @@ -21,18 +21,19 @@ def dump(filepath, handler=None): missing = 0 # sequence nums are per APID apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0}) - for pkt in packetstream(filepath): + for pkt in PacketStream(filepath): packets += 1 - if pkt.h1.sequence_grouping == GROUP_FIRST: + h1 = pkt.primary_header + if h1.sequence_grouping == GROUP_FIRST: groups += 1 - apids[pkt.h1.apid]['count'] += 1 + apids[h1.apid]['count'] += 1 - seq = pkt.h1.source_sequence_count - expected_seq = (apids[pkt.h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID + seq = h1.source_sequence_count + expected_seq = (apids[h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID if seq != expected_seq: missing += 1 - apids[pkt.h1.apid]['missing'] += 1 - apids[pkt.h1.apid]['prev_seq'] = seq + apids[h1.apid]['missing'] += 1 + apids[h1.apid]['prev_seq'] = seq if pkt.stamp: first_pkt = min(pkt.stamp, first_pkt) diff --git a/edosl0util/split.py b/edosl0util/split.py index e99aeb31eacf51d23c9184903de1a337115ca5c6..6629fb68c52906dce236892238c8815885a5a2cf 100644 
--- a/edosl0util/split.py +++ b/edosl0util/split.py @@ -1,12 +1,7 @@ -import io -from cStringIO import StringIO +import array -from edosl0util.timecode import cds_stamp, unixtime -from edosl0util.stream import packetstream - - -def time_to_roll(time, minutes): - return time - time % (minutes * 60) +from edosl0util.timecode import unixtime +from edosl0util.stream import PacketStream def split(filepath, minutes): @@ -15,24 +10,23 @@ def split(filepath, minutes): A single output file is buffered in memory until it is written. """ - buf = StringIO() # buffer for a single data file until it is written + buf = array.array('c') # buffer for a single data file until it is written cur_bucket = 0 # cur time bucket of size 'minutes' - for pkt in packetstream(filepath): + for pkt in PacketStream(filepath): # do the bucketing based on secondary header timestamps - if pkt.h2: - stamp = cds_stamp(pkt.h2.day, pkt.h2.milliseconds, pkt.h2.microseconds) - hdrtime = unixtime(stamp) + if pkt.stamp: + hdrtime = unixtime(pkt.stamp) pkt_bucket = hdrtime - hdrtime % (minutes * 60) if cur_bucket == 0: cur_bucket = pkt_bucket if pkt_bucket > cur_bucket: - yield cur_bucket, buf.getvalue() - buf = StringIO() + yield cur_bucket, buf.tostring() + buf = array.array('c') cur_bucket = pkt_bucket - buf.write(pkt.data) + buf.fromstring(pkt.blob) - yield cur_bucket, buf.getvalue() \ No newline at end of file + yield cur_bucket, buf.tostring() \ No newline at end of file diff --git a/edosl0util/stream.py b/edosl0util/stream.py index fd48a69962149e974fdd288f1ceeb973f8e05dee..5fa9109edb7a5ddaee716840d16d7fd32d314d80 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -1,6 +1,6 @@ import io -from collections import namedtuple +from collections import namedtuple, deque, defaultdict from edos.ccsds import ( c, @@ -8,14 +8,13 @@ from edos.ccsds import ( JpssFirstSecondaryHeader, JpssSecondaryHeader, GROUP_FIRST, + GROUP_LAST, GROUP_STANDALONE ) from edosl0util.timecode import cds_stamp - -Packet 
= namedtuple('Packet', ('h1', 'h2', 'stamp', 'data')) - +MAX_SEQ_ID = 2**14 secondary_header_impls = { GROUP_FIRST: JpssFirstSecondaryHeader, @@ -23,55 +22,202 @@ secondary_header_impls = { } -def packetstream(filepath): +class Packet(namedtuple('Packet', ('primary_header', 'secondary_header', + 'apid', 'seqid', 'stamp', 'blob'))): + def __str__(self): + return '<Packet apid=%s stamp=%s>' % (self.apid, self.stamp) + __repr__ = __str__ + + +class PacketGroup(namedtuple('PacketGroup', ('apid', 'stamp', 'packets', 'blob'))): + def __str__(self): + return '<Group apid=%s stamp=%s packets=%d>' % ( + self.apid, self.packets[0].stamp if self.packets else '', len(self.packets) + ) + __repr__ = __str__ + + +class PacketStream(object): """ - Return a generator of `Packet` tuples that will contain the primary header - struct and all the packet data, including the header data. + Iterates over all CCSDS data producing `Packet` tuples. Only as much data as is + necessary to generate a single packet is read into memory at a time. 
""" - with io.open(filepath, mode='rb', buffering=0) as datain: - buf = datain.read(c.sizeof(PrimaryHeader)) - while buf: - data = buf + def __init__(self, filepath): + self._filepath = filepath + self._input = io.open(filepath, mode='rb') - h1 = PrimaryHeader.from_buffer_copy(buf) + def __iter__(self): + return self - buf = datain.read(h1.data_length_minus1 + 1) - if not buf: - break - data += buf + def _read(self, size): + data = self._input.read(size) + if not data: + raise StopIteration + return data + + def next(self): + buf = self._read(c.sizeof(PrimaryHeader)) + data = buf - stamp = None - h2 = None # only first/standalone have h2 impls - H2Impl = secondary_header_impls.get(h1.sequence_grouping) - if H2Impl: - h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)]) - stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds) + h1 = PrimaryHeader.from_buffer_copy(buf) - yield Packet(h1, h2, stamp, data) + buf = self._read(h1.data_length_minus1 + 1) + data += buf - buf = datain.read(c.sizeof(PrimaryHeader)) + stamp = None + h2 = None # only first/standalone have h2 impls + H2Impl = secondary_header_impls.get(h1.sequence_grouping) + if H2Impl: + h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)]) + stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds) + return Packet(h1, h2, h1.apid, h1.source_sequence_count, stamp, data) -""" -pdsfiles = sort_pdsfiles_by_pkttime(filenames) +# FUTURE: Use an iter with a peek function so we don't have to keep items that +# were pushed back in memory. +class _IndecisiveIter(object): + """ + Iterator which allows you to put an item back. Warning, items that are pushed + back are kept in memory. 
+ """ + def __init__(self, iterable): + self.iterable = iterable + self._cache = deque() -apid = None -seqnum = None -for pdsfile in pdsfiles: - try: - pdsfile.seekto(apid, seqnum) - for packet in pdsfile.read(): - output.write(packet) + def push_back(self, item): + self._cache.append(item) - except MissingPacket as err: - apid = err.apid - seqnum = err.seqnum - print("missing packet apid:{:d} seqnum:{:d} in {:s}".format(err.apid, err.seqnum, pdsfile)) + def __iter__(self): + return self - except NoMorePackets: - print("{} has no more packets".format(pdsfile)) - pdsfiles.pop(pdsfile) + def next(self): + if len(self._cache): + return self._cache.pop() + return self.iterable.next() -""" +class PacketCollector(object): + """ + Collects packets from a PacketStream into PacketGroup tuples, if applicable. + Grouping uses the PrimaryHeader.sequence_grouping flag such that all packets + between GROUP_FIRST and GROUP_LAST are considered part of the same group. + """ + def __init__(self, stream): + self._stream = stream + + def __iter__(self): + return self + + def next(self): + packet = self._stream.next() + grouping = packet.primary_header.sequence_grouping + if grouping == GROUP_FIRST: + blob = bytearray(packet.blob) + grppkts = deque([packet]) + while grouping != GROUP_LAST: + packet = self._stream.next() + assert not packet.stamp, "Expected only first packet in group to have stamp" + blob.extend(packet.blob) + grppkts.append(packet) + grouping = packet.primary_header.sequence_grouping + return PacketGroup(packet[0].apid, grppkts[0].stamp, grppkts, str(blob)) + return packet + + +class PacketWalker(object): + """ + Provides additional features for a PacketStream, such as seeking and reading + packets until a missing sequence id is encountered. 
+ """ + def __init__(self, stream): + self._stream = _IndecisiveIter(stream) + # seqnums for each apid for locating missing pkts + self._seqcache = {} + # stamps for each apid so we can seek + self._stampcache = defaultdict(lambda: datetime(1900, 1, 1)) + self._finished = False + + def _next_packet(self): + try: + packet = self._stream.next() + if packet.stamp: + self._stampcache[packet.apid] = packet.stamp + return packet + except StopIteration: + self._finished = True + raise + + def end_of_stream(self): + return self._finished + + def seek(self, stamp=None, apid=None): + """ + Seek forward to a position in the packet stream where stamp is >= the + secondary header time. If apid is provided, additionally seek to that + apid. + """ + if self.end_of_stream(): + return + try: + if stamp: + self._seek_to_stamp(stamp) + if apid is not None: + self._seek_to_apid(apid) + except StopIteration: + pass + + def _seek_to_stamp(self, stamp): + while True: + packet = self._next_packet() + hdr = packet.primary_header + if packet.stamp and packet.stamp > self._stampcache[hdr.apid]: + self._stream.push_back(packet) + return + + def _seek_to_apid(self, apid): + while True: + packet = self._next_packet() + hdr = packet.primary_header + if hdr.apid == apid: + self._stream.push_back(packet) + return + + def _check_if_missing(self, packet): + """ + Return True if this packet does not directly follow the previous packet + in sequence number, where sequence numbers are tracked per APID. + """ + prev_seqid = self._seqcache.get(packet.apid) + seqid = packet.seqid + missing = False + + if prev_seqid: + missing = seqid != ((prev_seqid + 1) % MAX_SEQ_ID) + + self._seqcache[packet.apid] = seqid + return missing + + def read(self): + """ + Read all remaining packets from current position. + """ + while not self._finished: + yield self._next_packet() + + def read_until_missing(self): + """ + Read packets until the first missing sequence number is encountered. 
The + next packet read will be the first packet after the missing one. + """ + while not self.end_of_stream(): + packet = self._next_packet() + + # checking for a seqid gap requires getting the packet after the + # gap, therefore we have to put the packet back to be retrieved next + # time if a missing packet was detected + if self._check_if_missing(packet): + self._stream.push_back(packet) + break + + yield packet diff --git a/setup.py b/setup.py index f70dbb0d0f72eb69f0304a1a66126478914b4597..8ecd8f464403736eea1af309304ed7be71766046 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ setup( ], entry_points=""" [console_scripts] - l0split = edosl0util.cli:cmd_split + edosl0split = edosl0util.cli:cmd_split edosl0dump = edosl0util.cli:cmd_dump """ ) \ No newline at end of file