Skip to content
Snippets Groups Projects
Commit c158eaba authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Initial support for non-science L0 types

parent 4cce1038
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,7 @@ __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights res
import os
import glob
import logging
import tempfile
from datetime import datetime, timedelta
from edosl0util import stream, merge, jpssrdr
......@@ -16,81 +17,157 @@ from edosl0util.cli import util
# The logging module itself is used as the logger; its top-level info/debug
# functions forward to the root logger.
LOG = logging

# Map CLI satellite name to the numeric spacecraft ID (SCID) embedded in the
# EDOS L0 product names built below (e.g. 'P157...').
satellite_to_scid = {
    'snpp': 157,
}
def pdsfilename(product, created):
    """Return the PDS filename for *product* stamped with *created*.

    The product identifier is right-padded with 'A' to the fixed 20-character
    field width before the XT timestamp and '001.PDS' suffix are appended.
    """
    padded = product.ljust(20, 'A')
    return '{}XT{:%y%j%H%M%S}001.PDS'.format(padded, created)
def remove_files(files):
    """Delete every file path in *files*.

    Replaces the original list comprehension used purely for its side
    effects (built and discarded a throwaway list) with a plain loop.
    Raises OSError if any path cannot be removed, same as before.
    """
    for path in files:
        os.remove(path)
def _do_dump(filepat, product, rdrs, start, end):
    """Dump packets from *rdrs*, then merge the dumped .pkts files matching
    *filepat* into a single PDS file truncated to [start, end].

    Returns the name of the PDS file written.

    Fix: the original opened one file handle per input for the packet
    streams and never closed them; handles are now tracked and closed
    explicitly after the merge (mirroring the ancillary-merge code path
    elsewhere in this module).
    """
    for filepath in rdrs:
        LOG.info('dumping %s', filepath)
        jpssrdr.write_rdr_datasets(filepath)

    # alphanumeric sorting to bootstrap final sort
    inputs = sorted(glob.glob(filepat))
    # keep handles so they can be closed explicitly once merging is done
    files = [open(f, 'rb') for f in inputs]
    try:
        streams = [stream.jpss_packet_stream(f) for f in files]
        pdsname = pdsfilename(product, start)
        LOG.info('merging to %s', pdsname)
        with open(pdsname, 'wb') as dest:
            merge.merge(streams, output=dest, trunc_to=[start, end])
    finally:
        for f in files:
            f.close()
    return pdsname
def cris_hsk(satellite, rdrs, start, end):
    """Build a CRISHSK (CrIS housekeeping telemetry) L0 PDS from *rdrs*."""
    scid = satellite_to_scid[satellite]
    product = 'P{}1280CRISHSK'.format(scid)
    return _do_dump('*.telemetry.pkts', product, rdrs, start, end)
def cris_dwell(satellite, rdrs, start, end):
    """Build a CRISDWELL L0 PDS from the dwell packets in *rdrs*."""
    scid = satellite_to_scid[satellite]
    product = 'P{}1291CRISDWELL'.format(scid)
    return _do_dump('*.dwell.pkts', product, rdrs, start, end)
def cris_sci(satellite, rdrs, start, end):
    """Build a CRISSCIENCE L0 PDS from the science packets in *rdrs*."""
    scid = satellite_to_scid[satellite]
    product = 'P{}1289CRISSCIENCE'.format(scid)
    return _do_dump('*.science.pkts', product, rdrs, start, end)
def atms_hsk(satellite, rdrs, start, end):
    """Build an ATMSHSK (ATMS housekeeping telemetry) L0 PDS from *rdrs*."""
    scid = satellite_to_scid[satellite]
    product = 'P{}0518ATMSHSK'.format(scid)
    return _do_dump('*.telemetry.pkts', product, rdrs, start, end)
def atms_dwell(satellite, rdrs, start, end):
    """Build an ATMSDWELL L0 PDS from the dwell packets in *rdrs*."""
    scid = satellite_to_scid[satellite]
    product = 'P{}0517ATMSDWELL'.format(scid)
    return _do_dump('*.dwell.pkts', product, rdrs, start, end)
def atms_sci(satellite, rdrs, start, end):
    """Build an ATMSSCIENCE L0 PDS from the science packets in *rdrs*."""
    scid = satellite_to_scid[satellite]
    product = 'P{}0515ATMSSCIENCE'.format(scid)
    return _do_dump('*.science.pkts', product, rdrs, start, end)
def viirs_sci(satellite, rdrs, start, end):
    """Build a VIIRSSCIENCE L0 PDS from the science packets in *rdrs*.

    Bug fix: the parameter was declared as ``sci`` while the body referenced
    ``rdrs``, so every call raised NameError. Renamed to ``rdrs`` to match
    the body and the sibling product functions (callers pass positionally).
    """
    product = 'P{}0826VIIRSSCIENCE'.format(satellite_to_scid[satellite])
    return _do_dump('*.science.pkts', product, rdrs, start, end)
def main():
    """CLI entry point: dump RDR packet datasets and merge them into L0 PDS files.

    NOTE(review): this function appears to be diff-garbled — fragments of the
    pre-commit argument set and processing loop are interleaved with the new
    subcommand-based flow. Suspect spans are flagged inline; reconcile against
    version control before relying on this text.
    """
    # XXX: This currently uses standard CCSDS packet merging that does not have
    # any context regarding fill packets. In the future if it is desired to
    # keep fill packets a new RDR specific merge will be required.
    parser = util.default_parser()
    parser.description = __doc__
    parser.add_argument('--leave-pkts', action='store_true',
                        help='Do not delete intermediate .pkts files')
    parser.add_argument(
        '--minutes', type=int, default=120,
        help=('Output size in minutes. The resulting file will be truncated '
              'to created + minutes.'))
    parser.add_argument(
        '--science', action='store_true',
        help='Dump science dataset')
    parser.add_argument(
        '--ancillary', action='store_true',
        help='Dump spacecraft ancillary datasets')
    # NOTE(review): the next two lines are orphaned argument fragments with no
    # enclosing parser.add_argument( call — presumably lost in a merge.
        '-S', '--satellite', choices=['snpp'], default='snpp',
        help='Satellite used to set SCID')

    def timestamp(v):
        # Parse 'YYYY-mm-dd HH:MM:SS' strings into datetime for argparse.
        return datetime.strptime(v, '%Y-%m-%d %H:%M:%S')

    parser.add_argument(
        # NOTE(review): default is evaluated once at definition time, i.e.
        # process start — acceptable for a CLI, but worth being aware of.
        '--created', type=timestamp, default=datetime.utcnow(),
        help=('Time to use for creation time (yyyy-mm-dd hh:mm:ss).'))
    parser.add_argument('rdr', nargs='+')
    # NOTE(review): orphaned fragment again — '-s'/'--start' options with no
    # parser.add_argument( opener.
        '-s', '--start', type=timestamp, required=True,
        help=('File start time. Data before this time will be dropped. This '
              'time also determines the creation time in the filename. Format '
              'is YYYY-mm-dd HH:MM:SS.'),
    )
    parser.add_argument(
        '-e', '--end', nargs='?', type=timestamp,
        help=('File end time. Data after this time will be dropped. If omitted '
              'end will default to start + 2 hours. Format '
              'is YYYY-mm-dd HH:MM:SS.'),
    )

    # One subcommand per destination L0 product type; each sets args.func.
    subs = parser.add_subparsers(title='Destination level 0 data type')

    def cmd_cris_hsk(args):
        # NOTE(review): unlike the other cmd_* handlers this one does not
        # return the PDS name, so 'pdsname = args.func(args)' below gets None.
        cris_hsk(args.satellite, args.rcrit, args.start, args.end)
    subp = subs.add_parser('CRISHSK')
    subp.add_argument('rcrit', nargs='+')
    subp.set_defaults(func=cmd_cris_hsk)

    def cmd_cris_dwell(args):
        return cris_dwell(args.satellite, args.rdrs, args.start, args.end)
    subp = subs.add_parser('CRISDWELL')
    subp.add_argument(
        'rdrs', nargs='+',
        help=('RCRIH, RCRIM, and RCRII files. The same number of each is '
              'required to produce a valid L0 file.'))
    subp.set_defaults(func=cmd_cris_dwell)

    def cmd_cris_sci(args):
        return cris_sci(args.satellite, args.rcris, args.start, args.end)
    subp = subs.add_parser('CRISSCIENCE')
    subp.add_argument('rcris', nargs='+')
    subp.set_defaults(func=cmd_cris_sci)

    def cmd_atms_hsk(args):
        return atms_hsk(args.satellite, args.ratmt, args.start, args.end)
    subp = subs.add_parser('ATMSHSK')
    subp.add_argument('ratmt', nargs='+')
    subp.set_defaults(func=cmd_atms_hsk)

    def cmd_atms_dwell(args):
        return atms_dwell(args.satellite, args.rdrs, args.start, args.end)
    subp = subs.add_parser('ATMSDWELL')
    subp.add_argument(
        'rdrs', nargs='+',
        help=('RATMW and RATMM files. The same number of each is required '
              'to produce a valid L0 file.'))
    subp.set_defaults(func=cmd_atms_dwell)

    def cmd_atms_sci(args):
        return atms_sci(args.satellite, args.ratms, args.start, args.end)
    subp = subs.add_parser('ATMSSCIENCE')
    subp.add_argument('ratms', nargs='+')
    subp.set_defaults(func=cmd_atms_sci)

    def cmd_viirs_sci(args):
        return viirs_sci(args.satellite, args.rvirs, args.start, args.end)
    subp = subs.add_parser('VIIRSSCIENCE')
    subp.add_argument('rvirs', nargs='+')
    subp.set_defaults(func=cmd_viirs_sci)

    args = parser.parse_args()
    if not args.end:
        # Default output window: start + 2 hours.
        args.end = args.start + timedelta(hours=2)
    util.configure_logging(args)

    # NOTE(review): everything from here to 'pdsname = args.func(args)' looks
    # like the pre-commit implementation left behind by the diff — it shadows
    # the module-level pdsfilename/remove_files and references args.created,
    # args.minutes, args.science, args.ancillary from the old argument set.
    def pdsfilename(product):
        return 'P157{}XT{:%y%j%H%M%S}001.PDS'.format(product, args.created)

    def remove_files(files):
        [os.remove(f) for f in files]

    for filepath in args.rdr:
        dirname, rdrname = os.path.split(filepath)
        LOG.info("dumping %s", filepath)
        jpssrdr.write_rdr_datasets(
            filepath,
            science=args.science,
            ancillary=args.ancillary,
            skipfill=True)
    # Truncation window: [created, created + minutes), end-exclusive via -1us.
    interval = [args.created, args.created + timedelta(minutes=args.minutes, microseconds=-1)]
    LOG.info("merge and truncate to %s", interval)
    if args.ancillary:
        for apid in (0, 8, 11):
            inputs = sorted(glob.glob('*.anc{:d}.pkts'.format(apid)))
            product = '{:04d}AAAAAAAAAAA'.format(apid)
            pdsname = pdsfilename(product)
            LOG.info("merging apid %d to %s", apid, pdsname)
            # there are potentially a very large number of inputs (RNSCA) so
            # make sure to keep track of files so we can explicitly close them
            files = [open(f, 'rb') for f in inputs]
            streams = [stream.jpss_packet_stream(f) for f in files]
            with open(pdsname, 'wb') as dest:
                merge.merge(streams, output=dest, trunc_to=interval)
            for f in files:
                f.close()
            remove_files(inputs)
    science_products = {
        'RNSCA-RVIRS': '0826VIIRSSCIENCE',
        'RCRIS-RNSCA': '1289CRISSCIENCEA',
        'RATMS-RNSCA': '0515ATMSSCIENCEA'}
    if args.science:
        file_type = os.path.basename(args.rdr[0]).split('_')[0]
        if file_type not in science_products:
            parser.exit(1, '%s is not a supported science file type\n' % file_type)
        inputs = sorted(glob.glob('*.science.pkts'))
        product = science_products[file_type]
        pdsname = pdsfilename(product)
        LOG.info("merging %s to %s", product, pdsname)
        # NOTE(review): these handles are never closed, unlike the ancillary
        # branch above.
        streams = [stream.jpss_packet_stream(open(f, 'rb')) for f in inputs]
        with open(pdsname, 'wb') as dest:
            merge.merge(streams, output=dest, trunc_to=interval)
        remove_files(inputs)

    # New flow: dispatch to the selected subcommand handler.
    pdsname = args.func(args)
    if not args.leave_pkts:
        remove_files(glob.glob('*.pkts'))
if __name__ == '__main__':
......
......@@ -155,11 +155,37 @@ def _generate_packet_datasets(group):
yield name, np.array(ds)
def _find_data_group(fobj, name, sensors=['viirs', 'cris', 'atms']):
for sensor in sensors:
group = fobj.get('/All_Data/{}-{}-RDR_All'.format(sensor.upper(), name.upper()))
if group:
return group
def _find_science_group(fobj):
    """Locate the sensor SCIENCE RDR data group in *fobj*, or None.

    Fix: removed dead residue of the pre-refactor implementation (a loop
    that computed a 'group' variable and discarded it before the return),
    leaving only the delegation to _find_data_group.
    """
    return _find_data_group(fobj, 'SCIENCE')
def _find_dwell_group(fobj):
    """Locate a dwell RDR data group: CrIS HSK/IM/SSM dwell first, then ATMS.

    Returns the first group found, or None if the file has none.
    """
    for cris_name in ('HSKDWELL', 'IMDWELL', 'SSMDWELL'):
        found = _find_data_group(fobj, cris_name, sensors=['cris'])
        if found:
            return found
    return _find_data_group(fobj, 'DWELL', sensors=['atms'])
def _find_diag_group(fobj):
    """Locate the CrIS/ATMS DIAGNOSTIC RDR data group, or None."""
    group = _find_data_group(fobj, 'DIAGNOSTIC', sensors=['cris', 'atms'])
    return group
def _find_telemetry_group(fobj):
    """Locate the CrIS/ATMS TELEMETRY RDR data group, or None."""
    group = _find_data_group(fobj, 'TELEMETRY', sensors=['cris', 'atms'])
    return group
def _find_spacecraft_group(fobj):
    # NOTE(review): 'SPACESCRAFT' looks like a typo for 'SPACECRAFT', and the
    # pre-refactor code read the ancillary data from
    # '/All_Data/SPACECRAFT-DIARY-RDR_All' — a path this sensor-prefixed
    # lookup cannot produce. Verify against a real RDR file before trusting
    # the 'ancillary' result.
    return _find_data_group(fobj, 'SPACESCRAFT', sensors=['cris', 'atms'])
def _rdrs_for_packet_dataset(group):
......@@ -177,10 +203,13 @@ def _rdrs_for_packet_dataset(group):
def rdr_datasets(filepath):
    """Open the RDR file at *filepath* and extract its packet datasets.

    :returns: dict keyed by dataset type ('telemetry', 'diagnostic',
        'dwell', 'science', 'ancillary'); each value is whatever
        _rdrs_for_packet_dataset yields for the corresponding group
        (the group lookup may be None when the file lacks that type).

    Fixes: removed dead pre-refactor residue (an 'rdr' dict built and then
    immediately overwritten by the new dict(...) literal) and made
    fobj.close() exception-safe with try/finally.
    """
    fobj = H5File(filepath)
    try:
        rdr = dict(
            telemetry=_rdrs_for_packet_dataset(_find_telemetry_group(fobj)),
            diagnostic=_rdrs_for_packet_dataset(_find_diag_group(fobj)),
            dwell=_rdrs_for_packet_dataset(_find_dwell_group(fobj)),
            science=_rdrs_for_packet_dataset(_find_science_group(fobj)),
            ancillary=_rdrs_for_packet_dataset(_find_spacecraft_group(fobj)),
        )
    finally:
        fobj.close()
    return rdr
......@@ -222,26 +251,19 @@ def _write_packets(pkts, dest, skipfill):
dest.write(pkt.packet)
def write_rdr_datasets(filepath, skipfill=False):
    """Dump every packet dataset in the RDR at *filepath* to .pkts files.

    One '<rdrname>.<type>.pkts' file is written per non-empty dataset type
    returned by rdr_datasets (telemetry, diagnostic, dwell, science,
    ancillary). Empty types are skipped.

    :param filepath: path to the RDR file
    :param skipfill: passed through to _write_packets
    :returns: the rdr_datasets dict

    Fix: this span was diff-garbled — two consecutive ``def`` lines (the old
    science/ancillary-flag signature and the new per-type one) plus
    interleaved fragments of both loop bodies. Reconstructed the coherent
    per-type implementation; the legacy science=/ancillary= keyword
    arguments are gone, so any remaining caller passing them must be
    updated.
    """
    rdrname = os.path.basename(filepath)
    rdrs = rdr_datasets(filepath)
    for typekey in rdrs:
        if not rdrs[typekey]:
            continue
        pktfile = '{}.{}.pkts'.format(rdrname, typekey)
        LOG.debug('writing %s', pktfile)
        with open(pktfile, 'wb') as dest:
            for idx, rdr in enumerate(rdrs[typekey]):
                LOG.debug(
                    '... %s gran %d %s-%s-%s', typekey,
                    idx, rdr.header.satellite, rdr.header.sensor, rdr.header.type_id)
                _write_packets(rdr.packets(), dest, skipfill)
    return rdrs
......@@ -42,7 +42,7 @@ class _Ptr(object):
(that.stamp, that.apid)
)
    # instances with same stamp/apid/size will compare the same
    def __hash__(self):
        # Hash over the same identity fields so equal pointers collapse to
        # one entry when used as set members / dict keys during dedup.
        return hash((self.stamp, self.apid, self.size))
......@@ -94,6 +94,9 @@ def _sort_by_time_apid(index, order=None):
def _filter_duplicates_by_size(index):
"""
Take the packet with the largest size.
"""
filtered = OrderedDict()
for ptr in index:
key = (ptr.stamp, ptr.apid)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment