diff --git a/edosl0util/__init__.py b/edosl0util/__init__.py index 3d096a1e02125cd6a672db736d777dd9eeccd041..023cacdb58c2a617afac9babdce57d812fc79ced 100644 --- a/edosl0util/__init__.py +++ b/edosl0util/__init__.py @@ -1,3 +1,3 @@ from pkg_resources import get_distribution -__version__ = get_distribution(__name__).version \ No newline at end of file +__version__ = get_distribution(__name__).version diff --git a/edosl0util/cli/crgen.py b/edosl0util/cli/crgen.py index ac1cc318e1b9751d63f6c7091f13d008972c8c7d..2c120504ce0ff0a2ef5d86c75a9327d24832a056 100644 --- a/edosl0util/cli/crgen.py +++ b/edosl0util/cli/crgen.py @@ -1,4 +1,3 @@ - """Generate a PDS construction record from a PDS data file""" import logging @@ -9,10 +8,15 @@ from edosl0util.crgen import build_cr def main(): parser = util.default_parser(description=__doc__) - parser.add_argument('input_file') - parser.add_argument('-o', '--output-file', help='generated from input file name by default') - parser.add_argument('-p', '--prev_pds_file', - help='previous PDS data file, used for detecting cross-file packet gaps') + parser.add_argument("input_file") + parser.add_argument( + "-o", "--output-file", help="generated from input file name by default" + ) + parser.add_argument( + "-p", + "--prev_pds_file", + help="previous PDS data file, used for detecting cross-file packet gaps", + ) args = parser.parse_args() util.configure_logging(args) crgen(args.input_file, args.output_file, args.prev_pds_file) @@ -21,13 +25,13 @@ def main(): def crgen(input_file, output_file=None, prev_pds_file=None): cr = build_cr(input_file, prev_pds_file) if output_file is None: - output_file = cr['pds_id'] + '.PDS' - logger.info('writing {}'.format(output_file)) + output_file = cr["pds_id"] + ".PDS" + logger.info("writing {}".format(output_file)) crio.write(cr, output_file) logger = logging.getLogger(__name__) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/edosl0util/cli/crinfo.py b/edosl0util/cli/crinfo.py index 2a53ce592a69303983f843e3dff62198e0ddc7f8..be5eaefba5db6a67fd2c7a2b02c0feff3b98115b 100644 --- a/edosl0util/cli/crinfo.py +++ b/edosl0util/cli/crinfo.py @@ -9,12 +9,12 @@ LOG = logging def main(): parser = util.default_parser() - parser.add_argument('filepath') + parser.add_argument("filepath") args = parser.parse_args() util.configure_logging(args) pprint(crio.read(args.filepath)) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/edosl0util/cli/info.py b/edosl0util/cli/info.py index 0065cac6565739da8df104138bd64c1c88a1bd96..cb957fe5dbe5477cb441e64776bb4360470f6449 100644 --- a/edosl0util/cli/info.py +++ b/edosl0util/cli/info.py @@ -6,16 +6,16 @@ from edosl0util.cli import util def main(): parser = util.default_parser() - parser.add_argument('-a', '--aqua', action='store_true') - parser.add_argument('filepath') + parser.add_argument("-a", "--aqua", action="store_true") + parser.add_argument("filepath") args = parser.parse_args() util.configure_logging(args) num_packets = 0 if not args.aqua: - packets = stream.jpss_packet_stream(io.open(args.filepath, 'rb')) + packets = stream.jpss_packet_stream(io.open(args.filepath, "rb")) else: - packets = stream.aqua_packet_stream(io.open(args.filepath, 'rb')) + packets = stream.aqua_packet_stream(io.open(args.filepath, "rb")) while True: try: packets.next() @@ -30,10 +30,10 @@ def main(): print("First: %s" % first) print("Last: %s" % last) for apid, dat in info.items(): - total += dat['count'] - print("%d: count=%d missing=%d" % (apid, dat['count'], 
dat['num_missing'])) + total += dat["count"] + print("%d: count=%d missing=%d" % (apid, dat["count"], dat["num_missing"])) print("{} total packets".format(total)) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/edosl0util/cli/merge.py b/edosl0util/cli/merge.py index 36fea3b7d47debf1d0ff529d0ea6ec0666a41b67..c351ead430aefbebe80c24211b40f67d1a081c9c 100644 --- a/edosl0util/cli/merge.py +++ b/edosl0util/cli/merge.py @@ -3,34 +3,51 @@ from datetime import datetime from edosl0util.cli import util from edosl0util import merge, stream + def main(): parser = util.default_parser() - parser.add_argument('-o', '--output', default='out.pds') + parser.add_argument("-o", "--output", default="out.pds") + def interval(v): - dt = lambda v: datetime.strptime(v, '%Y-%m-%d %H:%M:%S') - return [dt(x) for x in v.split(',')] + dt = lambda v: datetime.strptime(v, "%Y-%m-%d %H:%M:%S") + return [dt(x) for x in v.split(",")] + parser.add_argument( - '-t', '--trunc-to', type=interval, - help=('Truncate to the interval given as coma separated timestamps of ' - 'the format YYYY-MM-DD HH:MM:SS. The begin time is inclusive, the ' - 'end time is exclusive.')) + "-t", + "--trunc-to", + type=interval, + help=( + "Truncate to the interval given as comma separated timestamps of " + "the format YYYY-MM-DD HH:MM:SS. The begin time is inclusive, the " + "end time is exclusive." + ), + ) parser.add_argument( - '-a', '--apid-order', choices=['viirs', 'numerical'], default='numerical', - help=('Packets will be sorted by time and the named order provided. ' - '"viirs" will sort packets in APID order as expectet by the VIIRS ' - 'L1 software. "numerical" will simply sort APIDs in numerical ' - 'order')) - parser.add_argument('pds', nargs='+') + "-a", + "--apid-order", + choices=["viirs", "numerical"], + default="numerical", + help=( + "Packets will be sorted by time and the named order provided. " + '"viirs" will sort packets in APID order as expected by the VIIRS ' + 'L1 software. "numerical" will simply sort APIDs in numerical ' + "order" + ), + ) + parser.add_argument("pds", nargs="+") args = parser.parse_args() util.configure_logging(args) - apid_order = {'viirs': merge.VIIRS_APID_ORDER, 'numerical': None}[args.apid_order] - streams = [stream.jpss_packet_stream(io.open(f, 'rb')) for f in args.pds] + apid_order = {"viirs": merge.VIIRS_APID_ORDER, "numerical": None}[args.apid_order] + streams = [stream.jpss_packet_stream(io.open(f, "rb")) for f in args.pds] merge.merge( - streams, output=io.open(args.output, 'wb'), - trunc_to=args.trunc_to, apid_order=apid_order) + streams, + output=io.open(args.output, "wb"), + trunc_to=args.trunc_to, + apid_order=apid_order, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/edosl0util/cli/rdr2l0.py b/edosl0util/cli/rdr2l0.py index 4b6a5b106011eb02524cd03b2893c75411630501..b757ea5b6767d6d8085d00d0736d564199937ba7 100644 --- a/edosl0util/cli/rdr2l0.py +++ b/edosl0util/cli/rdr2l0.py @@ -36,89 +36,114 @@ def main(): # keep fill packets a new RDR specific merge will be required.
parser = util.default_parser() parser.description = __doc__ - parser.add_argument('--leave-pkts', action='store_true', - help='Do not delete intermediate .pkts files') parser.add_argument( - '-S', '--satellite', choices=['snpp', 'j01'], default='snpp', - help='Satellite used to set SCID') + "--leave-pkts", + action="store_true", + help="Do not delete intermediate .pkts files", + ) + parser.add_argument( + "-S", + "--satellite", + choices=["snpp", "j01"], + default="snpp", + help="Satellite used to set SCID", + ) def timestamp(v): - return datetime.strptime(v, '%Y-%m-%d %H:%M:%S') + return datetime.strptime(v, "%Y-%m-%d %H:%M:%S") parser.add_argument( - '-s', '--start', type=timestamp, required=True, - help=('File start time. Data before this time will be dropped. This ' - 'time also determines the creation time in the filename. Format ' - 'is YYYY-mm-dd HH:MM:SS.'), + "-s", + "--start", + type=timestamp, + required=True, + help=( + "File start time. Data before this time will be dropped. This " + "time also determines the creation time in the filename. Format " + "is YYYY-mm-dd HH:MM:SS." + ), ) parser.add_argument( - '-e', '--end', nargs='?', type=timestamp, - help=('File end time. Data after this time will be dropped. If omitted ' - 'end will default to start + 2 hours. Format ' - 'is YYYY-mm-dd HH:MM:SS.'), + "-e", + "--end", + nargs="?", + type=timestamp, + help=( + "File end time. Data after this time will be dropped. If omitted " + "end will default to start + 2 hours. Format " + "is YYYY-mm-dd HH:MM:SS." + ), ) - subs = parser.add_subparsers(title='Destination level 0 data type sub-commands') + subs = parser.add_subparsers(title="Destination level 0 data type sub-commands") def cmd_cris_hsk(args): cris_hsk_to_l0(args.satellite, args.rcrit, args.start, args.end) - subp = subs.add_parser('CRISHSK') - subp.add_argument('rcrit', nargs='+') + subp = subs.add_parser("CRISHSK") + subp.add_argument("rcrit", nargs="+") subp.set_defaults(func=cmd_cris_hsk) def cmd_cris_dwell(args): return cris_dwell_to_l0(args.satellite, args.rdrs, args.start, args.end) - subp = subs.add_parser('CRISDWELL') + subp = subs.add_parser("CRISDWELL") subp.add_argument( - 'rdrs', nargs='+', - help=('RCRIH, RCRIM, and RCRII files. The same number of each is ' - 'required to produce a valid L0 file.')) + "rdrs", + nargs="+", + help=( + "RCRIH, RCRIM, and RCRII files. The same number of each is " + "required to produce a valid L0 file." + ), + ) subp.set_defaults(func=cmd_cris_dwell) def cmd_cris_sci(args): return cris_sci_to_l0(args.satellite, args.rcris, args.start, args.end) - subp = subs.add_parser('CRISSCIENCE') - subp.add_argument('rcris', nargs='+') + subp = subs.add_parser("CRISSCIENCE") + subp.add_argument("rcris", nargs="+") subp.set_defaults(func=cmd_cris_sci) def cmd_atms_hsk(args): return atms_hsk_to_l0(args.satellite, args.ratmt, args.start, args.end) - subp = subs.add_parser('ATMSHSK') - subp.add_argument('ratmt', nargs='+') + subp = subs.add_parser("ATMSHSK") + subp.add_argument("ratmt", nargs="+") subp.set_defaults(func=cmd_atms_hsk) def cmd_atms_dwell(args): return atms_dwell_to_l0(args.satellite, args.rdrs, args.start, args.end) - subp = subs.add_parser('ATMSDWELL') + subp = subs.add_parser("ATMSDWELL") subp.add_argument( - 'rdrs', nargs='+', - help=('RATMW and RATMM files. The same number of each is required ' - 'to produce a valid L0 file.')) + "rdrs", + nargs="+", + help=( + "RATMW and RATMM files. The same number of each is required " + "to produce a valid L0 file." 
+ ), + ) subp.set_defaults(func=cmd_atms_dwell) def cmd_atms_sci(args): return atms_sci_to_l0(args.satellite, args.ratms, args.start, args.end) - subp = subs.add_parser('ATMSSCIENCE') - subp.add_argument('ratms', nargs='+') + subp = subs.add_parser("ATMSSCIENCE") + subp.add_argument("ratms", nargs="+") subp.set_defaults(func=cmd_atms_sci) def cmd_viirs_sci(args): return viirs_sci_to_l0(args.satellite, args.rvirs, args.start, args.end) - subp = subs.add_parser('VIIRSSCIENCE') - subp.add_argument('rvirs', nargs='+') + subp = subs.add_parser("VIIRSSCIENCE") + subp.add_argument("rvirs", nargs="+") subp.set_defaults(func=cmd_viirs_sci) def cmd_spacecraft(args): return spacecraft_to_l0(args.satellite, args.rnsca, args.start, args.end) - subp = subs.add_parser('SPACECRAFT') - subp.add_argument('rnsca', nargs='+') + subp = subs.add_parser("SPACECRAFT") + subp.add_argument("rnsca", nargs="+") subp.set_defaults(func=cmd_spacecraft) args = parser.parse_args() @@ -130,8 +155,8 @@ def main(): args.func(args) if not args.leave_pkts: - remove_files(glob.glob('*.pkts')) + remove_files(glob.glob("*.pkts")) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/edosl0util/cli/rdrgen.py b/edosl0util/cli/rdrgen.py index 6e832443036196a23b9dd3f676c1846f05f9948f..546c0a9bc561bf0b4d8e1e08b19b150a70197aa9 100644 --- a/edosl0util/cli/rdrgen.py +++ b/edosl0util/cli/rdrgen.py @@ -5,37 +5,37 @@ from edosl0util.rdrgen import packets_to_rdrs def main(): parser = util.default_parser(description=__doc__) parser.add_argument( - '--aggr-type', - choices=['idps', 'full'], - default='idps', - help= - ('Aggregation type to perform. Use idps to have the aggregation buckets ' - 'determined by the same method IDPS uses, or full to create a single ' - 'large RDR containg all the data.')) + "--aggr-type", + choices=["idps", "full"], + default="idps", + help=( + "Aggregation type to perform. Use idps to have the aggregation buckets " + "determined by the same method IDPS uses, or full to create a single " + "large RDR containing all the data." + ), + ) g = parser.add_mutually_exclusive_group() g.add_argument( - '--aggr-level', + "--aggr-level", type=int, - help= - 'Number of aggregation granules used to determine alignment. Only used for type = idps.' + help="Number of aggregation granules used to determine alignment. 
Only used for type = idps.", ) - g.add_argument('--aggr', type=int, help='Deprecated, use --aggr-level') - parser.add_argument('sat', choices=['npp', 'snpp', 'j01']) - parser.add_argument('pds', nargs='+') + g.add_argument("--aggr", type=int, help="Deprecated, use --aggr-level") + parser.add_argument("sat", choices=["npp", "snpp", "j01"]) + parser.add_argument("pds", nargs="+") args = parser.parse_args() util.configure_logging(args) level = args.aggr_level or args.aggr - if args.sat == 'npp': - args.sat = 'snpp' + if args.sat == "npp": + args.sat = "snpp" if level: - packets_to_rdrs( - args.sat, args.pds, aggr_type=args.aggr_type, aggr_level=level) + packets_to_rdrs(args.sat, args.pds, aggr_type=args.aggr_type, aggr_level=level) else: packets_to_rdrs(args.sat, args.pds, aggr_type=args.aggr_type) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/edosl0util/cli/rdrmerge.py b/edosl0util/cli/rdrmerge.py new file mode 100644 index 0000000000000000000000000000000000000000..396b3541bc16c3db1e588b8205c6cebd8fc60239 --- /dev/null +++ b/edosl0util/cli/rdrmerge.py @@ -0,0 +1,99 @@ +import logging +import os +from datetime import datetime +from glob import glob +from os.path import basename, join +from tempfile import TemporaryDirectory + +from ..jpssrdr import atms_sci_to_l0, cris_sci_to_l0, spacecraft_to_l0, viirs_sci_to_l0 +from ..rdrgen import packets_to_rdrs + +LOG = logging.getLogger(__name__) + + +def merge_rdrs(rdrs): + # filter and validiate rdrs + to_process = { + (s, p): [] + for s in ("snpp", "j01") + for p in ("RNSCA-RVIRS", "RCRIS-RNSCA", "RATMS-RNSCA") + } + for rdr in rdrs: + parts = basename(rdr).split("_") + if len(parts) < 2: + continue + product = parts[0] + # validate product + if product not in ("RNSCA-RVIRS", "RCRIS-RNSCA", "RATMS-RNSCA"): + LOG.debug("skipping unsupported product %s", rdr) + continue + # validate satellite + sat = parts[1] + sat = {"jpss1": "j01", "noaa20": "j01", "npp": "snpp"}.get(sat, sat) + sat = "j01" if sat in ("jpss1", "noaa20") else sat + if sat not in ("snpp", "j01"): + LOG.debug("skipping unsupported satellite %s", sat) + continue + + to_process[sat, product].append(rdr) + + # use same start/end for all extracting since we don't care about file + # timestamps + start = datetime(1970, 1, 1) + end = datetime.utcnow() + + # do each set of files separately to handle case where different products + # are provided + outputs = [] + for sat, product in to_process.keys(): + inputs = to_process[sat, product] + if not inputs: + continue + + sci_to_l0 = { + "RNSCA-RVIRS": viirs_sci_to_l0, + "RCRIS-RNSCA": cris_sci_to_l0, + "RATMS-RNSCA": atms_sci_to_l0, + }[product] + + with TemporaryDirectory(dir=os.getcwd()) as tmpdir: + # extract RDRs separately, name them after RDR + for fpath in inputs: + # extract the SCI data + fname = basename(fpath) + pds = sci_to_l0(sat, [fpath], start, end, tmpdir) + os.rename(pds, join(tmpdir, fname + ".sci.pds")) + # extract the spacecraft data + scs = spacecraft_to_l0(sat, [fpath], start, end, tmpdir) + for idx, pds in enumerate(scs): + os.rename(pds, join(tmpdir, fname + ".sc%d.pds" % idx)) + + # rdrs contains only file names + rdrs = packets_to_rdrs( + sat, glob(join(tmpdir, "*.pds")), aggr_type="full", output_dir=tmpdir + ) + assert len(rdrs) == 1, "Should have gotten a single RDR" + + # copy back to CWD + os.rename(join(tmpdir, rdrs[0]), rdrs[0]) + outputs.append(rdrs[0]) + + return outputs + + +def main(): + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("rdr", nargs="+", 
help="Any science rdr (RVIRS, RCRIS, RATMS)") + args = parser.parse_args() + + if not args.rdrs: + parser.exit(1, "no RDR's provided") + + for o in merge_rdrs(args.rdr): + LOG.info("created %s", o) + + +if __name__ == "__main__": + main() diff --git a/edosl0util/cli/split.py b/edosl0util/cli/split.py index 6f7e8aa7d3d3779a2fbe775bd542fc7deb09a368..0a06d2d9efd834848f4c56b09b81bc78d93e8129 100644 --- a/edosl0util/cli/split.py +++ b/edosl0util/cli/split.py @@ -8,14 +8,14 @@ LOG = logging def main(): parser = util.default_parser() - parser.add_argument('--minutes', type=int, default=6) - parser.add_argument('filepath') + parser.add_argument("--minutes", type=int, default=6) + parser.add_argument("filepath") args = parser.parse_args() util.configure_logging(args) for stamp, fpath in split.split_file(args.filepath, args.minutes, os.getcwd()): - LOG.info('wrote bucket {} to {}'.format(stamp.isoformat(), fpath)) + LOG.info("wrote bucket {} to {}".format(stamp.isoformat(), fpath)) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/edosl0util/cli/trunc.py b/edosl0util/cli/trunc.py index 85af935d1367c0cbc9d149280d3e90f7ceffc655..79c5522d74161584b30ef0b604dd746165618fd3 100644 --- a/edosl0util/cli/trunc.py +++ b/edosl0util/cli/trunc.py @@ -6,18 +6,18 @@ from edosl0util import trunc def main(): parser = util.default_parser() - parser.add_argument('-o', '--output') - parser.add_argument('filename') - parser.add_argument('start', type=util.timestamp, help='YYYY-MM-DD HH:MM:SS') - parser.add_argument('end', type=util.timestamp, help='YYYY-MM-DD HH:MM:SS') + parser.add_argument("-o", "--output") + parser.add_argument("filename") + parser.add_argument("start", type=util.timestamp, help="YYYY-MM-DD HH:MM:SS") + parser.add_argument("end", type=util.timestamp, help="YYYY-MM-DD HH:MM:SS") args = parser.parse_args() util.configure_logging(args) - output = args.output or os.path.basename(args.filename) + '.trunc' - with io.open(output, 'wb') as fptr: + output = args.output or os.path.basename(args.filename) + ".trunc" + with io.open(output, "wb") as fptr: for pkt in trunc.trunc_file(args.filename, args.start, args.end): fptr.write(pkt.bytes()) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/edosl0util/cli/util.py b/edosl0util/cli/util.py index 352e7521fa49e235b67982731c15b14bdf671b2d..a3782257c2fb122bdd8f5d0bb0617b5e4bd32274 100644 --- a/edosl0util/cli/util.py +++ b/edosl0util/cli/util.py @@ -4,15 +4,15 @@ from datetime import datetime def timestamp(v): - return datetime.strptime(v, '%Y-%m-%d %H:%M:%S') + return datetime.strptime(v, "%Y-%m-%d %H:%M:%S") def default_parser(**kwargs): parser = argparse.ArgumentParser(**kwargs) - parser.add_argument('-v', '--verbose', action='store_true') + parser.add_argument("-v", "--verbose", action="store_true") return parser def configure_logging(args): - level = logging.DEBUG if getattr(args, 'verbose', False) else logging.WARN - logging.basicConfig(level=level, format='%(message)s') + level = logging.DEBUG if getattr(args, "verbose", False) else logging.WARN + logging.basicConfig(level=level, format="%(message)s") diff --git a/edosl0util/crgen.py b/edosl0util/crgen.py index ccb606eb0ebffdb9e6aadff6050701cbf6b50a4c..00ad1e593f1143dbb81977ee3bbf3e2e23b89f87 100644 --- a/edosl0util/crgen.py +++ b/edosl0util/crgen.py @@ -1,4 +1,3 @@ - """EDOS PDS construction record generation for SUOMI NPP""" from datetime import datetime @@ -26,15 +25,19 @@ def diff_crs(real_file, generated_file): generated = crio.read(generated_file) 
make_comparable(real, generated) if real != generated: - diff = difflib.ndiff(pprint.pformat(real).splitlines(), - pprint.pformat(generated).splitlines()) + diff = difflib.ndiff( + pprint.pformat(real).splitlines(), + pprint.pformat(generated).splitlines(), + ) for line in diff: print(line) def make_comparable(real, generated): insert_fake_cr_info(real) - del generated['completion_time'] # it seems CR completion time does not match PDS - del real['completion_time'] # creation time from the file name + del generated[ + "completion_time" + ] # it seems CR completion time does not match PDS + del real["completion_time"] # creation time from the file name main() @@ -48,31 +51,44 @@ def build_cr(pds_file, prev_pds_file=None): def main(): scan = scan_packets(pds_file, prev_pds_file) rv = {} - rv['pds_id'] = pds_id_from_path(pds_file) - rv['completion_time'] = get_pds_creation_time(pds_file) - rv['first_packet_time'] = scan['first_packet_time'] - rv['last_packet_time'] = scan['last_packet_time'] - rv['apid_info'] = build_apid_info(scan['apid_info']) - rv['apid_count'] = len(rv['apid_info']) - rv['file_info'] = file_info(rv['pds_id'], rv['apid_info']) - rv['file_count'] = len(rv['file_info']) - rv.update(aggregated_values(rv['apid_info'])) + rv["pds_id"] = pds_id_from_path(pds_file) + rv["completion_time"] = get_pds_creation_time(pds_file) + rv["first_packet_time"] = scan["first_packet_time"] + rv["last_packet_time"] = scan["last_packet_time"] + rv["apid_info"] = build_apid_info(scan["apid_info"]) + rv["apid_count"] = len(rv["apid_info"]) + rv["file_info"] = file_info(rv["pds_id"], rv["apid_info"]) + rv["file_count"] = len(rv["file_info"]) + rv.update(aggregated_values(rv["apid_info"])) insert_fake_cr_info(rv) return rv def file_info(pds_id, apid_info): - cr_entry = {'file_name': pds_id + '.PDS', 'apid_count': 0, 'apid_info': []} - data_entry = {'file_name': os.path.basename(pds_file), - 'apid_count': len(apid_info), - 'apid_info': [{'scid': npp_scid, 'apid': entry['apid'], - 'first_packet_time': entry['first_packet_time'], - 'last_packet_time': entry['last_packet_time']} - for entry in apid_info]} + cr_entry = {"file_name": pds_id + ".PDS", "apid_count": 0, "apid_info": []} + data_entry = { + "file_name": os.path.basename(pds_file), + "apid_count": len(apid_info), + "apid_info": [ + { + "scid": npp_scid, + "apid": entry["apid"], + "first_packet_time": entry["first_packet_time"], + "last_packet_time": entry["last_packet_time"], + } + for entry in apid_info + ], + } return [cr_entry, data_entry] def aggregated_values(apid_info): - keys = ['total_packets', 'total_bytes', 'gap_count', 'fill_bytes', - 'mismatched_length_packets', 'rs_corrected_packets'] + keys = [ + "total_packets", + "total_bytes", + "gap_count", + "fill_bytes", + "mismatched_length_packets", + "rs_corrected_packets", + ] return {key: sum(entry[key] for entry in apid_info) for key in keys} return main() @@ -80,27 +96,46 @@ def build_cr(pds_file, prev_pds_file=None): def insert_fake_cr_info(cr): """Populate a CR with phony values for fields that can't be discovered via a packet scan""" - cr.update({'edos_sw_ver_major': 0, 'edos_sw_ver_minor': 0, 'cr_type': 1, 'test_flag': 0, - 'scs_count': 1, 'scs_info': [{'start': missing_time_value, - 'stop': missing_time_value}], - 'first_packet_esh_time': missing_time_value, - 'last_packet_esh_time': missing_time_value, - 'fill_bytes': 0, 'mismatched_length_packets': 0, 'rs_corrected_packets': 0}) - insert_fake_apid_info(cr['apid_info']) + cr.update( + { + "edos_sw_ver_major": 0, + 
"edos_sw_ver_minor": 0, + "cr_type": 1, + "test_flag": 0, + "scs_count": 1, + "scs_info": [{"start": missing_time_value, "stop": missing_time_value}], + "first_packet_esh_time": missing_time_value, + "last_packet_esh_time": missing_time_value, + "fill_bytes": 0, + "mismatched_length_packets": 0, + "rs_corrected_packets": 0, + } + ) + insert_fake_apid_info(cr["apid_info"]) def insert_fake_apid_info(apid_info): """Fill CR apid_info with phony values for fields that can't be found via a packet scan""" for entry in apid_info: - entry.update({ - 'fill_packets': 0, 'fill_packet_info': [], 'fill_bytes': 0, - 'mismatched_length_packets': 0, 'mismatched_length_packet_ssc_list': [], - 'first_packet_esh_time': missing_time_value, - 'last_packet_esh_time': missing_time_value, - 'rs_corrected_packets': 0}) - for gap in entry['gap_info']: - gap.update({'pre_gap_packet_esh_time': missing_time_value, - 'post_gap_packet_esh_time': missing_time_value}) + entry.update( + { + "fill_packets": 0, + "fill_packet_info": [], + "fill_bytes": 0, + "mismatched_length_packets": 0, + "mismatched_length_packet_ssc_list": [], + "first_packet_esh_time": missing_time_value, + "last_packet_esh_time": missing_time_value, + "rs_corrected_packets": 0, + } + ) + for gap in entry["gap_info"]: + gap.update( + { + "pre_gap_packet_esh_time": missing_time_value, + "post_gap_packet_esh_time": missing_time_value, + } + ) def pds_id_from_path(pds_file): @@ -108,29 +143,39 @@ def pds_id_from_path(pds_file): file_name = os.path.basename(pds_file) pds_file_name_length = 40 if len(file_name) != pds_file_name_length: - raise ValueError('PDS file name {} not of expected length {}'.format( - file_name, pds_file_name_length)) - return file_name[:34] + '00' + raise ValueError( + "PDS file name {} not of expected length {}".format( + file_name, pds_file_name_length + ) + ) + return file_name[:34] + "00" def get_pds_creation_time(pds_file_or_id): """Parse 11-char creation time out of a PDS ID or file name; return a DaySegmentedTimecode""" pds_file_or_id = os.path.basename(pds_file_or_id) - return create_timecode(dt_to_cds(datetime.strptime(pds_file_or_id[22:33], '%y%j%H%M%S'))) + return create_timecode( + dt_to_cds(datetime.strptime(pds_file_or_id[22:33], "%y%j%H%M%S")) + ) def build_apid_info(scan_apid_info): """Build up apid_info resulting from scan_packets into a full apid_info for a CR""" apid_info = copy.deepcopy(scan_apid_info) for entry in apid_info: - entry['scid'] = npp_scid - entry['vcid_count'] = 1 - entry['vcid_info'] = [{'scid': npp_scid, 'vcid': npp_apid_to_vcid_map[entry['apid']]}] - entry['gap_count'] = len(entry['gap_info']) - del entry['last_packet_ssc'] # field was needed for bookkeeping but isn't in the CR + entry["scid"] = npp_scid + entry["vcid_count"] = 1 + entry["vcid_info"] = [ + {"scid": npp_scid, "vcid": npp_apid_to_vcid_map[entry["apid"]]} + ] + entry["gap_count"] = len(entry["gap_info"]) + del entry[ + "last_packet_ssc" + ] # field was needed for bookkeeping but isn't in the CR insert_fake_apid_info(apid_info) return apid_info + missing_time_value = DaySegmentedTimecode(0, 0, 0) npp_scid = 157 @@ -138,16 +183,21 @@ npp_scid = 157 # ripped from MDFCB, 2014-06-05 revision # modified to place CrIS FOV 6 in VCID 7 after testing against some real data npp_vcid_to_apids_map = { - 0: (list(range(0, 15)) + list(range(16, 50)) + - [65, 70, 100, 146, 155, 512, 513, 518, 543, 544, 545, 550, 768, 769, 773] + - list(range(1280, 1289))), + 0: ( + list(range(0, 15)) + + list(range(16, 50)) + + [65, 70, 100, 146, 155, 512, 513, 518, 
543, 544, 545, 550, 768, 769, 773] + + list(range(1280, 1289)) + ), 1: [50, 101, 515, 528, 530, 531], 2: [102], 3: [103, 514, 536], 4: [104], 5: [105], - 6: [106, 1289, 1290] + list(set(range(1315, 1396)) - set(range(1318, 1391, 9)) - - set(range(1320, 1393, 9))), + 6: [106, 1289, 1290] + + list( + set(range(1315, 1396)) - set(range(1318, 1391, 9)) - set(range(1320, 1393, 9)) + ), 7: [107] + list(range(1318, 1391, 9)) + list(range(1320, 1393, 9)), 8: [108, 1294, 1295, 1296, 1398], 9: [109], @@ -164,10 +214,11 @@ npp_vcid_to_apids_map = { 20: [120], 21: [121, 517, 524, 549, 556, 780, 1291, 1292, 1293, 1397], 22: [122], - 24: [147, 148, 149, 150]} -npp_apid_to_vcid_map = {apid: vcid - for vcid, apids in npp_vcid_to_apids_map.items() - for apid in apids} + 24: [147, 148, 149, 150], +} +npp_apid_to_vcid_map = { + apid: vcid for vcid, apids in npp_vcid_to_apids_map.items() for apid in apids +} def scan_packets(pds_file, prev_pds_file=None): @@ -176,8 +227,8 @@ def scan_packets(pds_file, prev_pds_file=None): def main(): prev_apid_map = build_prev_apid_map(prev_pds_file) apid_map = {} - logger.info('scanning {}'.format(pds_file)) - stream = jpss_packet_stream(open(pds_file, 'rb')) + logger.info("scanning {}".format(pds_file)) + stream = jpss_packet_stream(open(pds_file, "rb")) first_pkt_time = None last_pkt_time = None for pkt in stream: @@ -191,47 +242,64 @@ def scan_packets(pds_file, prev_pds_file=None): if not first_pkt_time: first_pkt_time = pkt.cds_timecode last_pkt_time = pkt.cds_timecode - return {'first_packet_time': create_timecode(first_pkt_time), - 'last_packet_time': create_timecode(last_pkt_time), - 'apid_info': [apid_map[k] for k in sorted(apid_map)]} + return { + "first_packet_time": create_timecode(first_pkt_time), + "last_packet_time": create_timecode(last_pkt_time), + "apid_info": [apid_map[k] for k in sorted(apid_map)], + } def build_prev_apid_map(prev_pds_file): if prev_pds_file: - return {entry['apid']: entry for entry in scan_packets(prev_pds_file)['apid_info']} + return { + entry["apid"]: entry + for entry in scan_packets(prev_pds_file)["apid_info"] + } else: return {} def init_entry(pkt, entry_from_prev_pds): - rv = {'apid': pkt.apid, - 'first_packet_time': create_timecode(pkt.cds_timecode), 'first_packet_offset': pkt.offset, - 'last_packet_time': create_timecode(pkt.cds_timecode), 'last_packet_ssc': pkt.seqid, - 'total_packets': 1, 'total_bytes': pkt.size, 'gap_info': []} + rv = { + "apid": pkt.apid, + "first_packet_time": create_timecode(pkt.cds_timecode), + "first_packet_offset": pkt.offset, + "last_packet_time": create_timecode(pkt.cds_timecode), + "last_packet_ssc": pkt.seqid, + "total_packets": 1, + "total_bytes": pkt.size, + "gap_info": [], + } if entry_from_prev_pds: - update_gap_info(rv['gap_info'], entry_from_prev_pds['last_packet_ssc'], - entry_from_prev_pds['last_packet_time'], pkt) + update_gap_info( + rv["gap_info"], + entry_from_prev_pds["last_packet_ssc"], + entry_from_prev_pds["last_packet_time"], + pkt, + ) return rv def update_entry(entry, new_pkt): - prev_last_ssc = entry['last_packet_ssc'] - prev_last_time = entry['last_packet_time'] + prev_last_ssc = entry["last_packet_ssc"] + prev_last_time = entry["last_packet_time"] if new_pkt.cds_timecode: - if entry['first_packet_time'] == DaySegmentedTimecode(): - entry['first_packet_time'] = create_timecode(new_pkt.cds_timecode) - entry['last_packet_time'] = create_timecode(new_pkt.cds_timecode) - entry['last_packet_ssc'] = new_pkt.seqid - entry['total_packets'] += 1 - entry['total_bytes'] += new_pkt.size - 
update_gap_info(entry['gap_info'], prev_last_ssc, prev_last_time, new_pkt) + if entry["first_packet_time"] == DaySegmentedTimecode(): + entry["first_packet_time"] = create_timecode(new_pkt.cds_timecode) + entry["last_packet_time"] = create_timecode(new_pkt.cds_timecode) + entry["last_packet_ssc"] = new_pkt.seqid + entry["total_packets"] += 1 + entry["total_bytes"] += new_pkt.size + update_gap_info(entry["gap_info"], prev_last_ssc, prev_last_time, new_pkt) def update_gap_info(gap_info, last_ssc, last_pkt_time, new_pkt): ssc_limit = 16384 # one more than highest SSC expected_new_ssc = (last_ssc + 1) % ssc_limit if new_pkt.seqid != expected_new_ssc: - gap_entry = {'first_missing_ssc': expected_new_ssc, - 'missing_packet_count': (new_pkt.seqid - expected_new_ssc) % ssc_limit, - 'pre_gap_packet_time': last_pkt_time, - 'post_gap_packet_time': create_timecode(new_pkt.cds_timecode), - 'post_gap_packet_offset': new_pkt.offset} + gap_entry = { + "first_missing_ssc": expected_new_ssc, + "missing_packet_count": (new_pkt.seqid - expected_new_ssc) % ssc_limit, + "pre_gap_packet_time": last_pkt_time, + "post_gap_packet_time": create_timecode(new_pkt.cds_timecode), + "post_gap_packet_offset": new_pkt.offset, + } gap_info.append(gap_entry) return main() diff --git a/edosl0util/crio.py b/edosl0util/crio.py index fa0d98b364509b0138ff53bbd87aad29092fac8a..578182d5bd785268d0564057c9ac045a53c58d82 100644 --- a/edosl0util/crio.py +++ b/edosl0util/crio.py @@ -15,38 +15,48 @@ def read(cr_file): def main(): rv = {} - with open(cr_file, 'rb') as f: + with open(cr_file, "rb") as f: read_into_dict(f, Main1Struct, rv) - rv['scs_info'] = [read_struct(f, ScsStruct) for _ in range(rv['scs_count'])] + rv["scs_info"] = [read_struct(f, ScsStruct) for _ in range(rv["scs_count"])] read_into_dict(f, Main2Struct, rv) - rv['apid_info'] = [] - for _ in range(rv['apid_count']): + rv["apid_info"] = [] + for _ in range(rv["apid_count"]): d = {} read_into_dict(f, Apid1Struct, d) - d['vcid_info'] = [read_struct(f, ApidVcidStruct) for _ in range(d['vcid_count'])] + d["vcid_info"] = [ + read_struct(f, ApidVcidStruct) for _ in range(d["vcid_count"]) + ] read_into_dict(f, Apid2Struct, d) - d['gap_info'] = [read_struct(f, ApidGapStruct) for _ in range(d['gap_count'])] + d["gap_info"] = [ + read_struct(f, ApidGapStruct) for _ in range(d["gap_count"]) + ] read_into_dict(f, Apid3Struct, d) - d['fill_packet_info'] = [ - read_struct(f, ApidFillStruct) for _ in range(d['fill_packets'])] + d["fill_packet_info"] = [ + read_struct(f, ApidFillStruct) for _ in range(d["fill_packets"]) + ] read_into_dict(f, Apid4Struct, d) - d['mismatched_length_packet_ssc_list'] = [ - read_struct(f, ApidMismatchedLengthStruct)['packet_ssc'] - for i in range(d['mismatched_length_packets'])] + d["mismatched_length_packet_ssc_list"] = [ + read_struct(f, ApidMismatchedLengthStruct)["packet_ssc"] + for i in range(d["mismatched_length_packets"]) + ] read_into_dict(f, Apid5Struct, d) - rv['apid_info'].append(d) + rv["apid_info"].append(d) read_into_dict(f, Main3Struct, rv) - rv['file_info'] = [] - for _ in range(rv['file_count']): + rv["file_info"] = [] + for _ in range(rv["file_count"]): d = {} read_into_dict(f, FileStruct, d) - d['apid_info'] = [read_struct(f, FileApidStruct) for _ in range(d['apid_count'])] - if d['apid_count'] == 0: # bogus all-zero apid struct is present for the CR file + d["apid_info"] = [ + read_struct(f, FileApidStruct) for _ in range(d["apid_count"]) + ] + if ( + d["apid_count"] == 0 + ): # bogus all-zero apid struct is present for the CR file 
read_struct(f, FileApidStruct) - rv['file_info'].append(d) + rv["file_info"].append(d) extra = f.read() if extra: - raise ReadError('{} bytes remain after reading CR'.format(len(extra))) + raise ReadError("{} bytes remain after reading CR".format(len(extra))) return rv def read_into_dict(f, struct, data): @@ -55,11 +65,14 @@ def read(cr_file): def read_struct(f, struct): buf = f.read(c.sizeof(struct)) if len(buf) < c.sizeof(struct): - raise ReadError('Unexpected EOF reading CR') + raise ReadError("Unexpected EOF reading CR") rv = struct_to_dict(struct.from_buffer_copy(buf)) - rv = {k: v for k, v in rv.items() if not k.startswith('spare_')} # no spare fields - return {k: int(v) if isinstance(v, numbers.Integral) else v for k, v in - rv.items()} # no longs + rv = { + k: v for k, v in rv.items() if not k.startswith("spare_") + } # no spare fields + return { + k: int(v) if isinstance(v, numbers.Integral) else v for k, v in rv.items() + } # no longs return main() @@ -68,32 +81,34 @@ def write(cr, out_file): """Write out a PDS construction record file""" def main(): - with open(out_file, 'wb') as f: + with open(out_file, "wb") as f: write_struct(cr, Main1Struct, f) - for d in cr['scs_info']: + for d in cr["scs_info"]: write_struct(d, ScsStruct, f) write_struct(cr, Main2Struct, f) - for d in cr['apid_info']: + for d in cr["apid_info"]: write_struct(d, Apid1Struct, f) - for dd in d['vcid_info']: + for dd in d["vcid_info"]: write_struct(dd, ApidVcidStruct, f) write_struct(d, Apid2Struct, f) - for dd in d['gap_info']: + for dd in d["gap_info"]: write_struct(dd, ApidGapStruct, f) write_struct(d, Apid3Struct, f) - for dd in d['fill_packet_info']: + for dd in d["fill_packet_info"]: write_struct(dd, ApidFillStruct, f) write_struct(d, Apid4Struct, f) - for ssc in d['mismatched_length_packet_ssc_list']: - write_struct({'ssc': ssc}, ApidMismatchedLengthStruct, f) + for ssc in d["mismatched_length_packet_ssc_list"]: + write_struct({"ssc": ssc}, ApidMismatchedLengthStruct, f) write_struct(d, Apid5Struct, f) write_struct(cr, Main3Struct, f) - for d in cr['file_info']: + for d in cr["file_info"]: write_struct(d, FileStruct, f) - for dd in d['apid_info']: + for dd in d["apid_info"]: write_struct(dd, FileApidStruct, f) - if d['apid_count'] == 0: - write_struct({}, FileApidStruct, f) # one all-zero apid struct if no others + if d["apid_count"] == 0: + write_struct( + {}, FileApidStruct, f + ) # one all-zero apid struct if no others def write_struct(data, struct, out): fields = [f[0] for f in struct._fields_] @@ -101,7 +116,7 @@ def write(cr, out_file): for k, v in data.items(): if k in fields: if isinstance(v, str): - struct_data[k] = v.encode('utf-8') + struct_data[k] = v.encode("utf-8") else: struct_data[k] = v out.write(memoryview(struct(**struct_data))) @@ -114,112 +129,127 @@ def struct_to_dict(s): class Main1Struct(BaseStruct): - _fields_ = [('edos_sw_ver_major', c.c_uint8), - ('edos_sw_ver_minor', c.c_uint8), - ('cr_type', c.c_uint8), - ('spare_1', c.c_uint8 * 1), - ('pds_id', c.c_char * 36), - ('test_flag', c.c_uint8), - ('spare_2', c.c_uint8 * 9), - ('scs_count', c.c_uint16)] + _fields_ = [ + ("edos_sw_ver_major", c.c_uint8), + ("edos_sw_ver_minor", c.c_uint8), + ("cr_type", c.c_uint8), + ("spare_1", c.c_uint8 * 1), + ("pds_id", c.c_char * 36), + ("test_flag", c.c_uint8), + ("spare_2", c.c_uint8 * 9), + ("scs_count", c.c_uint16), + ] class ScsStruct(BaseStruct): - _fields_ = [('start', DaySegmentedTimecode), - ('stop', DaySegmentedTimecode)] + _fields_ = [("start", DaySegmentedTimecode), ("stop", 
DaySegmentedTimecode)] class Main2Struct(BaseStruct): - _fields_ = [('fill_bytes', c.c_uint64), - ('mismatched_length_packets', c.c_uint32), - ('first_packet_time', DaySegmentedTimecode), - ('last_packet_time', DaySegmentedTimecode), - ('first_packet_esh_time', DaySegmentedTimecode), - ('last_packet_esh_time', DaySegmentedTimecode), - ('rs_corrected_packets', c.c_uint32), - ('total_packets', c.c_uint32), - ('total_bytes', c.c_uint64), - ('gap_count', c.c_uint32), - ('completion_time', DaySegmentedTimecode), - ('spare_3', c.c_uint8 * 7), - ('apid_count', c.c_uint8)] + _fields_ = [ + ("fill_bytes", c.c_uint64), + ("mismatched_length_packets", c.c_uint32), + ("first_packet_time", DaySegmentedTimecode), + ("last_packet_time", DaySegmentedTimecode), + ("first_packet_esh_time", DaySegmentedTimecode), + ("last_packet_esh_time", DaySegmentedTimecode), + ("rs_corrected_packets", c.c_uint32), + ("total_packets", c.c_uint32), + ("total_bytes", c.c_uint64), + ("gap_count", c.c_uint32), + ("completion_time", DaySegmentedTimecode), + ("spare_3", c.c_uint8 * 7), + ("apid_count", c.c_uint8), + ] class Apid1Struct(BaseStruct): - _fields_ = [('spare_1', c.c_uint8 * 1), - ('scid', c.c_uint8), - ('apid', c.c_uint16), - ('first_packet_offset', c.c_uint64), - ('spare_2', c.c_uint8 * 3), - ('vcid_count', c.c_uint8)] + _fields_ = [ + ("spare_1", c.c_uint8 * 1), + ("scid", c.c_uint8), + ("apid", c.c_uint16), + ("first_packet_offset", c.c_uint64), + ("spare_2", c.c_uint8 * 3), + ("vcid_count", c.c_uint8), + ] class ApidVcidStruct(BaseStruct): - _fields_ = [('spare_1', c.c_uint16), - ('spare_2', c.c_uint16, 2), - ('scid', c.c_uint16, 8), - ('vcid', c.c_uint16, 6)] + _fields_ = [ + ("spare_1", c.c_uint16), + ("spare_2", c.c_uint16, 2), + ("scid", c.c_uint16, 8), + ("vcid", c.c_uint16, 6), + ] class Apid2Struct(BaseStruct): - _fields_ = [('gap_count', c.c_uint32)] + _fields_ = [("gap_count", c.c_uint32)] class ApidGapStruct(BaseStruct): - _fields_ = [('first_missing_ssc', c.c_uint32), - ('post_gap_packet_offset', c.c_uint64), - ('missing_packet_count', c.c_uint32), - ('pre_gap_packet_time', DaySegmentedTimecode), - ('post_gap_packet_time', DaySegmentedTimecode), - ('pre_gap_packet_esh_time', DaySegmentedTimecode), - ('post_gap_packet_esh_time', DaySegmentedTimecode)] + _fields_ = [ + ("first_missing_ssc", c.c_uint32), + ("post_gap_packet_offset", c.c_uint64), + ("missing_packet_count", c.c_uint32), + ("pre_gap_packet_time", DaySegmentedTimecode), + ("post_gap_packet_time", DaySegmentedTimecode), + ("pre_gap_packet_esh_time", DaySegmentedTimecode), + ("post_gap_packet_esh_time", DaySegmentedTimecode), + ] class Apid3Struct(BaseStruct): - _fields_ = [('fill_packets', c.c_uint32)] + _fields_ = [("fill_packets", c.c_uint32)] class ApidFillStruct(BaseStruct): - _fields_ = [('packet_ssc', c.c_uint32), - ('packet_offset', c.c_uint64), - ('first_fill_byte', c.c_uint32)] + _fields_ = [ + ("packet_ssc", c.c_uint32), + ("packet_offset", c.c_uint64), + ("first_fill_byte", c.c_uint32), + ] class Apid4Struct(BaseStruct): - _fields_ = [('fill_bytes', c.c_uint64), - ('mismatched_length_packets', c.c_uint32)] + _fields_ = [("fill_bytes", c.c_uint64), ("mismatched_length_packets", c.c_uint32)] class ApidMismatchedLengthStruct(BaseStruct): - _fields_ = [('packet_ssc', c.c_uint32)] + _fields_ = [("packet_ssc", c.c_uint32)] class Apid5Struct(BaseStruct): - _fields_ = [('first_packet_time', DaySegmentedTimecode), - ('last_packet_time', DaySegmentedTimecode), - ('first_packet_esh_time', DaySegmentedTimecode), - ('last_packet_esh_time', 
DaySegmentedTimecode), - ('rs_corrected_packets', c.c_uint32), - ('total_packets', c.c_uint32), - ('total_bytes', c.c_uint64), - ('spare_3', c.c_uint64)] + _fields_ = [ + ("first_packet_time", DaySegmentedTimecode), + ("last_packet_time", DaySegmentedTimecode), + ("first_packet_esh_time", DaySegmentedTimecode), + ("last_packet_esh_time", DaySegmentedTimecode), + ("rs_corrected_packets", c.c_uint32), + ("total_packets", c.c_uint32), + ("total_bytes", c.c_uint64), + ("spare_3", c.c_uint64), + ] class Main3Struct(BaseStruct): - _fields_ = [('spare_4', c.c_uint8 * 3), - ('file_count', c.c_uint8)] + _fields_ = [("spare_4", c.c_uint8 * 3), ("file_count", c.c_uint8)] class FileStruct(BaseStruct): - _fields_ = [('file_name', c.c_char * 40), - ('spare_1', c.c_uint8 * 3), - ('apid_count', c.c_uint8)] + _fields_ = [ + ("file_name", c.c_char * 40), + ("spare_1", c.c_uint8 * 3), + ("apid_count", c.c_uint8), + ] class FileApidStruct(BaseStruct): - _fields_ = [('spare_1', c.c_uint8 * 1), - ('scid', c.c_uint8), - ('apid', c.c_uint16), - ('first_packet_time', DaySegmentedTimecode), - ('last_packet_time', DaySegmentedTimecode), - ('spare_2', c.c_uint8 * 4)] + _fields_ = [ + ("spare_1", c.c_uint8 * 1), + ("scid", c.c_uint8), + ("apid", c.c_uint16), + ("first_packet_time", DaySegmentedTimecode), + ("last_packet_time", DaySegmentedTimecode), + ("spare_2", c.c_uint8 * 4), + ] diff --git a/edosl0util/headers.py b/edosl0util/headers.py index 12b319e7ace0a9f138251b42a4612624a06c314c..345e8e74761742a8ba8a1eb66455a2b2edcae257 100644 --- a/edosl0util/headers.py +++ b/edosl0util/headers.py @@ -28,11 +28,17 @@ class BaseStruct(c.BigEndianStructure): _pack_ = 1 def __str__(self): - return "<" + ' '.join('%s=%s' % (f[0], getattr(self, f[0])) for f in self._fields_) + " >" + return ( + "<" + + " ".join("%s=%s" % (f[0], getattr(self, f[0])) for f in self._fields_) + + " >" + ) def __repr__(self): - fields = ', '.join('%s=%s' % (f[0], repr(getattr(self, f[0]))) for f in self._fields_) - return '<%s (%s)>' % (self.__class__.__name__, fields) + fields = ", ".join( + "%s=%s" % (f[0], repr(getattr(self, f[0]))) for f in self._fields_ + ) + return "<%s (%s)>" % (self.__class__.__name__, fields) def __eq__(self, other): return all(getattr(self, f[0]) == getattr(other, f[0]) for f in self._fields_) @@ -42,14 +48,15 @@ class PrimaryHeader(BaseStruct): """ CCSDS Primary Header. """ + _fields_ = [ - ('version_number', c.c_uint8, 3), # == 0 - ('type_indicator', c.c_uint8, 1), - ('secondary_header_flag', c.c_uint8, 1), - ('apid', c.c_uint16, 11), - ('sequence_grouping', c.c_uint16, 2), - ('source_sequence_count', c.c_uint16, 14), - ('data_length_minus1', c.c_uint16) # octet count minus one + ("version_number", c.c_uint8, 3), # == 0 + ("type_indicator", c.c_uint8, 1), + ("secondary_header_flag", c.c_uint8, 1), + ("apid", c.c_uint16, 11), + ("sequence_grouping", c.c_uint16, 2), + ("source_sequence_count", c.c_uint16, 14), + ("data_length_minus1", c.c_uint16), # octet count minus one ] @@ -84,15 +91,16 @@ class AquaCucTimecode(Timecode): FIXME: Not tested or validated!! 
""" + _fields_ = [ - ('extension_flag', c.c_uint8, 1), - ('timecode_epoch', c.c_uint8, 3), - ('coarse_time_count', c.c_uint8, 2), - ('fine_time_count', c.c_uint8, 2), - ('no_continuation_flag', c.c_uint8, 1), - ('leap_seconds', c.c_uint8, 7), - ('seconds', c.c_uint32), - ('sub_seconds', c.c_uint16) + ("extension_flag", c.c_uint8, 1), + ("timecode_epoch", c.c_uint8, 3), + ("coarse_time_count", c.c_uint8, 2), + ("fine_time_count", c.c_uint8, 2), + ("no_continuation_flag", c.c_uint8, 1), + ("leap_seconds", c.c_uint8, 7), + ("seconds", c.c_uint32), + ("sub_seconds", c.c_uint16), ] EPOCH = datetime(1958, 1, 1) @@ -102,20 +110,19 @@ class AquaCucTimecode(Timecode): def day_segmented_timecode(self): micros = self.SUB_SECOND_UNITS * self.sub_seconds seconds = self.seconds + self.leap_seconds - return ((seconds - self.leap_seconds) // 86400, - micros // 1e3, - micros % 1e3) + return ((seconds - self.leap_seconds) // 86400, micros // 1e3, micros % 1e3) class DaySegmentedTimecode(Timecode): """ CCSDS Day Segmented Timecode """ + _pack_ = 1 _fields_ = [ - ('days', c.c_uint16), - ('milliseconds', c.c_uint32), - ('microseconds', c.c_uint16) + ("days", c.c_uint16), + ("milliseconds", c.c_uint32), + ("microseconds", c.c_uint16), ] def day_segmented_timecode(self): @@ -123,54 +130,46 @@ class DaySegmentedTimecode(Timecode): class AquaGirdSecondaryHeader(BaseStruct): - _fields_ = [ - ('flags', c.c_uint8), - ('timecode', AquaCucTimecode), - ] + _fields_ = [("flags", c.c_uint8), ("timecode", AquaCucTimecode)] class AquaGiisSecondaryHeader(BaseStruct): _fields_ = [ - ('timecode', DaySegmentedTimecode), - ('quicklook_flag', c.c_uint8, 1), - ('user_flags', c.c_uint8, 7) + ("timecode", DaySegmentedTimecode), + ("quicklook_flag", c.c_uint8, 1), + ("user_flags", c.c_uint8, 7), ] class AquaSpacecraftBusSecondaryHeader(BaseStruct): - _fields_ = [ - ('timecode', AquaCucTimecode) - ] + _fields_ = [("timecode", AquaCucTimecode)] class JpssSecondaryHeader(BaseStruct): """ Secondary header using Day Segmented timecodes. """ + _pack_ = 1 - _fields_ = [ - ('timecode', DaySegmentedTimecode) - ] + _fields_ = [("timecode", DaySegmentedTimecode)] class JpssFirstSecondaryHeader(BaseStruct): """ Secondary header using Day Segmented timecodes and with a packet count. 
""" + _pack_ = 1 _fields_ = [ - ('timecode', DaySegmentedTimecode), - ('packet_count', c.c_uint8), - ('_spare', c.c_uint8) + ("timecode", DaySegmentedTimecode), + ("packet_count", c.c_uint8), + ("_spare", c.c_uint8), ] class ViirsPacketId(BaseStruct): _pack_ = 1 - _fields_ = [ - ('scan_number', c.c_uint32), - ('packet_time', DaySegmentedTimecode), - ] + _fields_ = [("scan_number", c.c_uint32), ("packet_time", DaySegmentedTimecode)] _jpss_headers = { @@ -188,14 +187,18 @@ def jpss_header_lookup(primary_header): def amsu_headers(): apids = [ # AMSU-A1 - 257, 259, 260, 261, 262, + 257, + 259, + 260, + 261, + 262, # AMSU-A2 - 288, 289, 290 + 288, + 289, + 290, ] flags = [GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE] - return {(apid, flag): AquaGirdSecondaryHeader - for apid in apids - for flag in flags} + return {(apid, flag): AquaGirdSecondaryHeader for apid in apids for flag in flags} def airs_headers(): @@ -215,7 +218,7 @@ def airs_headers(): def hsb_headers(): return { # HSB - (342, GROUP_STANDALONE): AquaGirdSecondaryHeader, + (342, GROUP_STANDALONE): AquaGirdSecondaryHeader } @@ -233,9 +236,15 @@ def modis_headers(): def ceres_headers(): apids = ( # CERES+Y - 141, 142, 143, 144, + 141, + 142, + 143, + 144, # CERES-Y - 157, 158, 159, 160 + 157, + 158, + 159, + 160, ) groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE) return {(a, g): AquaGiisSecondaryHeader for a in apids for g in groupings} diff --git a/edosl0util/jpssrdr.py b/edosl0util/jpssrdr.py index 8e56ee366125c125958046d2092fa7f32870b2a3..b30ae498f2ca4d0a830f04623251b9097c773246 100644 --- a/edosl0util/jpssrdr.py +++ b/edosl0util/jpssrdr.py @@ -7,48 +7,43 @@ Code for reading/writing/manipulating JPSS Common RDR files as documented in: http://jointmission.gsfc.nasa.gov/sciencedocs/2015-06/474-00001-02_JPSS-CDFCB-X-Vol-II_0123B.pdf """ -import glob - -from edosl0util.stream import jpss_packet_stream - -__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved." - import ctypes as c +import glob import logging import os from collections import namedtuple import numpy as np +from edosl0util.stream import jpss_packet_stream from h5py import File as H5File from .headers import BaseStruct -from .merge import merge, VIIRS_APID_ORDER +from .merge import VIIRS_APID_ORDER, merge + +__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved." + LOG = logging.getLogger(__name__) -satellite_to_scid = { - 'npp': 157, - 'snpp': 157, - 'j01': 159, - 'jpss1': 159, -} +satellite_to_scid = {"npp": 157, "snpp": 157, "j01": 159, "jpss1": 159} class StaticHeader(BaseStruct): """ Common RDR static header. """ + _fields_ = [ - ('satellite', c.c_char * 4), - ('sensor', c.c_char * 16), - ('type_id', c.c_char * 16), - ('num_apids', c.c_uint32), - ('apid_list_offset', c.c_uint32), - ('pkt_tracker_offset', c.c_uint32), - ('ap_storage_offset', c.c_uint32), - ('next_pkt_pos', c.c_uint32), - ('start_boundary', c.c_uint64), - ('end_boundary', c.c_uint64), + ("satellite", c.c_char * 4), + ("sensor", c.c_char * 16), + ("type_id", c.c_char * 16), + ("num_apids", c.c_uint32), + ("apid_list_offset", c.c_uint32), + ("pkt_tracker_offset", c.c_uint32), + ("ap_storage_offset", c.c_uint32), + ("next_pkt_pos", c.c_uint32), + ("start_boundary", c.c_uint64), + ("end_boundary", c.c_uint64), ] @@ -56,12 +51,13 @@ class Apid(BaseStruct): """ Entry in the ApidList storage area. 
""" + _fields_ = [ - ('name', c.c_char * 16), - ('value', c.c_uint32), - ('pkt_tracker_start_idx', c.c_uint32), - ('pkts_reserved', c.c_uint32), - ('pkts_received', c.c_uint32), + ("name", c.c_char * 16), + ("value", c.c_uint32), + ("pkt_tracker_start_idx", c.c_uint32), + ("pkts_reserved", c.c_uint32), + ("pkts_received", c.c_uint32), ] @@ -69,12 +65,13 @@ class PacketTracker(BaseStruct): """ Entry in the PacketTracker storage area. """ + _fields_ = [ - ('obs_time', c.c_int64), - ('sequence_number', c.c_int32), - ('size', c.c_int32), - ('offset', c.c_int32), - ('fill_percent', c.c_int32), + ("obs_time", c.c_int64), + ("sequence_number", c.c_int32), + ("size", c.c_int32), + ("offset", c.c_int32), + ("fill_percent", c.c_int32), ] @@ -88,23 +85,23 @@ def _make_packet_impl(size): class PktImpl(BaseStruct): # noqa _fields_ = [ - ('version', c.c_uint8, 3), - ('type', c.c_uint8, 1), - ('secondary_header', c.c_uint8, 1), - ('apid', c.c_uint16, 11), - ('sequence_grouping', c.c_uint8, 2), - ('sequence_number', c.c_uint16, 14), - ('length_minus1', c.c_uint16), - ('data', c.c_uint8 * data_size) + ("version", c.c_uint8, 3), + ("type", c.c_uint8, 1), + ("secondary_header", c.c_uint8, 1), + ("apid", c.c_uint16, 11), + ("sequence_grouping", c.c_uint8, 2), + ("sequence_number", c.c_uint16, 14), + ("length_minus1", c.c_uint16), + ("data", c.c_uint8 * data_size), ] return PktImpl -Packet = namedtuple('Packet', ('tracker', 'packet')) +Packet = namedtuple("Packet", ("tracker", "packet")) -class CommonRdr(namedtuple('CommonRdr', ('buf', 'header', 'apids'))): +class CommonRdr(namedtuple("CommonRdr", ("buf", "header", "apids"))): """ A single common rdr as found in a RawApplicationPackets_X dataset. """ @@ -131,8 +128,9 @@ def _packets_for_apid(buf, header, apid): """ Generate tuples of (PacketTracker, Packet) """ - t_off = header.pkt_tracker_offset + \ - apid.pkt_tracker_start_idx * c.sizeof(PacketTracker) + t_off = header.pkt_tracker_offset + apid.pkt_tracker_start_idx * c.sizeof( + PacketTracker + ) for idx in range(apid.pkts_received): tracker = PacketTracker.from_buffer(buf, t_off) t_off += c.sizeof(PacketTracker) @@ -158,8 +156,8 @@ def _read_apid_list(header, buf): def _sorted_packet_dataset_names(names): - names = [k for k in names if k.startswith('RawApplicationPackets_')] - return sorted(names, key=lambda x: int(x.split('_')[-1])) + names = [k for k in names if k.startswith("RawApplicationPackets_")] + return sorted(names, key=lambda x: int(x.split("_")[-1])) def _generate_packet_datasets(group): @@ -170,45 +168,43 @@ def _generate_packet_datasets(group): def _find_data_group(fobj, name, sensors=None): - sensors = sensors or ['viirs', 'cris', 'atms'] + sensors = sensors or ["viirs", "cris", "atms"] for sensor in sensors: - group = fobj.get( - '/All_Data/{}-{}-RDR_All'.format(sensor.upper(), name.upper())) + group = fobj.get("/All_Data/{}-{}-RDR_All".format(sensor.upper(), name.upper())) if group: return group # Some files have CrIS rather than CRIS - if sensor == 'cris': - group = fobj.get( - '/All_Data/{}-{}-RDR_All'.format('CrIS', name.upper())) + if sensor == "cris": + group = fobj.get("/All_Data/{}-{}-RDR_All".format("CrIS", name.upper())) if group: return group def _find_science_group(fobj): - return _find_data_group(fobj, 'SCIENCE') + return _find_data_group(fobj, "SCIENCE") def _find_dwell_group(fobj): - for name in ('HSKDWELL', 'IMDWELL', 'SSMDWELL'): - group = _find_data_group(fobj, name, sensors=['cris']) + for name in ("HSKDWELL", "IMDWELL", "SSMDWELL"): + group = _find_data_group(fobj, name, 
sensors=["cris"]) if group: return group - group = _find_data_group(fobj, 'DWELL', sensors=['atms']) + group = _find_data_group(fobj, "DWELL", sensors=["atms"]) if group: return group def _find_diag_group(fobj): - return _find_data_group(fobj, 'DIAGNOSTIC', sensors=['cris', 'atms']) + return _find_data_group(fobj, "DIAGNOSTIC", sensors=["cris", "atms"]) def _find_telemetry_group(fobj): - return _find_data_group(fobj, 'TELEMETRY', sensors=['cris', 'atms']) + return _find_data_group(fobj, "TELEMETRY", sensors=["cris", "atms"]) def _find_spacecraft_group(fobj): - return _find_data_group(fobj, 'DIARY', sensors=['SPACECRAFT']) + return _find_data_group(fobj, "DIARY", sensors=["SPACECRAFT"]) def _rdrs_for_packet_dataset(group): @@ -218,7 +214,7 @@ def _rdrs_for_packet_dataset(group): try: rdr = decode_rdr_blob(buf) except ValueError as e: - LOG.warning('{} ({})'.format(e, name)) + LOG.warning("{} ({})".format(e, name)) continue rdrs.append(rdr) return rdrs @@ -226,7 +222,7 @@ def _rdrs_for_packet_dataset(group): def decode_rdr_blob(buf): if buf.shape[0] < c.sizeof(StaticHeader): - raise ValueError('Not enough bytes for static header') + raise ValueError("Not enough bytes for static header") header = StaticHeader.from_buffer(buf) apids = _read_apid_list(header, buf) return CommonRdr(buf, header, list(apids)) @@ -251,7 +247,7 @@ def rdr_ccsds_packets(rdr, skipfill=False): """ for packet in rdr.packets(): if packet.tracker.fill_percent != 0: - LOG.debug('fill: %s', packet.tracker) + LOG.debug("fill: %s", packet.tracker) if skipfill: continue yield packet.packet @@ -289,109 +285,128 @@ def write_rdr_datasets(filepath, ancillary=False, skipfill=False): for typekey in rdrs: if not rdrs[typekey]: continue - if ancillary and typekey == 'ancillary': - for idx, rdr in enumerate(rdrs['ancillary']): - packets = {a.value: rdr.packets_for_apid(a) - for a in rdr.apids} + if ancillary and typekey == "ancillary": + for idx, rdr in enumerate(rdrs["ancillary"]): + packets = {a.value: rdr.packets_for_apid(a) for a in rdr.apids} for apid, pkts in packets.items(): LOG.debug( - 'writing ancillary gran %d %s-%s-%s %d', - idx, rdr.header.satellite.decode(), rdr.header.sensor.decode(), - rdr.header.type_id.decode(), apid) - pktfile = '{}.{}{}.pkts'.format(rdrname, typekey, apid) - with open(pktfile, 'ab') as dest: + "writing ancillary gran %d %s-%s-%s %d", + idx, + rdr.header.satellite.decode(), + rdr.header.sensor.decode(), + rdr.header.type_id.decode(), + apid, + ) + pktfile = "{}.{}{}.pkts".format(rdrname, typekey, apid) + with open(pktfile, "ab") as dest: _write_packets(pkts, dest, skipfill) else: - pktfile = '{}.{}.pkts'.format(rdrname, typekey) - LOG.debug('writing %s', pktfile) - with open(pktfile, 'wb') as dest: + pktfile = "{}.{}.pkts".format(rdrname, typekey) + LOG.debug("writing %s", pktfile) + with open(pktfile, "wb") as dest: for idx, rdr in enumerate(rdrs[typekey]): LOG.debug( - '... %s gran %d %s-%s-%s', typekey, - idx, rdr.header.satellite.decode(), rdr.header.sensor.decode(), - rdr.header.type_id.decode()) + "... 
%s gran %d %s-%s-%s", + typekey, + idx, + rdr.header.satellite.decode(), + rdr.header.sensor.decode(), + rdr.header.type_id.decode(), + ) _write_packets(rdr.packets(), dest, skipfill) return rdrs def pdsfilename(product, created): if len(product) < 20: - product = product + 'A' * (20 - len(product)) - return '{}XT{:%y%j%H%M%S}001.PDS'.format(product, created) + product = product + "A" * (20 - len(product)) + return "{}XT{:%y%j%H%M%S}001.PDS".format(product, created) -def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end): - product = 'P{}{}'.format(satellite_to_scid[satellite], product) +def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end, workdir): + workdir = workdir or "." + product = "P{}{}".format(satellite_to_scid[satellite], product) for filepath in rdrs: - LOG.info('dumping %s', filepath) + LOG.info("dumping %s", filepath) write_rdr_datasets(filepath, skipfill=True) # alphanumeric sorting to bootstrap final sort inputs = sorted(glob.glob(filepat)) - streams = [jpss_packet_stream(open(f, 'rb')) for f in inputs] + streams = [jpss_packet_stream(open(f, "rb")) for f in inputs] pdsname = pdsfilename(product, start) - LOG.info('merging to %s', pdsname) - order = VIIRS_APID_ORDER if 'VIIRSSCIENCE' in product else None - with open(pdsname, 'wb') as dest: + LOG.info("merging to %s", pdsname) + order = VIIRS_APID_ORDER if "VIIRSSCIENCE" in product else None + with open(os.path.join(workdir, pdsname), "wb") as dest: merge(streams, output=dest, trunc_to=[start, end], apid_order=order) - return pdsname + return os.path.join(workdir, pdsname) if workdir != "." else pdsname -def cris_hsk_to_l0(satellite, rdrs, start, end): - product = '1280CRISHSK' - return _do_rdr_to_l0('*.telemetry.pkts', satellite, product, rdrs, start, end) +def cris_hsk_to_l0(satellite, rdrs, start, end, workdir=None): + product = "1280CRISHSK" + return _do_rdr_to_l0( + "*.telemetry.pkts", satellite, product, rdrs, start, end, workdir + ) -def cris_dwell_to_l0(satellite, rdrs, start, end): - product = '1291CRISDWELL' - return _do_rdr_to_l0('*.dwell.pkts', satellite, product, rdrs, start, end) +def cris_dwell_to_l0(satellite, rdrs, start, end, workdir=None): + product = "1291CRISDWELL" + return _do_rdr_to_l0("*.dwell.pkts", satellite, product, rdrs, start, end, workdir) -def cris_sci_to_l0(satellite, rdrs, start, end): - product = '1289CRISSCIENCE' - return _do_rdr_to_l0('*.science.pkts', satellite, product, rdrs, start, end) +def cris_sci_to_l0(satellite, rdrs, start, end, workdir=None): + product = "1289CRISSCIENCE" + return _do_rdr_to_l0( + "*.science.pkts", satellite, product, rdrs, start, end, workdir + ) -def atms_hsk_to_l0(satellite, rdrs, start, end): - product = '0518ATMSHSK' - return _do_rdr_to_l0('*.telemetry.pkts', satellite, product, rdrs, start, end) +def atms_hsk_to_l0(satellite, rdrs, start, end, workdir=None): + product = "0518ATMSHSK" + return _do_rdr_to_l0( + "*.telemetry.pkts", satellite, product, rdrs, start, end, workdir + ) -def atms_dwell_to_l0(satellite, rdrs, start, end): - product = '0517ATMSDWELL' - return _do_rdr_to_l0('*.dwell.pkts', satellite, product, rdrs, start, end) +def atms_dwell_to_l0(satellite, rdrs, start, end, workdir=None): + product = "0517ATMSDWELL" + return _do_rdr_to_l0("*.dwell.pkts", satellite, product, rdrs, start, end, workdir) -def atms_sci_to_l0(satellite, rdrs, start, end): - product = '0515ATMSSCIENCE' - return _do_rdr_to_l0('*.science.pkts', satellite, product, rdrs, start, end) +def atms_sci_to_l0(satellite, rdrs, start, end, workdir=None): + 
product = "0515ATMSSCIENCE" + return _do_rdr_to_l0( + "*.science.pkts", satellite, product, rdrs, start, end, workdir + ) -def viirs_sci_to_l0(satellite, rdrs, start, end): - product = '0826VIIRSSCIENCE' - return _do_rdr_to_l0('*.science.pkts', satellite, product, rdrs, start, end) +def viirs_sci_to_l0(satellite, rdrs, start, end, workdir=None): + product = "0826VIIRSSCIENCE" + return _do_rdr_to_l0( + "*.science.pkts", satellite, product, rdrs, start, end, workdir + ) -def spacecraft_to_l0(satellite, rdrs, start, end): +def spacecraft_to_l0(satellite, rdrs, start, end, workdir=None): + workdir = workdir or "." for filepath in rdrs: - LOG.info('dumping %s', filepath) + LOG.info("dumping %s", filepath) write_rdr_datasets(filepath, ancillary=True, skipfill=True) filenames = [] scid = satellite_to_scid[satellite] for apid in (0, 8, 11): # alphanumeric sorting to bootstrap final sort - inputs = sorted(glob.glob('*.ancillary{}.pkts'.format(apid))) - files = [open(f, 'rb') for f in inputs] + inputs = sorted(glob.glob("*.ancillary{}.pkts".format(apid))) + files = [open(f, "rb") for f in inputs] streams = [jpss_packet_stream(f) for f in files] - product = 'P{}{:04d}'.format(scid, apid) + product = "P{}{:04d}".format(scid, apid) pdsname = pdsfilename(product, start) - LOG.info('merging to %s', pdsname) - with open(pdsname, 'wb') as dest: + LOG.info("merging to %s", pdsname) + with open(os.path.join(workdir, pdsname), "wb") as dest: merge(streams, output=dest, trunc_to=[start, end]) - filenames.append(pdsname) + filenames.append(os.path.join(workdir, pdsname).replace("./", "")) # lots of files so make sure they're closed [f.close() for f in files] @@ -400,14 +415,13 @@ def spacecraft_to_l0(satellite, rdrs, start, end): def rdr_to_l0(satellite, rdrs, start, end): filepaths = [] - for product, filepath in {f.split('_', 1)[0]: f for f in rdrs}.items(): - if 'RNSCA' in product: + for product, filepath in {f.split("_", 1)[0]: f for f in rdrs}.items(): + if "RNSCA" in product: filepaths += spacecraft_to_l0(satellite, [filepath], start, end) - if 'RVIRS' in product: - filepaths.append(viirs_sci_to_l0( - satellite, [filepath], start, end)) - if 'RCRIS' in product: + if "RVIRS" in product: + filepaths.append(viirs_sci_to_l0(satellite, [filepath], start, end)) + if "RCRIS" in product: filepaths.append(cris_sci_to_l0(satellite, [filepath], start, end)) - if 'RATMS' in product: + if "RATMS" in product: filepaths.append(atms_sci_to_l0(satellite, [filepath], start, end)) return filepaths diff --git a/edosl0util/merge.py b/edosl0util/merge.py index b41dc0cfcae80724516b9d352d97e25ef2209231..ad1d304b89aa478336a416a41f383da84bb421fe 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -33,12 +33,13 @@ class _Ptr(object): self.count = 1 def __repr__(self): - attrs = ' '.join( - '{}={}'.format(k, v) + attrs = " ".join( + "{}={}".format(k, v) for k, v in sorted(vars(self).items()) - if not k.startswith('_')) + if not k.startswith("_") + ) - return '<{:s} {:s}>'.format(self.__class__.__name__, attrs) + return "<{:s} {:s}>".format(self.__class__.__name__, attrs) def __eq__(self, that): return (self.stamp, self.apid) == (that.stamp, that.apid) @@ -51,7 +52,7 @@ class _Ptr(object): # hash by stamp, apid, size so we can dedup in index using set def __hash__(self): - return hash((self.stamp, self.apid, self.size)) + return hash((self.stamp, self.apid, self.size)) def bytes(self): self.fobj.seek(self.offset, os.SEEK_SET) @@ -68,7 +69,7 @@ def read_packet_index(stream): packet = stream.next() count += 1 if count: - 
LOG.info('dropped %d leading packets', count) + LOG.info("dropped %d leading packets", count) while True: if not packet.stamp: @@ -106,7 +107,9 @@ def _sort_by_time_apid(index, order=None): Sort pointers by time and apid in-place. """ if order: - index = sorted(index, key=lambda p: order.index(p.apid) if p.apid in order else -1) + index = sorted( + index, key=lambda p: order.index(p.apid) if p.apid in order else -1 + ) else: index = sorted(index, key=lambda p: p.apid) return sorted(index, key=lambda p: p.stamp) @@ -144,15 +147,15 @@ def merge(streams, output, trunc_to=None, apid_order=None): """ index = set() # will remove duplicates for stream in streams: - LOG.debug('indexing %s', stream) + LOG.debug("indexing %s", stream) index.update(read_packet_index(stream)) - LOG.debug('sorting index with %d pointers', len(index)) + LOG.debug("sorting index with %d pointers", len(index)) index = _sort_by_time_apid(index, order=apid_order) index = _filter_duplicates_by_size(index) - LOG.debug('writing index to %s', output) + LOG.debug("writing index to %s", output) for ptr in index: if trunc_to: if trunc_to[0] <= ptr.stamp < trunc_to[1]: diff --git a/edosl0util/pkted.py b/edosl0util/pkted.py index 478c8a643dbb0769702b696fffe69953c6eb6a7f..2190a4db5d590076eca0e9c37156436c8d92b775 100644 --- a/edosl0util/pkted.py +++ b/edosl0util/pkted.py @@ -5,6 +5,7 @@ import numpy as np from edosl0util.stream import BasicStream + @attr.s class PacketEditSpec(object): apid = attr.ib() @@ -32,7 +33,7 @@ def edit_packets(file_path, specs): new_value = spec.new_value(old_value) else: new_value = np.asarray(spec.new_value) - new_value = new_value.astype(new_value.dtype.newbyteorder('>')) + new_value = new_value.astype(new_value.dtype.newbyteorder(">")) new_value = np.unpackbits(new_value[None].view(np.uint8))[-sz:] bits[ini:fin] = new_value pkt.write(np.packbits(bits).tobytes()) @@ -41,7 +42,7 @@ def edit_packets(file_path, specs): class EditablePacketStream(object): def __init__(self, file_path): self._tracker_stream = BasicStream(open(file_path), with_data=False) - self._file = open(file_path, 'r+') + self._file = open(file_path, "r+") def __iter__(self): return self diff --git a/edosl0util/rdrgen.py b/edosl0util/rdrgen.py index adbc60aeb5d48acaf26f65cd79d637e6549c254b..444df09e10d4b1e11d8a161b6a0e3f29a2292e1f 100644 --- a/edosl0util/rdrgen.py +++ b/edosl0util/rdrgen.py @@ -12,7 +12,11 @@ import numpy as np import edosl0util from edosl0util import compat from edosl0util.jpssrdr import ( - StaticHeader, Apid as ApidListItem, PacketTracker, decode_rdr_blob) + StaticHeader, + Apid as ApidListItem, + PacketTracker, + decode_rdr_blob, +) from edosl0util.stream import jpss_packet_stream from edosl0util.timecode import cds_to_iet, iet_to_dt @@ -20,15 +24,22 @@ from edosl0util.timecode import cds_to_iet, iet_to_dt def packets_to_rdrs(sat, l0_files, **kwargs): def iter_pkts(l0_files): for l0_file in l0_files: - with open(l0_file, 'rb') as l0_file_obj: + with open(l0_file, "rb") as l0_file_obj: for pkt in jpss_packet_stream(l0_file_obj): yield pkt - build_rdr(sat, iter_pkts(l0_files), **kwargs) + return build_rdr(sat, iter_pkts(l0_files), **kwargs) -def build_rdr(sat, pkt_iter, output_dir='.', aggr_type='idps', aggr_level=None, - diary_cushion=10000000, attr_overrides={}): +def build_rdr( + sat, + pkt_iter, + output_dir=".", + aggr_type="idps", + aggr_level=None, + diary_cushion=10000000, + attr_overrides={}, +): """Construct RDR file(s) from L0 packets Default aggregation behavior uses file boundaries computed in the same way 
@@ -56,28 +67,35 @@ def build_rdr(sat, pkt_iter, output_dir='.', aggr_type='idps', aggr_level=None, rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos) primary_type, packaged_type = process_rdr_types(rdr_types, force_packaging=False) rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2)) - if aggr_type == 'idps': + if aggr_type == "idps": aggr_level = aggr_level or primary_type.default_aggregation - primary_aggr_iets = sorted(set( - get_aggregate_start(sat, primary_type.gran_len, aggr_level, gran_iet) - for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type)) - elif aggr_type == 'full': + primary_aggr_iets = sorted( + set( + get_aggregate_start(sat, primary_type.gran_len, aggr_level, gran_iet) + for (rdr_type, gran_iet) in gran_infos + if rdr_type is primary_type + ) + ) + elif aggr_type == "full": # produce a single output file, ignoring IDPS-style aggregation boundaries assert aggr_level is None - first_gran_iet = min(gran_iet for (rdr_type, gran_iet) in gran_infos - if rdr_type is primary_type) - last_gran_iet = max(gran_iet for (rdr_type, gran_iet) in gran_infos - if rdr_type is primary_type) + first_gran_iet = min( + gran_iet for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type + ) + last_gran_iet = max( + gran_iet for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type + ) aggr_level = (last_gran_iet - first_gran_iet) // primary_type.gran_len + 1 primary_aggr_iets = [first_gran_iet] else: - raise ValueError('aggr_type must be idps or input') + raise ValueError("aggr_type must be idps or input") # now generate the RDRs rdr_files = [] for aggr_iet in primary_aggr_iets: - rdr_writer = RdrWriter(sat, rdr_types, aggr_iet, aggr_level, output_dir, - **attr_overrides) + rdr_writer = RdrWriter( + sat, rdr_types, aggr_iet, aggr_level, output_dir, **attr_overrides + ) rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level) gran_iets = [aggr_iet + i * primary_type.gran_len for i in range(aggr_level)] for gran_iet in gran_iets: @@ -87,10 +105,14 @@ def build_rdr(sat, pkt_iter, output_dir='.', aggr_type='idps', aggr_level=None, rdr_writer.write_granule(primary_type, gran_iet, blob) if packaged_type: packaged_gran_iets = get_overlapping_granules( - sat, packaged_type.gran_len, aggr_iet - diary_cushion, - aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1) + sat, + packaged_type.gran_len, + aggr_iet - diary_cushion, + aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1, + ) rdr_writer.write_aggregate( - packaged_type, packaged_gran_iets[0], len(packaged_gran_iets)) + packaged_type, packaged_gran_iets[0], len(packaged_gran_iets) + ) for gran_iet in packaged_gran_iets: with file_mgr.process_file((packaged_type, gran_iet)) as pkt_file: pkts = list(jpss_packet_stream(pkt_file)) @@ -114,22 +136,25 @@ def process_rdr_types(given_rdr_types, force_packaging): """ rdr_types = set(given_rdr_types) if not rdr_types: - raise RdrgenError('No RDR types to generate!') - sci_type = {t for t in rdr_types if t.type_id == 'SCIENCE'} + raise RdrgenError("No RDR types to generate!") + sci_type = {t for t in rdr_types if t.type_id == "SCIENCE"} rdr_types -= sci_type if len(sci_type) > 1: raise RdrgenError( - 'Cannot process more than 1 science RDR type at once ' - + '(have {})'.format(', '.join(t.short_name for t in sci_type))) + "Cannot process more than 1 science RDR type at once " + + "(have {})".format(", ".join(t.short_name for t in sci_type)) + ) diary_type = {t for t in rdr_types if t is 
SpacecraftDiaryRdrType} rdr_types -= diary_type if rdr_types: raise RdrgenError( - 'Unsupported RDR type(s): ' + ', '.join(t.short_name for t in rdr_types)) + "Unsupported RDR type(s): " + ", ".join(t.short_name for t in rdr_types) + ) if sci_type: primary_type, = sci_type packaged_type = ( - SpacecraftDiaryRdrType if (diary_type or force_packaging) else None) + SpacecraftDiaryRdrType if (diary_type or force_packaging) else None + ) else: primary_type = SpacecraftDiaryRdrType packaged_type = None @@ -142,7 +167,7 @@ class BinnedTemporaryFileManager(object): Limits the number of file handles kept open at a time. """ - def __init__(self, parent_dir='.', max_open_files=32): + def __init__(self, parent_dir=".", max_open_files=32): self.max_open_files = max_open_files self.dir = tempfile.mkdtemp(dir=parent_dir) self._file_paths = {} @@ -157,9 +182,11 @@ class BinnedTemporaryFileManager(object): if not file_obj: file_path = self._file_paths.get(bin_key) if file_path: - file_obj = open(file_path, 'a+b') + file_obj = open(file_path, "a+b") else: - file_obj = tempfile.NamedTemporaryFile(dir=self.dir, delete=False, mode='wb') + file_obj = tempfile.NamedTemporaryFile( + dir=self.dir, delete=False, mode="wb" + ) file_path = file_obj.name self._file_paths[bin_key] = file_path if len(self._file_objs) == self.max_open_files: @@ -179,13 +206,13 @@ class BinnedTemporaryFileManager(object): file_obj = self._file_objs.pop(bin_key, None) if file_obj: file_obj.close() - file_path = self._file_paths.pop(bin_key, '/dev/null') - file_obj = open(file_path, 'rb') + file_path = self._file_paths.pop(bin_key, "/dev/null") + file_obj = open(file_path, "rb") try: yield file_obj finally: file_obj.close() - if file_path != '/dev/null': + if file_path != "/dev/null": os.remove(file_path) def clean_up(self): @@ -194,9 +221,21 @@ class BinnedTemporaryFileManager(object): class RdrWriter(object): - def __init__(self, sat, rdr_types, aggr_iet, aggr_level, dir_path='.', - distributor=None, origin=None, domain=None, compressed=False, - orbit_num=0, creation_time=None, software_ver=None): + def __init__( + self, + sat, + rdr_types, + aggr_iet, + aggr_level, + dir_path=".", + distributor=None, + origin=None, + domain=None, + compressed=False, + orbit_num=0, + creation_time=None, + software_ver=None, + ): self._sat = sat origin = origin or self.default_origin distributor = distributor or origin @@ -204,91 +243,125 @@ class RdrWriter(object): self._orbit_num = orbit_num self._creation_time = creation_time or datetime.now() self._software_ver = ( - software_ver or edosl0util.__name__ + '-' + edosl0util.__version__) + software_ver or edosl0util.__name__ + "-" + edosl0util.__version__ + ) aggr_end_iet = aggr_iet + aggr_level * rdr_types[0].gran_len self.file_name = make_rdr_filename( - rdr_types, self._sat, aggr_iet, aggr_end_iet, self._orbit_num, - self._creation_time, origin, self._domain, compressed) - self._h5_file = h5py.File(os.path.join(dir_path, self.file_name), 'w') + rdr_types, + self._sat, + aggr_iet, + aggr_end_iet, + self._orbit_num, + self._creation_time, + origin, + self._domain, + compressed, + ) + self._h5_file = h5py.File(os.path.join(dir_path, self.file_name), "w") self._write_skeleton(rdr_types, distributor, origin) self._aggr_starts = {} def _write_skeleton(self, rdr_types, distributor, origin): - self._set_h5_attrs(self._h5_file, { - 'Distributor': distributor, - 'Mission_Name': 'S-NPP/JPSS', # FIXME: what will this be for J1? 
- 'N_Dataset_Source': origin, - 'N_HDF_Creation_Date': self._format_date_attr(self._creation_time), - 'N_HDF_Creation_Time': self._format_time_attr(self._creation_time), - 'Platform_Short_Name': platform_short_names[self._sat]}) - all_grp = self._h5_file.create_group('All_Data') - prod_grp = self._h5_file.create_group('Data_Products') + self._set_h5_attrs( + self._h5_file, + { + "Distributor": distributor, + "Mission_Name": "S-NPP/JPSS", # FIXME: what will this be for J1? + "N_Dataset_Source": origin, + "N_HDF_Creation_Date": self._format_date_attr(self._creation_time), + "N_HDF_Creation_Time": self._format_time_attr(self._creation_time), + "Platform_Short_Name": platform_short_names[self._sat], + }, + ) + all_grp = self._h5_file.create_group("All_Data") + prod_grp = self._h5_file.create_group("Data_Products") for rdr_type in rdr_types: - all_grp.create_group(rdr_type.short_name + '_All') + all_grp.create_group(rdr_type.short_name + "_All") gran_grp = prod_grp.create_group(rdr_type.short_name) - self._set_h5_attrs(gran_grp, { - 'Instrument_Short_Name': instrument_short_names[rdr_type.sensor], - 'N_Collection_Short_Name': rdr_type.short_name, - 'N_Dataset_Type_Tag': 'RDR', - 'N_Processing_Domain': self._domain}) + self._set_h5_attrs( + gran_grp, + { + "Instrument_Short_Name": instrument_short_names[rdr_type.sensor], + "N_Collection_Short_Name": rdr_type.short_name, + "N_Dataset_Type_Tag": "RDR", + "N_Processing_Domain": self._domain, + }, + ) def write_aggregate(self, rdr_type, aggr_iet, num_grans): self._aggr_starts[rdr_type] = aggr_iet - grp = self._h5_file['Data_Products'][rdr_type.short_name] + grp = self._h5_file["Data_Products"][rdr_type.short_name] ds = grp.create_dataset( - rdr_type.short_name + '_Aggr', [1], self._h5_ref_dtype, maxshape=[None]) - ds[0] = self._h5_file['All_Data/{}_All'.format(rdr_type.short_name)].ref + rdr_type.short_name + "_Aggr", [1], self._h5_ref_dtype, maxshape=[None] + ) + ds[0] = self._h5_file["All_Data/{}_All".format(rdr_type.short_name)].ref aggr_end_iet = aggr_iet + num_grans * rdr_type.gran_len last_gran_iet = aggr_end_iet - rdr_type.gran_len - self._set_h5_attrs(ds, { - 'AggregateBeginningDate': self._format_date_attr(aggr_iet), - 'AggregateBeginningGranuleID': make_granule_id(self._sat, aggr_iet), - 'AggregateBeginningOrbitNumber': np.uint64(self._orbit_num), - 'AggregateBeginningTime': self._format_time_attr(aggr_iet), - 'AggregateEndingDate': self._format_date_attr(aggr_end_iet), - 'AggregateEndingGranuleID': make_granule_id(self._sat, last_gran_iet), - 'AggregateEndingOrbitNumber': np.uint64(self._orbit_num), - 'AggregateEndingTime': self._format_time_attr(aggr_end_iet), - 'AggregateNumberGranules': np.uint64(num_grans)}) + self._set_h5_attrs( + ds, + { + "AggregateBeginningDate": self._format_date_attr(aggr_iet), + "AggregateBeginningGranuleID": make_granule_id(self._sat, aggr_iet), + "AggregateBeginningOrbitNumber": np.uint64(self._orbit_num), + "AggregateBeginningTime": self._format_time_attr(aggr_iet), + "AggregateEndingDate": self._format_date_attr(aggr_end_iet), + "AggregateEndingGranuleID": make_granule_id(self._sat, last_gran_iet), + "AggregateEndingOrbitNumber": np.uint64(self._orbit_num), + "AggregateEndingTime": self._format_time_attr(aggr_end_iet), + "AggregateNumberGranules": np.uint64(num_grans), + }, + ) def write_granule(self, rdr_type, gran_iet, blob, creation_time=None): - raw_grp = self._h5_file['All_Data/{}_All'.format(rdr_type.short_name)] + raw_grp = self._h5_file["All_Data/{}_All".format(rdr_type.short_name)] gran_idx = 
(gran_iet - self._aggr_starts[rdr_type]) // rdr_type.gran_len raw_ds = raw_grp.create_dataset( - 'RawApplicationPackets_{}'.format(gran_idx), data=blob, maxshape=[None]) - gran_grp = self._h5_file['Data_Products'][rdr_type.short_name] + "RawApplicationPackets_{}".format(gran_idx), data=blob, maxshape=[None] + ) + gran_grp = self._h5_file["Data_Products"][rdr_type.short_name] gran_ds = gran_grp.create_dataset( - rdr_type.short_name + '_Gran_{}'.format(gran_idx), - [1], self._h5_regionref_dtype, maxshape=[None]) + rdr_type.short_name + "_Gran_{}".format(gran_idx), + [1], + self._h5_regionref_dtype, + maxshape=[None], + ) gran_ds[0] = raw_ds.regionref[:] gran_end_iet = gran_iet + rdr_type.gran_len creation_time = creation_time or self._creation_time or datetime.now() gran_id = make_granule_id(self._sat, gran_iet) - gran_ver = 'A1' + gran_ver = "A1" blob_info = decode_rdr_blob(blob) - self._set_h5_attrs(gran_ds, { - 'Beginning_Date': self._format_date_attr(gran_iet), - 'Beginning_Time': self._format_time_attr(gran_iet), - 'Ending_Date': self._format_date_attr(gran_end_iet), - 'Ending_Time': self._format_time_attr(gran_end_iet), - 'N_Beginning_Orbit_Number': np.uint64(self._orbit_num), - 'N_Beginning_Time_IET': np.uint64(gran_iet), - 'N_Creation_Date': self._format_date_attr(creation_time), - 'N_Creation_Time': self._format_time_attr(creation_time), - 'N_Ending_Time_IET': np.uint64(gran_end_iet), - 'N_Granule_ID': gran_id, - 'N_Granule_Status': 'N/A', - 'N_Granule_Version': gran_ver, - 'N_IDPS_Mode': self._domain, - 'N_JPSS_Document_Ref': rdr_type.document, - 'N_LEOA_Flag': 'Off', - 'N_Packet_Type': [a.name for a in blob_info.apids], - 'N_Packet_Type_Count': [np.uint64(a.pkts_received) for a in blob_info.apids], - 'N_Percent_Missing_Data': np.float32(self._calc_percent_missing(blob_info)), - 'N_Primary_Label': 'Primary', # TODO: find out what this is - 'N_Reference_ID': ':'.join([rdr_type.short_name, gran_id, gran_ver]), - 'N_Software_Version': self._software_ver}) + self._set_h5_attrs( + gran_ds, + { + "Beginning_Date": self._format_date_attr(gran_iet), + "Beginning_Time": self._format_time_attr(gran_iet), + "Ending_Date": self._format_date_attr(gran_end_iet), + "Ending_Time": self._format_time_attr(gran_end_iet), + "N_Beginning_Orbit_Number": np.uint64(self._orbit_num), + "N_Beginning_Time_IET": np.uint64(gran_iet), + "N_Creation_Date": self._format_date_attr(creation_time), + "N_Creation_Time": self._format_time_attr(creation_time), + "N_Ending_Time_IET": np.uint64(gran_end_iet), + "N_Granule_ID": gran_id, + "N_Granule_Status": "N/A", + "N_Granule_Version": gran_ver, + "N_IDPS_Mode": self._domain, + "N_JPSS_Document_Ref": rdr_type.document, + "N_LEOA_Flag": "Off", + "N_Packet_Type": [a.name for a in blob_info.apids], + "N_Packet_Type_Count": [ + np.uint64(a.pkts_received) for a in blob_info.apids + ], + "N_Percent_Missing_Data": np.float32( + self._calc_percent_missing(blob_info) + ), + "N_Primary_Label": "Primary", # TODO: find out what this is + "N_Reference_ID": ":".join([rdr_type.short_name, gran_id, gran_ver]), + "N_Software_Version": self._software_ver, + }, + ) def close(self): self._h5_file.close() @@ -306,20 +379,20 @@ class RdrWriter(object): @staticmethod def _format_date_attr(t): - return iet_to_datetime(t).strftime('%Y%m%d') + return iet_to_datetime(t).strftime("%Y%m%d") @staticmethod def _format_time_attr(t): - return iet_to_datetime(t).strftime('%H%M%S.%fZ') + return iet_to_datetime(t).strftime("%H%M%S.%fZ") @staticmethod def _calc_percent_missing(common_rdr): total_received = 
sum(a.pkts_received for a in common_rdr.apids) total_reserved = sum(a.pkts_reserved for a in common_rdr.apids) - return 100. * (total_reserved - total_received) / total_reserved + return 100.0 * (total_reserved - total_received) / total_reserved - default_origin = 'ssec' - default_domain = 'dev' + default_origin = "ssec" + default_domain = "dev" _h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference) _h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference) @@ -350,33 +423,37 @@ def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet): all_pkts = [] for apid in rdr_type.apids: apid_info[apid.num] = { - 'name': apid.name.encode(), - 'pkts_reserved': apid.max_expected, - 'pkts_received': 0, - 'first_tracker_index': total_trackers, - 'pkt_info': [{} for _ in range(apid.max_expected)]} + "name": apid.name.encode(), + "pkts_reserved": apid.max_expected, + "pkts_received": 0, + "first_tracker_index": total_trackers, + "pkt_info": [{} for _ in range(apid.max_expected)], + } total_trackers += apid.max_expected for pkt in pkt_stream: if pkt.apid not in apid_info: raise ValueError( - 'APID {} not expected for {}'.format(pkt.apid, rdr_type.short_name)) + "APID {} not expected for {}".format(pkt.apid, rdr_type.short_name) + ) pkt_iet = get_jpss_packet_time(pkt) if not granule_iet <= pkt_iet < granule_iet_end: - raise ValueError('packet stream crosses granule boundary') + raise ValueError("packet stream crosses granule boundary") info = apid_info[pkt.apid] - pkt_info = info['pkt_info'][info['pkts_received']] - pkt_info['obs_time'] = pkt_iet - pkt_info['seq_num'] = pkt.seqid - pkt_info['size'] = pkt.size - pkt_info['offset'] = total_pkt_size - info['pkts_received'] += 1 + pkt_info = info["pkt_info"][info["pkts_received"]] + pkt_info["obs_time"] = pkt_iet + pkt_info["seq_num"] = pkt.seqid + pkt_info["size"] = pkt.size + pkt_info["offset"] = total_pkt_size + info["pkts_received"] += 1 total_pkt_size += pkt.size all_pkts.append(pkt) apid_list_offset = ctypes.sizeof(StaticHeader) pkt_tracker_offset = apid_list_offset + len(apid_info) * ctypes.sizeof(ApidListItem) - ap_storage_offset = pkt_tracker_offset + total_trackers * ctypes.sizeof(PacketTracker) + ap_storage_offset = pkt_tracker_offset + total_trackers * ctypes.sizeof( + PacketTracker + ) buf_size = ap_storage_offset + total_pkt_size buf = np.zeros([buf_size], np.uint8) # zeros needed to null-pad strings @@ -395,21 +472,22 @@ def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet): for i, (apid, info) in enumerate(apid_info.items()): offset = header.apid_list_offset + i * ctypes.sizeof(ApidListItem) item = ApidListItem.from_buffer(buf, offset) - item.name = info['name'] + item.name = info["name"] item.value = apid - item.pkt_tracker_start_idx = info['first_tracker_index'] - item.pkts_reserved = info['pkts_reserved'] - item.pkts_received = info['pkts_received'] - - for j, pkt_info in enumerate(info['pkt_info']): - offset = (header.pkt_tracker_offset - + (info['first_tracker_index'] + j) * ctypes.sizeof(PacketTracker)) + item.pkt_tracker_start_idx = info["first_tracker_index"] + item.pkts_reserved = info["pkts_reserved"] + item.pkts_received = info["pkts_received"] + + for j, pkt_info in enumerate(info["pkt_info"]): + offset = header.pkt_tracker_offset + ( + info["first_tracker_index"] + j + ) * ctypes.sizeof(PacketTracker) tracker = PacketTracker.from_buffer(buf, offset) if pkt_info: - tracker.obs_time = pkt_info['obs_time'] - tracker.sequence_number = pkt_info['seq_num'] - tracker.size = pkt_info['size'] - tracker.offset = 
pkt_info['offset'] + tracker.obs_time = pkt_info["obs_time"] + tracker.sequence_number = pkt_info["seq_num"] + tracker.size = pkt_info["size"] + tracker.offset = pkt_info["offset"] tracker.fill_percent = 0 else: tracker.offset = -1 @@ -421,15 +499,43 @@ def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet): class ViirsScienceApidInfo(object): apids = list(x for x in range(800, 829) if x != 824) - names = ['M04', 'M05', 'M03', 'M02', 'M01', 'M06', 'M07', 'M09', 'M10', - 'M08', 'M11', 'M13', 'M12', 'I04', 'M16', 'M15', 'M14', 'I05', - 'I01', 'I02', 'I03', 'DNB', 'DNB_MGS', 'DNB_LGS', - 'CAL', 'ENG', 'DNB_HGA', 'DNB_HGB'] + names = [ + "M04", + "M05", + "M03", + "M02", + "M01", + "M06", + "M07", + "M09", + "M10", + "M08", + "M11", + "M13", + "M12", + "I04", + "M16", + "M15", + "M14", + "I05", + "I01", + "I02", + "I03", + "DNB", + "DNB_MGS", + "DNB_LGS", + "CAL", + "ENG", + "DNB_HGA", + "DNB_HGB", + ] @classmethod def get_specs(cls): - return [ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid)) - for apid in cls.apids] + return [ + ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid)) + for apid in cls.apids + ] @classmethod def get_name(cls, apid): @@ -443,11 +549,11 @@ class ViirsScienceApidInfo(object): @classmethod def get_packets_per_scan(cls, apid): name = cls.get_name(apid) - if name == 'ENG': + if name == "ENG": return 1 - elif name == 'CAL': + elif name == "CAL": return 24 - elif name.startswith('M') or name.startswith('DNB'): + elif name.startswith("M") or name.startswith("DNB"): return 17 else: return 33 @@ -458,19 +564,21 @@ class CrisScienceApidInfo(object): @classmethod def get_specs(cls): - return [ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid)) - for apid in cls.apids] + return [ + ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid)) + for apid in cls.apids + ] @classmethod def get_name(cls, apid): if apid == 1289: - return 'EIGHT_S_SCI' + return "EIGHT_S_SCI" elif apid == 1290: - return 'ENG' + return "ENG" else: offset = apid - 1315 - view_types = ['N', 'S', 'C'] - bands = ['LW', 'MW', 'SW'] + view_types = ["N", "S", "C"] + bands = ["LW", "MW", "SW"] num_fovs = 9 view_type = view_types[offset // (num_fovs * len(bands))] band = bands[offset // num_fovs % len(bands)] @@ -480,13 +588,13 @@ class CrisScienceApidInfo(object): @classmethod def get_max_expected(cls, apid): name = cls.get_name(apid) - if name == 'EIGHT_S_SCI': + if name == "EIGHT_S_SCI": return 5 - elif name == 'ENG': + elif name == "ENG": return 1 else: view_type = name[0] - if view_type == 'N': + if view_type == "N": return 121 else: return 9 @@ -506,7 +614,7 @@ class RdrTypeManager(object): def register_type(self, cls): for apid_spec in cls.apids: if apid_spec.num in self._types_by_apid: - raise ValueError('each APID can only be handled by one RDR type') + raise ValueError("each APID can only be handled by one RDR type") self._types_by_apid[apid_spec.num] = cls return cls @@ -514,7 +622,7 @@ class RdrTypeManager(object): try: return self._types_by_apid[apid] except KeyError: - raise RdrgenError('Unsupported APID: {}'.format(apid)) + raise RdrgenError("Unsupported APID: {}".format(apid)) rdr_type_mgr = RdrTypeManager() @@ -524,54 +632,58 @@ get_rdr_type = rdr_type_mgr.get_type_for_apid @rdr_type_spec class ViirsScienceRdrType(object): - product_id = 'RVIRS' - short_name = 'VIIRS-SCIENCE-RDR' + product_id = "RVIRS" + short_name = "VIIRS-SCIENCE-RDR" gran_len = 85350000 - sensor = 'viirs' - type_id = 'SCIENCE' - document = 
'474-00448-02-06_JPSS-DD-Vol-II-Part-6_0200G.pdf' + sensor = "viirs" + type_id = "SCIENCE" + document = "474-00448-02-06_JPSS-DD-Vol-II-Part-6_0200G.pdf" apids = ViirsScienceApidInfo.get_specs() default_aggregation = 1 @rdr_type_spec class CrisScienceRdrType(object): - product_id = 'RCRIS' - short_name = 'CRIS-SCIENCE-RDR' + product_id = "RCRIS" + short_name = "CRIS-SCIENCE-RDR" gran_len = 31997000 - sensor = 'cris' - type_id = 'SCIENCE' - document = '474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf' + sensor = "cris" + type_id = "SCIENCE" + document = "474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf" apids = CrisScienceApidInfo.get_specs() default_aggregation = 15 @rdr_type_spec class AtmsScienceRdrType(object): - product_id = 'RATMS' - short_name = 'ATMS-SCIENCE-RDR' + product_id = "RATMS" + short_name = "ATMS-SCIENCE-RDR" gran_len = 31997000 - sensor = 'atms' - type_id = 'SCIENCE' - document = '474-00448-02-02_JPSS-DD-Vol-II-Part-2_0200B.pdf' - apids = [ApidSpec(515, 'CAL', max_expected=5), - ApidSpec(528, 'SCI', max_expected=1249), - ApidSpec(530, 'ENG_TEMP', max_expected=13), - ApidSpec(531, 'ENG_HS', max_expected=5)] + sensor = "atms" + type_id = "SCIENCE" + document = "474-00448-02-02_JPSS-DD-Vol-II-Part-2_0200B.pdf" + apids = [ + ApidSpec(515, "CAL", max_expected=5), + ApidSpec(528, "SCI", max_expected=1249), + ApidSpec(530, "ENG_TEMP", max_expected=13), + ApidSpec(531, "ENG_HS", max_expected=5), + ] default_aggregation = 15 @rdr_type_spec class SpacecraftDiaryRdrType(object): - product_id = 'RNSCA' - short_name = 'SPACECRAFT-DIARY-RDR' + product_id = "RNSCA" + short_name = "SPACECRAFT-DIARY-RDR" gran_len = 20000000 sensor = None - type_id = 'DIARY' - document = '474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200F.pdf' - apids = [ApidSpec(0, 'CRITICAL', max_expected=21), - ApidSpec(8, 'ADCS_HKH', max_expected=21), - ApidSpec(11, 'DIARY', max_expected=21)] + type_id = "DIARY" + document = "474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200F.pdf" + apids = [ + ApidSpec(0, "CRITICAL", max_expected=21), + ApidSpec(8, "ADCS_HKH", max_expected=21), + ApidSpec(11, "DIARY", max_expected=21), + ] default_aggregation = 303 @@ -598,7 +710,7 @@ class ViirsGroupedPacketTimeTracker(object): def get_iet(self, pkt): if not self.tracks_apid(pkt.apid): - raise ValueError('APID {} not a VIIRS grouped packet type'.format(pkt.apid)) + raise ValueError("APID {} not a VIIRS grouped packet type".format(pkt.apid)) if pkt.is_first(): obs_iet = get_packet_iet(pkt) self._db[pkt.apid] = (obs_iet, pkt.seqid) @@ -620,10 +732,10 @@ class ViirsGroupedPacketTimeTracker(object): idx = 20 else: idx = 10 - arr = np.frombuffer(pkt.bytes()[idx:idx + 8], 'B') - days = arr[0:2].view('>u2')[0] - ms = arr[2:6].view('>u4')[0] - us = arr[6:8].view('>u2')[0] + arr = np.frombuffer(pkt.bytes()[idx : idx + 8], "B") + days = arr[0:2].view(">u2")[0] + ms = arr[2:6].view(">u4")[0] + us = arr[6:8].view(">u2")[0] return cds_to_iet(days, ms, us) @staticmethod @@ -631,8 +743,10 @@ class ViirsGroupedPacketTimeTracker(object): seq_limit = 2 ** 14 group_end = first_seq_num + group_size # the 2nd check below is needed to handle wrap-around - return (first_seq_num < nonfirst_seq_num < group_end - or first_seq_num < nonfirst_seq_num + seq_limit < group_end) + return ( + first_seq_num < nonfirst_seq_num < group_end + or first_seq_num < nonfirst_seq_num + seq_limit < group_end + ) @staticmethod def check_packet_iet(pkt_iet, obs_iet): @@ -656,26 +770,43 @@ class OrphanedViirsPacket(RdrgenError): return repr(self.packet) -def make_rdr_filename(rdr_types, sat, 
aggr_begin, aggr_end, orbit_num, creation_time, - origin, domain, compressed): +def make_rdr_filename( + rdr_types, + sat, + aggr_begin, + aggr_end, + orbit_num, + creation_time, + origin, + domain, + compressed, +): aggr_begin = iet_to_datetime(aggr_begin) aggr_end = iet_to_datetime(aggr_end) - prod_ids = '-'.join(sorted(t.product_id for t in rdr_types)) - sat = {'snpp': 'npp', 'j01': 'j01'}[sat] - if origin.endswith('-'): - origin = origin[:-1] + ('c' if compressed else 'u') + prod_ids = "-".join(sorted(t.product_id for t in rdr_types)) + sat = {"snpp": "npp", "j01": "j01"}[sat] + if origin.endswith("-"): + origin = origin[:-1] + ("c" if compressed else "u") def format_time(t): - return t.strftime('%H%M%S') + str(t.microsecond // 100000) + return t.strftime("%H%M%S") + str(t.microsecond // 100000) - return '{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5'.format( - p=prod_ids, s=sat, d=aggr_begin, b=format_time(aggr_begin), - e=format_time(aggr_end), n=orbit_num, c=creation_time, o=origin, m=domain) + return "{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5".format( + p=prod_ids, + s=sat, + d=aggr_begin, + b=format_time(aggr_begin), + e=format_time(aggr_end), + n=orbit_num, + c=creation_time, + o=origin, + m=domain, + ) def make_granule_id(sat, gran_iet): tenths = (gran_iet - satellite_base_times[sat]) // 100000 - return '{}{:012d}'.format(platform_short_names[sat], tenths) + return "{}{:012d}".format(platform_short_names[sat], tenths) def get_packet_iet(pkt): @@ -720,12 +851,11 @@ def get_overlapping_granules(sat, gran_len, start_iet, stop_iet): return rv -satellite_base_times = { - 'snpp': 1698019234000000, - 'j01': 1698019234000000, -} -platform_short_names = { - 'snpp': 'NPP', - 'j01': 'J01', +satellite_base_times = {"snpp": 1698019234000000, "j01": 1698019234000000} +platform_short_names = {"snpp": "NPP", "j01": "J01"} +instrument_short_names = { + "viirs": "VIIRS", + "cris": "CrIS", + "atms": "ATMS", + None: "SPACECRAFT", } -instrument_short_names = {'viirs': 'VIIRS', 'cris': 'CrIS', 'atms': 'ATMS', None: 'SPACECRAFT'} diff --git a/edosl0util/split.py b/edosl0util/split.py index f047c22e47316f104382090180b3f8b0af6cc8bf..c9d00063f60b97af9943781db7621f69e61f98dd 100644 --- a/edosl0util/split.py +++ b/edosl0util/split.py @@ -47,9 +47,7 @@ def _replace_pdsname_stamp(filename, stamp): # # NOTE: It seems that EDOS uses the file_id column for fractional seconds. # We just zero this out since the bucket should be on even seconds. - pat = '{}{}0{}'.format(filename[:22], - '%y%j%H%M%S', - filename[-6:]) + pat = "{}{}0{}".format(filename[:22], "%y%j%H%M%S", filename[-6:]) return stamp.strftime(pat) @@ -58,12 +56,8 @@ def _filename_for_splitfile(filename, stamp, minutes): # # NOTE: It seems that EDOS uses the file_id column for fractional seconds. # We just zero this out since the bucket should be on even seconds. - pat = '{}{}{}{}0{}'.format( - filename[:20], - minutes, - filename[21], - '%y%j%H%M%S', - filename[-6:] + pat = "{}{}{}{}0{}".format( + filename[:20], minutes, filename[21], "%y%j%H%M%S", filename[-6:] ) return stamp.strftime(pat) @@ -82,8 +76,8 @@ def split_file(filepath, minutes, destdir): :raises RuntimeError: If a file exists with the same name of a bucket file. """ - destdir = destdir or '.' - stream = split_stream(jpss_packet_stream(io.open(filepath, 'rb')), minutes) + destdir = destdir or "." 
+ stream = split_stream(jpss_packet_stream(io.open(filepath, "rb")), minutes) for timestamp, pkts, blob in stream: stamp = datetime.utcfromtimestamp(timestamp) dirname, filename = os.path.split(filepath) @@ -91,9 +85,12 @@ def split_file(filepath, minutes, destdir): dstpath = os.path.join(dirname, destdir, newname) if os.path.exists(dstpath): raise RuntimeError( - ('File already exists. ' - 'Bucket file possibly colliding with input file.'), - dstpath) - with io.open(dstpath, 'wb') as fptr: + ( + "File already exists. " + "Bucket file possibly colliding with input file." + ), + dstpath, + ) + with io.open(dstpath, "wb") as fptr: fptr.write(blob) yield stamp, fptr.name diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 0311c90bfdb5e8913f5575b28c1b0f9c007278ea..f8d5349c1bbc2cad58454fc6554eba8d8a390337 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -9,7 +9,7 @@ from edosl0util.headers import ( GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, - GROUP_STANDALONE + GROUP_STANDALONE, ) from edosl0util import headers @@ -18,7 +18,7 @@ from edosl0util.timecode import dt_to_cds LOG = logging.getLogger(__name__) # Max CCSDS sequence id -MAX_SEQ_ID = 2**14 - 1 +MAX_SEQ_ID = 2 ** 14 - 1 class Error(Exception): @@ -46,7 +46,8 @@ class BasicStream(object): Basic packet stream iterator that reads the primary and secondary headers and maintains offsets and read sizes. """ - Tracker = namedtuple('Tracker', ['h1', 'h2', 'size', 'offset', 'data']) + + Tracker = namedtuple("Tracker", ["h1", "h2", "size", "offset", "data"]) def __init__(self, fobj, header_lookup=None, with_data=True): self.file = fobj @@ -71,8 +72,11 @@ class BasicStream(object): raise StopIteration() if len(buf) != size: raise PacketTooShort( - 'expected to read {:d} bytes, got {:d} at offset {:d}' - .format(size, len(buf), self._offset), self.file) + "expected to read {:d} bytes, got {:d} at offset {:d}".format( + size, len(buf), self._offset + ), + self.file, + ) self._offset += size return buf @@ -116,27 +120,40 @@ class Packet(object): self.seqid = self.primary_header.source_sequence_count def __str__(self): - return '<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>' % \ - (self.apid, self.seqid, self.stamp, self.size, self.offset) + return "<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>" % ( + self.apid, + self.seqid, + self.stamp, + self.size, + self.offset, + ) + __repr__ = __str__ @property def cds_timecode(self): return ( - self.secondary_header and - hasattr(self.secondary_header, 'timecode') and - self.secondary_header.timecode.day_segmented_timecode() or None) + self.secondary_header + and hasattr(self.secondary_header, "timecode") + and self.secondary_header.timecode.day_segmented_timecode() + or None + ) @property def stamp(self): return ( - self.secondary_header and - hasattr(self.secondary_header, 'timecode') and - self.secondary_header.timecode.asdatetime() or None) + self.secondary_header + and hasattr(self.secondary_header, "timecode") + and self.secondary_header.timecode.asdatetime() + or None + ) def bytes(self): - return bytearray(self.primary_header) + \ - bytearray(self.secondary_header) + self.data + return ( + bytearray(self.primary_header) + + bytearray(self.secondary_header) + + self.data + ) def is_group(self): return self.is_first() or self.is_continuing() or self.is_last() @@ -160,8 +177,7 @@ class Packet(object): class PacketStream(object): SEQID_NOTSET = -1 - def __init__(self, data_stream, fail_on_missing=False, - fail_on_tooshort=False): + def __init__(self, data_stream, 
fail_on_missing=False, fail_on_tooshort=False): """ :param data_stream: An interable of ``Tracker`` objects :keyword fail_on_missing: @@ -180,15 +196,14 @@ class PacketStream(object): self._first = None self._last = None self._apid_info = defaultdict( - lambda: {'count': 0, - 'last_seqid': self.SEQID_NOTSET, - 'num_missing': 0}) + lambda: {"count": 0, "last_seqid": self.SEQID_NOTSET, "num_missing": 0} + ) self._fail_on_missing = fail_on_missing self._fail_on_tooshort = fail_on_tooshort def __repr__(self): - filepath = getattr(self.file, 'name', None) - return '<{} file={}>'.format(self.__class__.__name__, filepath) + filepath = getattr(self.file, "name", None) + return "<{} file={}>".format(self.__class__.__name__, filepath) def __iter__(self): return self @@ -218,7 +233,7 @@ class PacketStream(object): except PacketTooShort as err: if self._fail_on_tooshort: raise - LOG.error('Packet too short, aborting stream: %s', err) + LOG.error("Packet too short, aborting stream: %s", err) # The result of this is essentially a file truncation raise StopIteration() packet = Packet(h1, h2, data, data_size=data_size, offset=offset) @@ -244,15 +259,15 @@ class PacketStream(object): self._last = packet.stamp else: self._last = max(packet.stamp, self._last) - apid['count'] += 1 - if apid['last_seqid'] != self.SEQID_NOTSET: - if packet.seqid > apid['last_seqid']: - num_missing = packet.seqid - apid['last_seqid'] - 1 + apid["count"] += 1 + if apid["last_seqid"] != self.SEQID_NOTSET: + if packet.seqid > apid["last_seqid"]: + num_missing = packet.seqid - apid["last_seqid"] - 1 else: - num_missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID - have_missing = num_missing != apid['num_missing'] - apid['num_missing'] += num_missing - apid['last_seqid'] = packet.seqid + num_missing = packet.seqid - apid["last_seqid"] + MAX_SEQ_ID + have_missing = num_missing != apid["num_missing"] + apid["num_missing"] += num_missing + apid["last_seqid"] = packet.seqid return have_missing def info(self): diff --git a/edosl0util/timecode.py b/edosl0util/timecode.py index 26b4aef83428962b66ae2ad002be4413ddd7b83b..b09142b9ea7f012d14618fc002070164e381601d 100644 --- a/edosl0util/timecode.py +++ b/edosl0util/timecode.py @@ -27,11 +27,16 @@ def timecode_parts_to_iet(days, ms, us, epoch): """ Convert components to a IET based on arbitrary epoch. 
""" - return int(_get_grain().utc2tai( - epoch + timedelta(days=float(days), - milliseconds=float(ms), - microseconds=float(us)), - epoch) * 1e6) + return int( + _get_grain().utc2tai( + epoch + + timedelta( + days=float(days), milliseconds=float(ms), microseconds=float(us) + ), + epoch, + ) + * 1e6 + ) def cds_to_iet(days, ms, us): @@ -52,7 +57,7 @@ def dt_to_cds(dt): """ UTC datetime to (day, millis, micros) """ - d = (dt - CDS_EPOCH) + d = dt - CDS_EPOCH return (d.days, int(d.seconds * 1e3), d.microseconds) diff --git a/edosl0util/trunc.py b/edosl0util/trunc.py index fdf7080ca05c274cffa53b51729650ee668d012f..a0674e62a75e2991fcf87f60ade3467d0d25766d 100644 --- a/edosl0util/trunc.py +++ b/edosl0util/trunc.py @@ -20,5 +20,5 @@ def trunc_stream(stream, start, end): def trunc_file(filename, start, end): - stream = jpss_packet_stream(io.open(filename, 'rb')) + stream = jpss_packet_stream(io.open(filename, "rb")) return trunc_stream(stream, start, end) diff --git a/setup.py b/setup.py index 441e77ed97e327a5d0b5de85a46d4aaf30fc48af..8d4c7317d7775fda9c6220a6eeec032c09190e53 100644 --- a/setup.py +++ b/setup.py @@ -33,5 +33,6 @@ setup( edosl0crgen = edosl0util.cli.crgen:main rdr2l0 = edosl0util.cli.rdr2l0:main rdrgen = edosl0util.cli.rdrgen:main + rdrmerge = edosl0util.cli.rdrmerge:main """ ) diff --git a/tests/test_rdrmerge.py b/tests/test_rdrmerge.py new file mode 100644 index 0000000000000000000000000000000000000000..660afbe56aec95e263f3f86766522457be3836df --- /dev/null +++ b/tests/test_rdrmerge.py @@ -0,0 +1,14 @@ +import os + +from glob import glob +from edosl0util.cli import rdrmerge + + +def test_rdrmerge(tmpdir): + + rdr = glob(os.path.join(os.getcwd(), 'tests/RCRIS*.h5'))[0] + tmpdir.chdir() + + outputs = rdrmerge.merge_rdrs([rdr]) + assert len(outputs) == 1 + assert os.path.exists(outputs[0])