Something went wrong on our end
-
Bruce Flynn authoredBruce Flynn authored
cli.py 3.36 KiB
# encoding: utf-8
"""
Console script entry points for CLI tools.
"""
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
import io
import os
import logging
from datetime import datetime
from edosl0util import split, trunc, stream, merge, jpssrdr
LOG = logging
def _timestamp(v):
return datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
def _default_parser():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true')
return parser
def _configure_logging(args):
level = logging.DEBUG if getattr(args, 'verbose', False) else logging.WARN
logging.basicConfig(level=level, format='%(message)s')
def cmd_trunc():
parser = _default_parser()
parser.add_argument('-o', '--output')
parser.add_argument('filename')
parser.add_argument('start', type=_timestamp, help='YYYY-MM-DD HH:MM:SS')
parser.add_argument('end', type=_timestamp, help='YYYY-MM-DD HH:MM:SS')
args = parser.parse_args()
_configure_logging(args)
output = args.output or os.path.basename(args.filename) + '.trunc'
with io.open(output, 'wb') as fptr:
for pkt in trunc.trunc_file(args.filename, args.start, args.end):
fptr.write(pkt.bytes())
def cmd_split():
parser = _default_parser()
parser.add_argument('--minutes', type=int, default=6)
parser.add_argument('filepath')
args = parser.parse_args()
_configure_logging(args)
for stamp, fpath in split.split_file(args.filepath, args.minutes, os.getcwd()):
LOG.info('wrote bucket {} to {}'.format(stamp.isoformat(), fpath))
def cmd_info():
parser = _default_parser()
parser.add_argument('filepath')
args = parser.parse_args()
_configure_logging(args)
num_packets = 0
packets = stream.PacketStream(io.open(args.filepath, 'rb'))
first = datetime(3000, 1, 1)
last = datetime(1970, 1, 1)
while True:
try:
packet = packets.next()
num_packets += 1
except stream.PacketTooShort as err:
LOG.warn("corrupt packet stream after %d packets: %s",
num_packets, err)
break
except StopIteration:
break
if packet.stamp:
first = min(packet.stamp, first)
last = max(packet.stamp, last)
total = 0
first, last, info = packets.info()
LOG.info("First: %s", first)
LOG.info("Last: %s", last)
for apid, dat in info.items():
total += dat['count']
LOG.info("%d: count=%d missing=%d", apid, dat['count'], dat['num_missing'])
LOG.info("{} total packets".format(total))
def cmd_merge():
parser = _default_parser()
parser.add_argument('-o', '--output', default='out.pds')
parser.add_argument('pds', nargs='+')
args = parser.parse_args()
_configure_logging(args)
merge.merge_files(args.pds, args.output)
def cmd_rdr2l0():
parser = _default_parser()
parser.add_argument('-o', '--output')
parser.add_argument('-f', '--skipfill', action='store_true')
parser.add_argument('sensor', choices=('viirs', 'atms', 'cris'))
parser.add_argument('rdr')
args = parser.parse_args()
_configure_logging(args)
output = args.output or args.rdr + '.pds'
with io.open(output, 'wb') as fptr:
for packet in jpssrdr.convert_to_nasa_l0(args.sensor, args.rdr):
fptr.write(packet)