From 69356fc11f9aeb8f5ccdc2254ec6adea18caa4db Mon Sep 17 00:00:00 2001 From: Bruce Flynn <bmflynn@gmail.com> Date: Wed, 1 Jul 2015 13:28:37 +0000 Subject: [PATCH] Add/use stream module. --- jpssl0util/cli.py | 29 ++++++++++------ jpssl0util/dump.py | 57 +++++++++++++++++++++++-------- jpssl0util/split.py | 60 ++++++++++---------------------- jpssl0util/stream.py | 77 ++++++++++++++++++++++++++++++++++++++++++ jpssl0util/timecode.py | 6 ++++ 5 files changed, 161 insertions(+), 68 deletions(-) mode change 100755 => 100644 jpssl0util/dump.py mode change 100755 => 100644 jpssl0util/split.py create mode 100644 jpssl0util/stream.py diff --git a/jpssl0util/cli.py b/jpssl0util/cli.py index 436a68e..c35ba1e 100644 --- a/jpssl0util/cli.py +++ b/jpssl0util/cli.py @@ -1,12 +1,14 @@ +""" +Console script entry points for CLI tools. +""" from datetime import datetime from jpssl0util import split, dump -from jpssl0util.timecode import cds_stamp def cmd_split(): from argparse import ArgumentParser - parser = ArgumentParser() + parser = ArgumentParser(description=split.__doc__) parser.add_argument('--minutes', type=int, default=6) parser.add_argument('--output-format', default='%y%j%H%M%S001.PDS') parser.add_argument('filepath') @@ -21,19 +23,24 @@ def cmd_split(): def cmd_dump(): from argparse import ArgumentParser - parser = ArgumentParser() + parser = ArgumentParser(description=dump.__doc__) + parser.add_argument('-v', '--verbose', action='store_true') parser.add_argument('pds') args = parser.parse_args() - def handler(h1, h2): - print(h1) - if h2: - stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds) - print("\t%s : %s" % (stamp, h2)) + def handler(pkt): + print(pkt.h1) + if pkt.h2: + print("\t%s : %s" % (pkt.stamp, pkt.h2)) - stats = dump.dump(args.pds, handler=handler) + stats = dump.dump(args.pds, handler=(handler if args.verbose else None)) + print("First Packet: {}".format(stats['first_pkt'])) + print("Last Packet: {}".format(stats['last_pkt'])) print("Packet Count: {:d}".format(stats['packet_count'])) print("Group Count: {:d}".format(stats['group_count'])) - apids = sorted(stats['apid_counts']) + print("Missing Count: {:d}".format(stats['missing_count'])) + apids = sorted(stats['apids']) for apid in apids: - print("APID {:d} Count: {:d}".format(apid, stats['apid_counts'][apid])) + count = stats['apids'][apid]['count'] + missing = stats['apids'][apid]['missing'] + print("APID {:d} Count:{:d} Missing:{:d}".format(apid, count, missing)) diff --git a/jpssl0util/dump.py b/jpssl0util/dump.py old mode 100755 new mode 100644 index 85a87dd..4475383 --- a/jpssl0util/dump.py +++ b/jpssl0util/dump.py @@ -1,26 +1,53 @@ -#!/usr/bin/env python from collections import defaultdict +from datetime import datetime -from edos import ccsds +from edos.ccsds import GROUP_FIRST -from jpssl0util.timecode import cds_stamp +from jpssl0util.stream import packetstream + + +MAX_SEQ_ID = 2**14 def dump(filepath, handler=None): + """ + Dump PDS file statistics. The entire file will be iterated, which for larger + PDS files may take a while. + """ + first_pkt = datetime(3000, 1, 1) + last_pkt = datetime(1900, 1, 1) + packets = 0 + groups = 0 + missing = 0 + # sequence nums are per APID + apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0}) + for pkt in packetstream(filepath): + packets += 1 + if pkt.h1.sequence_grouping == GROUP_FIRST: + groups += 1 + apids[pkt.h1.apid]['count'] += 1 + + seq = pkt.h1.source_sequence_count + expected_seq = (apids[pkt.h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID + if seq != expected_seq: + missing += 1 + apids[pkt.h1.apid]['missing'] += 1 + apids[pkt.h1.apid]['prev_seq'] = seq - pktcnt = 0 - groupcnt = 0 - apidcnts = defaultdict(lambda: 0) - for h1, h2 in ccsds.walk(filepath): - pktcnt += 1 - if h1.sequence_grouping == ccsds.GROUP_FIRST: - groupcnt += 1 - apidcnts[h1.apid] += 1 + if pkt.stamp: + first_pkt = min(pkt.stamp, first_pkt) + last_pkt = max(pkt.stamp, last_pkt) - handler(h1, h2) + try: + handler and handler(pkt) + except Exception: + pass return dict( - packet_count = pktcnt, - group_count = groupcnt, - apid_counts = apidcnts + first_pkt=first_pkt, + last_pkt=last_pkt, + packet_count=packets, + group_count=groups, + missing_count=missing, + apids=apids ) \ No newline at end of file diff --git a/jpssl0util/split.py b/jpssl0util/split.py old mode 100755 new mode 100644 index 2c32f4d..90d0a63 --- a/jpssl0util/split.py +++ b/jpssl0util/split.py @@ -1,25 +1,8 @@ -#!/usr/bin/env python -""" - - -TODO: - - Deal with leapseconds, use grain? -""" -from io import open +import io from cStringIO import StringIO -from datetime import datetime, timedelta - -from edos import ccsds from jpssl0util.timecode import cds_stamp, unixtime - - -def first_pkt_in_scan(h1): - return h1.sequence_grouping == ccsds.GROUP_FIRST - - -def standalone_pkt(h1): - return h1.sequence_grouping == ccsds.GROUP_STANDALONE +from jpssl0util.stream import packetstream def time_to_roll(time, minutes): @@ -32,31 +15,24 @@ def split(filepath, minutes): A single output file is buffered in memory until it is written. """ - with open(filepath, 'rb') as datain: - - buf = StringIO() # buffer for a single data file until it is written - cur_bucket = 0 # cur time bucket of size 'minutes' - cursor = 0 # cur pos in file - headers = ccsds.walk(filepath) + buf = StringIO() # buffer for a single data file until it is written + cur_bucket = 0 # cur time bucket of size 'minutes' - for h1, h2 in headers: - if first_pkt_in_scan(h1) or standalone_pkt(h1): - stamp = cds_stamp(h2.day, h2.millisecods, h2.microseconds) - hdrtime = unixtime(stamp) + for pkt in packetstream(filepath): + # do the bucketing based on secondary header timestamps + if pkt.h2: + stamp = cds_stamp(pkt.h2.day, pkt.h2.milliseconds, pkt.h2.microseconds) + hdrtime = unixtime(stamp) - pkt_bucket = hdrtime - hdrtime % (minutes * 60) - if cur_bucket == 0: - cur_bucket = pkt_bucket - - if cursor != 0 and pkt_bucket > cur_bucket: - yield cur_bucket, buf.getvalue() - buf = StringIO() + pkt_bucket = hdrtime - hdrtime % (minutes * 60) + if cur_bucket == 0: cur_bucket = pkt_bucket - datain.seek(cursor) - # data_length_minus1 + 1 == len(h2) + len(data) - read_size = ccsds.c.sizeof(h1) + h1.data_length_minus1 + 1 - buf.write(datain.read(read_size)) - cursor += read_size + if pkt_bucket > cur_bucket: + yield cur_bucket, buf.getvalue() + buf = StringIO() + cur_bucket = pkt_bucket + + buf.write(pkt.data) - yield cur_bucket, buf.getvalue() \ No newline at end of file + yield cur_bucket, buf.getvalue() \ No newline at end of file diff --git a/jpssl0util/stream.py b/jpssl0util/stream.py new file mode 100644 index 0000000..865df6e --- /dev/null +++ b/jpssl0util/stream.py @@ -0,0 +1,77 @@ + +import io +from collections import namedtuple + +from edos.ccsds import ( + c, + PrimaryHeader, + JpssFirstSecondaryHeader, + JpssSecondaryHeader, + GROUP_FIRST, + GROUP_STANDALONE +) + +from jpssl0util.timecode import cds_stamp + + +Packet = namedtuple('Packet', ('h1', 'h2', 'stamp', 'data')) + + +secondary_header_impls = { + GROUP_FIRST: JpssFirstSecondaryHeader, + GROUP_STANDALONE: JpssSecondaryHeader +} + + +def packetstream(filepath): + """ + Return a generator of `Packet` tuples that will contain the primary header + struct and all the packet data, including the header data. + """ + with io.open(filepath, mode='rb', buffering=0) as datain: + buf = datain.read(c.sizeof(PrimaryHeader)) + while buf: + data = buf + + h1 = PrimaryHeader.from_buffer_copy(buf) + + buf = datain.read(h1.data_length_minus1 + 1) + if not buf: + break + data += buf + + stamp = None + h2 = None # only first/standalone have h2 impls + H2Impl = secondary_header_impls.get(h1.sequence_grouping) + if H2Impl: + h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)]) + stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds) + + yield Packet(h1, h2, stamp, data) + + buf = datain.read(c.sizeof(PrimaryHeader)) + + +""" + +pdsfiles = sort_pdsfiles_by_pkttime(filenames) + +apid = None +seqnum = None +for pdsfile in pdsfiles: + try: + pdsfile.seekto(apid, seqnum) + for packet in pdsfile.read(): + output.write(packet) + + except MissingPacket as err: + apid = err.apid + seqnum = err.seqnum + print("missing packet apid:{:d} seqnum:{:d} in {:s}".format(err.apid, err.seqnum, pdsfile)) + + except NoMorePackets: + print("{} has no more packets".format(pdsfile)) + pdsfiles.pop(pdsfile) + + +""" diff --git a/jpssl0util/timecode.py b/jpssl0util/timecode.py index 2bbff41..51ac4a3 100644 --- a/jpssl0util/timecode.py +++ b/jpssl0util/timecode.py @@ -6,10 +6,16 @@ UNIX_EPOCH = datetime(1970, 1, 1) def unixtime(dt): + """ + Datetime to Unix timestamp. + """ if dt > UNIX_EPOCH: return (dt - UNIX_EPOCH).total_seconds() return (UNIX_EPOCH - dt).total_seconds() def cds_stamp(days, millis, micros, epoch=JPSS_EPOCH): + """ + CCSDS Day Segmented timecode to UTC datetime. + """ return JPSS_EPOCH + timedelta(days=days, microseconds=1000 * millis + micros) \ No newline at end of file -- GitLab