diff --git a/MANIFEST.in b/MANIFEST.in index 7abf97da450eb0fbc58a2fa8206c95d10bba0a21..bc8352c8fd312aa7e97ba839b07358e382372a5a 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include .version +include edosl0util/.repostate diff --git a/edosl0util/cli.py b/edosl0util/cli.py index 4cfb50c870ea7b1de7e974821199660c3080b63e..b283aa9bc6c809b2d8b1688d8edccd578991e743 100644 --- a/edosl0util/cli.py +++ b/edosl0util/cli.py @@ -3,24 +3,38 @@ Console script entry points for CLI tools. """ import io import os +import logging from datetime import datetime -from collections import defaultdict -from edosl0util import split, trunc, stream +from edosl0util import split, trunc, stream, merge, jpssrdr + +LOG = logging def _timestamp(v): return datetime.strptime(v, '%Y-%m-%d %H:%M:%S') +def _default_parser(): + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose', action='store_true') + return parser + + +def _configure_logging(args): + level = logging.DEBUG if getattr(args, 'verbose', False) else logging.WARN + logging.basicConfig(level=level, format='%(message)s') + + def cmd_trunc(): - from argparse import ArgumentParser - parser = ArgumentParser() + parser = _default_parser() parser.add_argument('-o', '--output') parser.add_argument('filename') - parser.add_argument('start', type=_timestamp) - parser.add_argument('end', type=_timestamp) + parser.add_argument('start', type=_timestamp, help='YYYY-MM-DD HH:MM:SS') + parser.add_argument('end', type=_timestamp, help='YYYY-MM-DD HH:MM:SS') args = parser.parse_args() + _configure_logging(args) output = args.output or os.path.basename(args.filename) + '.trunc' with io.open(output, 'wb') as fptr: @@ -29,38 +43,58 @@ def cmd_trunc(): def cmd_split(): - from argparse import ArgumentParser - parser = ArgumentParser() + parser = _default_parser() parser.add_argument('--minutes', type=int, default=6) parser.add_argument('filepath') args = parser.parse_args() + _configure_logging(args) for stamp, 
fpath in split.split_file(args.filepath, args.minutes, os.getcwd()): - print('wrote bucket {} to {}'.format(stamp.isoformat(), fpath)) + LOG.info('wrote bucket {} to {}'.format(stamp.isoformat(), fpath)) def cmd_info(): - from argparse import ArgumentParser - parser = ArgumentParser(description=split.__doc__) + parser = _default_parser() parser.add_argument('filepath') args = parser.parse_args() + _configure_logging(args) packets = stream.PacketStream(io.open(args.filepath, 'rb')) - - first = datetime(2016, 1, 1) - last = datetime(2000, 1, 1) - apids = defaultdict(lambda: {'count': 0}) - for p in packets: - if p.stamp: - first = min(first, p.stamp) - last = max(last, p.stamp) - apids[p.apid]['count'] += 1 - - print "First:", first - print "Last:", last + first = datetime(3000, 1, 1) + last = datetime(1970, 1, 1) + for packet in packets: + if packet.stamp: + first = min(packet.stamp, first) + last = max(packet.stamp, last) total = 0 - for apid in sorted(apids): - count = apids[apid]['count'] - total += count - print "{}: {}".format(apid, count) - print "{} total packets".format(total) + LOG.info("First: %s", first) + LOG.info("Last: %s", last) + for apid, dat in packets.info().items(): + total += dat['count'] + LOG.info("%d: count=%d missing=%d", apid, dat['count'], dat['num_missing']) + LOG.info("{} total packets".format(total)) + + +def cmd_merge(): + parser = _default_parser() + parser.add_argument('-o', '--output', default='out.pds') + parser.add_argument('pds', nargs='+') + args = parser.parse_args() + _configure_logging(args) + + merge.merge_files(args.pds, args.output) + + +def cmd_rdr2l0(): + parser = _default_parser() + parser.add_argument('-o', '--output') + parser.add_argument('-f', '--skipfill', action='store_true') + parser.add_argument('sensor', choices=('viirs', 'atms', 'cris')) + parser.add_argument('rdr') + args = parser.parse_args() + _configure_logging(args) + + output = args.output or args.rdr + '.pds' + with io.open(output, 'wb') as fptr: + for 
packet in jpssrdr.convert_to_nasa_l0(args.sensor, args.rdr, skipfill=args.skipfill): + fptr.write(packet) diff --git a/edosl0util/jpssrdr.py b/edosl0util/jpssrdr.py index 8ebe45d7f6a5a9a3ef1ac14d25e324a8e8679bc2..f5eb1e99ded45282f124eeb61d9c9dfa82de680c 100644 --- a/edosl0util/jpssrdr.py +++ b/edosl0util/jpssrdr.py @@ -207,25 +207,3 @@ def convert_to_nasa_l0(sensor, filename, skipfill=False): if skipfill: continue yield packet.packet - - -if __name__ == '__main__': - import argparse - parser = argparse.ArgumentParser() - parser.add_argument('-v', action='store_true') - parser.add_argument('-o', '--output') - parser.add_argument('-f', '--skipfill', action='store_true') - parser.add_argument('sensor', choices=('viirs', 'atms', 'cris')) - parser.add_argument('rdr') - args = parser.parse_args() - - if args.v: - lvl = logging.DEBUG - else: - lvl = logging.WARN - logging.basicConfig(level=lvl, format='%(message)s') - - output = args.output or args.rdr + '.pds' - with io.open(output, 'wb') as fptr: - for packet in convert_to_nasa_l0(args.sensor, args.rdr): - fptr.write(packet) diff --git a/edosl0util/merge.py b/edosl0util/merge.py index 90746b9d93c3253e54a94e9036febfd704a0eeb0..87f642443d2c72cd516f4c78717003962a9b2813 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -2,12 +2,9 @@ import io import sys import logging from datetime import datetime -from collections import deque +from collections import deque, defaultdict -from edosl0util.stream import ( - MissingPackets, - make_streams, -) +from edosl0util.stream import make_streams LOG = logging.getLogger(__name__) @@ -33,28 +30,38 @@ def _is_valid_group(packets): def merge(streams, output=sys.stdout): last_apid = None last_stamp = datetime(1900, 1, 1) - apid_cache = {} + # num pkts missing per apid + missing = defaultdict(lambda: 0) # streams are removed as they are exhausted while streams: for stream in streams: - LOG.debug(stream) try: LOG.debug("seeking to %s, %s", last_stamp, last_apid) stream.seek(last_stamp, last_apid) - # 
until `next` causes StopIteration + # Do until `next` causes StopIteration while True: packets = deque() packet = stream.next() + + # If the num missing is incremented there is a missing + # packet. If missing put the current packet back for + # fetching again and defer to the next stream. + new_missing = stream.info()[packet.apid]['num_missing'] + if new_missing > missing[packet.apid]: + missing[packet.apid] = new_missing + stream.push_back(packet) + LOG.debug("missing packets, switching streams: %s", packet) + break # defer to next stream + if packet.is_standalone(): packets.append(packet) + # Packet Group: Collect all packets between packets with + # timestamps. Missing packets and packets that form an + # invalid group will be skipped. else: - # Collect all packets between packets with timestamps. - # Missing packets and packets that form an invalid group - # will be skipped. group = deque([packet]) group.extend(_group_packets(stream)) if _is_valid_group(group): @@ -63,13 +70,13 @@ def merge(streams, output=sys.stdout): elif group[0].is_first(): last_stamp = group[0].stamp last_apid = group[0].apid - break + break # defer to next stream else: # yield to next stream, perhaps they have a valid # group. 
LOG.debug("invalid group, switching streams:%s", group) - break + break # defer to next stream # first packet always has a stamp because it's either standalone or # part of a valid group @@ -78,13 +85,11 @@ def merge(streams, output=sys.stdout): while packets: pkt = packets.popleft() + LOG.debug('writing %s: %s', stream, pkt) output.write(pkt.primary_header) output.write(pkt.secondary_header) output.write(pkt.data) - except MissingPackets as err: - LOG.debug("missing packets, switching streams: %s", err.args) - except StopIteration: streams.remove(stream) LOG.debug("end-of-stream %s", stream) diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 993298caaadc82ad87f19d5639c14b21ba3ce15a..67f59259c5df12796f87c6028ae22b0a2b22ad08 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -121,15 +121,16 @@ class PacketStream(object): Iterates over all CCSDS data producing `Packet` tuples. Only as much data is necessary to generate a single packet is read into memory at a time. """ + SEQID_NOTSET = -1 + def __init__(self, fobj): """ :param fobj: File-like object to read stream from - :keyword fail_on_missing: If True raise a MissingPackets error when packets - are missing. 
""" self._stream = jpss_full_stream(fobj) self._seek_cache = deque() - self._apid_info = defaultdict(lambda: {'count': 0, 'last_seqid': -1, 'missing': 0}) + self._apid_info = defaultdict( + lambda: {'count': 0, 'last_seqid': self.SEQID_NOTSET, 'num_missing': 0}) def __iter__(self): return self @@ -160,15 +161,18 @@ class PacketStream(object): return packet def _update_info(self, packet): + have_missing = False apid = self._apid_info[packet.apid] apid['count'] += 1 - if apid['last_seqid'] != -1: + if apid['last_seqid'] != self.SEQID_NOTSET: if packet.seqid > apid['last_seqid']: - missing = packet.seqid - apid['last_seqid'] - 1 + num_missing = packet.seqid - apid['last_seqid'] - 1 else: - missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID - apid['missing'] += missing + num_missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID + have_missing = num_missing > 0 + apid['num_missing'] += num_missing apid['last_seqid'] = packet.seqid + return have_missing def info(self): return self._apid_info diff --git a/setup.py b/setup.py index 1a7889d822c513659263a6e7a2cb8116f3b63d22..43ab8f9867b53eab1e2f4dbec26fdeda7f838f80 100644 --- a/setup.py +++ b/setup.py @@ -3,14 +3,9 @@ from setuptools import setup, find_packages setup( name='EdosL0Util', description='Utilities for working with EDOS L0 PDS files', - version=':pygittools:', - pygitmeta=True, + version='0.4.dev8', packages=find_packages(), - setup_requires=[ - 'PyGitTools>=0.1.3' - ], install_requires=[ - 'edos', 'h5py', ], entry_points=""" @@ -18,5 +13,7 @@ setup( edosl0split = edosl0util.cli:cmd_split edosl0trunc = edosl0util.cli:cmd_trunc edosl0info = edosl0util.cli:cmd_info + edosl0merge = edosl0util.cli:cmd_merge + rdr2l0 = edosl0util.cli:cmd_rdr2l0 """ )