diff --git a/edosl0util/cli/rdrmerge.py b/edosl0util/cli/rdrmerge.py index 752bebafe4c52f748296ad377c814e5946a95445..b91ab3d1e932d44deb066b610b693dcb35000e4b 100644 --- a/edosl0util/cli/rdrmerge.py +++ b/edosl0util/cli/rdrmerge.py @@ -3,33 +3,37 @@ import os import shutil from contextlib import contextmanager from datetime import datetime -from glob import glob from os.path import basename, join from tempfile import mkdtemp from ..jpssrdr import atms_sci_to_l0, cris_sci_to_l0, spacecraft_to_l0, viirs_sci_to_l0 from ..rdrgen import packets_to_rdrs +from ..stream import jpss_packet_stream +from ..merge import merge +from .util import configure_logging -LOG = logging.getLogger(__name__) +LOG = logging.getLogger("edosl0util.cli.rdrmerge") +# py2 compat @contextmanager def TemporaryDirectory(**kwds): - tmpdir = mkdtemp(**kwds) + dirpath = mkdtemp(**kwds) try: - yield tmpdir + yield dirpath finally: - shutil.rmtree(tmpdir) + if not os.environ.get('DEBUG', False): + shutil.rmtree(dirpath) -def merge_rdrs(rdrs): +def rdrs_to_process(inputs): # filter and validiate rdrs to_process = { (s, p): [] for s in ("snpp", "j01") for p in ("RNSCA-RVIRS", "RCRIS-RNSCA", "RATMS-RNSCA") } - for rdr in rdrs: + for rdr in inputs: parts = basename(rdr).split("_") if len(parts) < 2: continue @@ -47,47 +51,62 @@ def merge_rdrs(rdrs): continue to_process[sat, product].append(rdr) + return {(s, p): v for (s, p), v in to_process.items() if v} + +sci_extractors = { + "RNSCA-RVIRS": viirs_sci_to_l0, + "RCRIS-RNSCA": cris_sci_to_l0, + "RATMS-RNSCA": atms_sci_to_l0, +} + + +def extract_packets(sat, product, fpath, workdir="."): # use same start/end for all extracting since we don't care about file # timestamps start = datetime(1970, 1, 1) end = datetime.utcnow() + fname = basename(fpath) + pds = sci_extractors[product](sat, [fpath], start, end, workdir) + outputs = [join(workdir, fname + ".sci.pds")] + shutil.move(pds, outputs[0]) + + scs = spacecraft_to_l0(sat, [fpath], start, end, workdir) + for idx, pds in enumerate(scs): + outputs.append(join(workdir, fname + ".sc%d.pds" % idx)) + shutil.move(pds, outputs[-1]) + + return outputs + + +def merge_rdrs(inputs): + to_process = rdrs_to_process(inputs) + # do each set of files separately to handle case where different products # are provided outputs = [] - for sat, product in to_process.keys(): - inputs = to_process[sat, product] - if not inputs: - continue - - sci_to_l0 = { - "RNSCA-RVIRS": viirs_sci_to_l0, - "RCRIS-RNSCA": cris_sci_to_l0, - "RATMS-RNSCA": atms_sci_to_l0, - }[product] - + for (sat, product), rdrs in to_process.items(): + LOG.info("handling %d files for %s %s", len(rdrs), sat, product) with TemporaryDirectory(dir=os.getcwd()) as tmpdir: # extract RDRs separately, name them after RDR - for fpath in inputs: - # extract the SCI data - fname = basename(fpath) - pds = sci_to_l0(sat, [fpath], start, end, tmpdir) - os.rename(pds, join(tmpdir, fname + ".sci.pds")) - # extract the spacecraft data - scs = spacecraft_to_l0(sat, [fpath], start, end, tmpdir) - for idx, pds in enumerate(scs): - os.rename(pds, join(tmpdir, fname + ".sc%d.pds" % idx)) - - # rdrs contains only file names - rdrs = packets_to_rdrs( - sat, glob(join(tmpdir, "*.pds")), aggr_type="full", output_dir=tmpdir - ) + pds = [] + for fpath in rdrs: + LOG.debug("extracting %s", fpath) + pds += extract_packets(sat, product, fpath, tmpdir) + + merged = join(tmpdir, basename(fpath) + '.merge.pds') + LOG.debug("merging packet files to %s", merged) + with open(merged, 'wb') as fp: + merge([jpss_packet_stream(open(p, 'rb')) for p in pds], fp) + + rdrs = packets_to_rdrs(sat, [merged], aggr_type="full", output_dir=tmpdir) assert len(rdrs) == 1, "Should have gotten a single RDR" + rdr = rdrs[0] # copy back to CWD - os.rename(join(tmpdir, rdrs[0]), rdrs[0]) - outputs.append(rdrs[0]) + os.rename(join(tmpdir, rdr), rdr) + outputs.append(rdr) return outputs @@ -96,8 +115,10 @@ def main(): import argparse parser = argparse.ArgumentParser() + parser.add_argument("-v", "--verbose", action="store_true") parser.add_argument("rdr", nargs="+", help="Any science rdr (RVIRS, RCRIS, RATMS)") args = parser.parse_args() + configure_logging(args) if not args.rdr: parser.exit(1, "no RDR's provided") diff --git a/edosl0util/crgen.py b/edosl0util/crgen.py index 00ad1e593f1143dbb81977ee3bbf3e2e23b89f87..920cf9ba63d55c49ca7c946a780d427cfe969311 100644 --- a/edosl0util/crgen.py +++ b/edosl0util/crgen.py @@ -3,7 +3,6 @@ from datetime import datetime import copy import difflib -import itertools import logging import os import pprint diff --git a/edosl0util/jpssrdr.py b/edosl0util/jpssrdr.py index b96df74b4f523a8e91a91afa3a51d0a669987567..0519f360b03a2797a7ff01d784177151f8616a35 100644 --- a/edosl0util/jpssrdr.py +++ b/edosl0util/jpssrdr.py @@ -14,9 +14,10 @@ import os from collections import namedtuple import numpy as np -from edosl0util.stream import jpss_packet_stream from h5py import File as H5File +from edosl0util.stream import jpss_packet_stream + from .headers import BaseStruct from .merge import VIIRS_APID_ORDER, merge @@ -229,7 +230,7 @@ def decode_rdr_blob(buf): def rdr_datasets(filepath): - fobj = H5File(filepath, mode='r') + fobj = H5File(filepath, mode="r") rdr = dict( telemetry=_rdrs_for_packet_dataset(_find_telemetry_group(fobj)), diagnostic=_rdrs_for_packet_dataset(_find_diag_group(fobj)), @@ -278,7 +279,8 @@ def _write_packets(pkts, dest, skipfill): dest.write(pkt.packet) -def write_rdr_datasets(filepath, ancillary=False, skipfill=False): +def write_rdr_datasets(filepath, ancillary=False, skipfill=False, workdir=None): + workdir = workdir or "." rdrname = os.path.basename(filepath) rdrs = rdr_datasets(filepath) @@ -297,13 +299,15 @@ def write_rdr_datasets(filepath, ancillary=False, skipfill=False): rdr.header.type_id.decode(), apid, ) - pktfile = "{}.{}{}.pkts".format(rdrname, typekey, apid) + pktfile = os.path.join( + workdir, "{}.{}{}.pkts".format(rdrname, typekey, apid) + ) with open(pktfile, "ab") as dest: _write_packets(pkts, dest, skipfill) else: - pktfile = "{}.{}.pkts".format(rdrname, typekey) + pktfile = os.path.join(workdir, "{}.{}.pkts".format(rdrname, typekey)) LOG.debug("writing %s", pktfile) - with open(pktfile, "wb") as dest: + with open(pktfile, "ab") as dest: for idx, rdr in enumerate(rdrs[typekey]): LOG.debug( "... %s gran %d %s-%s-%s", @@ -328,18 +332,18 @@ def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end, workdir): product = "P{}{}".format(satellite_to_scid[satellite], product) for filepath in rdrs: LOG.info("dumping %s", filepath) - write_rdr_datasets(filepath, skipfill=True) + write_rdr_datasets(filepath, skipfill=True, workdir=workdir) # alphanumeric sorting to bootstrap final sort - inputs = sorted(glob.glob(filepat)) + inputs = sorted(glob.glob(os.path.join(workdir, filepat))) streams = [jpss_packet_stream(open(f, "rb")) for f in inputs] - pdsname = pdsfilename(product, start) + pdsname = os.path.join(workdir, pdsfilename(product, start)) LOG.info("merging to %s", pdsname) order = VIIRS_APID_ORDER if "VIIRSSCIENCE" in product else None - with open(os.path.join(workdir, pdsname), "wb") as dest: + with open(pdsname, "wb") as dest: merge(streams, output=dest, trunc_to=[start, end], apid_order=order) - return os.path.join(workdir, pdsname) if workdir != "." else pdsname + return pdsname if workdir != "." else pdsname def cris_hsk_to_l0(satellite, rdrs, start, end, workdir=None): @@ -391,13 +395,15 @@ def spacecraft_to_l0(satellite, rdrs, start, end, workdir=None): workdir = workdir or "." for filepath in rdrs: LOG.info("dumping %s", filepath) - write_rdr_datasets(filepath, ancillary=True, skipfill=True) + write_rdr_datasets(filepath, ancillary=True, skipfill=True, workdir=workdir) filenames = [] scid = satellite_to_scid[satellite] for apid in (0, 8, 11): # alphanumeric sorting to bootstrap final sort - inputs = sorted(glob.glob("*.ancillary{}.pkts".format(apid))) + inputs = sorted( + glob.glob(os.path.join(workdir, "*.ancillary{}.pkts".format(apid))) + ) files = [open(f, "rb") for f in inputs] streams = [jpss_packet_stream(f) for f in files] diff --git a/edosl0util/rdrgen.py b/edosl0util/rdrgen.py index 444df09e10d4b1e11d8a161b6a0e3f29a2292e1f..68ae36e87bd867d67a8b201f79373909591d3c76 100644 --- a/edosl0util/rdrgen.py +++ b/edosl0util/rdrgen.py @@ -1,5 +1,6 @@ import ctypes import os +import shutil import tempfile from collections import OrderedDict from contextlib import contextmanager @@ -11,23 +12,20 @@ import numpy as np import edosl0util from edosl0util import compat -from edosl0util.jpssrdr import ( - StaticHeader, - Apid as ApidListItem, - PacketTracker, - decode_rdr_blob, -) +from edosl0util.jpssrdr import Apid as ApidListItem +from edosl0util.jpssrdr import PacketTracker, StaticHeader, decode_rdr_blob from edosl0util.stream import jpss_packet_stream from edosl0util.timecode import cds_to_iet, iet_to_dt -def packets_to_rdrs(sat, l0_files, **kwargs): - def iter_pkts(l0_files): - for l0_file in l0_files: - with open(l0_file, "rb") as l0_file_obj: - for pkt in jpss_packet_stream(l0_file_obj): - yield pkt +def iter_pkts(l0_files): + for l0_file in l0_files: + with open(l0_file, "rb") as l0_file_obj: + for pkt in jpss_packet_stream(l0_file_obj): + yield pkt + +def packets_to_rdrs(sat, l0_files, **kwargs): return build_rdr(sat, iter_pkts(l0_files), **kwargs) @@ -53,74 +51,86 @@ def build_rdr( # divy packets up into temp files organized by granule file_mgr = BinnedTemporaryFileManager() - get_jpss_packet_time = GetJpssPacketTime() - gran_infos = set() # (rdr_type, gran_iet) pairs - for pkt in pkt_iter: - rdr_type = get_rdr_type(pkt.apid) - pkt_iet = get_jpss_packet_time(pkt) - gran_iet = get_granule_start(sat, rdr_type.gran_len, pkt_iet) - gran_info = (rdr_type, gran_iet) - gran_infos.add(gran_info) - file_mgr.add_data(gran_info, pkt.bytes()) - - # determine what RDR files we'll be producing based on the packets we've seen - rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos) - primary_type, packaged_type = process_rdr_types(rdr_types, force_packaging=False) - rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2)) - if aggr_type == "idps": - aggr_level = aggr_level or primary_type.default_aggregation - primary_aggr_iets = sorted( - set( - get_aggregate_start(sat, primary_type.gran_len, aggr_level, gran_iet) + try: + get_jpss_packet_time = GetJpssPacketTime() + gran_infos = set() # (rdr_type, gran_iet) pairs + for pkt in pkt_iter: + rdr_type = get_rdr_type(pkt.apid) + pkt_iet = get_jpss_packet_time(pkt) + gran_iet = get_granule_start(sat, rdr_type.gran_len, pkt_iet) + gran_info = (rdr_type, gran_iet) + gran_infos.add(gran_info) + file_mgr.add_data(gran_info, pkt.bytes()) + + # determine what RDR files we'll be producing based on the packets we've seen + rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos) + primary_type, packaged_type = process_rdr_types( + rdr_types, force_packaging=False + ) + rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2)) + if aggr_type == "idps": + aggr_level = aggr_level or primary_type.default_aggregation + primary_aggr_iets = sorted( + set( + get_aggregate_start( + sat, primary_type.gran_len, aggr_level, gran_iet + ) + for (rdr_type, gran_iet) in gran_infos + if rdr_type is primary_type + ) + ) + elif aggr_type == "full": + # produce a single output file, ignoring IDPS-style aggregation boundaries + assert aggr_level is None + first_gran_iet = min( + gran_iet for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type ) - ) - elif aggr_type == "full": - # produce a single output file, ignoring IDPS-style aggregation boundaries - assert aggr_level is None - first_gran_iet = min( - gran_iet for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type - ) - last_gran_iet = max( - gran_iet for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type - ) - aggr_level = (last_gran_iet - first_gran_iet) // primary_type.gran_len + 1 - primary_aggr_iets = [first_gran_iet] - else: - raise ValueError("aggr_type must be idps or input") - - # now generate the RDRs - rdr_files = [] - for aggr_iet in primary_aggr_iets: - rdr_writer = RdrWriter( - sat, rdr_types, aggr_iet, aggr_level, output_dir, **attr_overrides - ) - rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level) - gran_iets = [aggr_iet + i * primary_type.gran_len for i in range(aggr_level)] - for gran_iet in gran_iets: - with file_mgr.process_file((primary_type, gran_iet)) as pkt_file: - pkts = list(jpss_packet_stream(pkt_file)) - blob = build_rdr_blob(sat, pkts, primary_type, gran_iet) - rdr_writer.write_granule(primary_type, gran_iet, blob) - if packaged_type: - packaged_gran_iets = get_overlapping_granules( - sat, - packaged_type.gran_len, - aggr_iet - diary_cushion, - aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1, + last_gran_iet = max( + gran_iet + for (rdr_type, gran_iet) in gran_infos + if rdr_type is primary_type ) - rdr_writer.write_aggregate( - packaged_type, packaged_gran_iets[0], len(packaged_gran_iets) + aggr_level = (last_gran_iet - first_gran_iet) // primary_type.gran_len + 1 + primary_aggr_iets = [first_gran_iet] + else: + raise ValueError("aggr_type must be idps or input") + + # now generate the RDRs + rdr_files = [] + for aggr_iet in primary_aggr_iets: + rdr_writer = RdrWriter( + sat, rdr_types, aggr_iet, aggr_level, output_dir, **attr_overrides ) - for gran_iet in packaged_gran_iets: - with file_mgr.process_file((packaged_type, gran_iet)) as pkt_file: + rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level) + gran_iets = [ + aggr_iet + i * primary_type.gran_len for i in range(aggr_level) + ] + for gran_iet in gran_iets: + with file_mgr.process_file((primary_type, gran_iet)) as pkt_file: pkts = list(jpss_packet_stream(pkt_file)) - blob = build_rdr_blob(sat, pkts, packaged_type, gran_iet) - rdr_writer.write_granule(packaged_type, gran_iet, blob) - rdr_writer.close() - rdr_files.append(rdr_writer.file_name) - file_mgr.clean_up() + blob = build_rdr_blob(sat, pkts, primary_type, gran_iet) + rdr_writer.write_granule(primary_type, gran_iet, blob) + if packaged_type: + packaged_gran_iets = get_overlapping_granules( + sat, + packaged_type.gran_len, + aggr_iet - diary_cushion, + aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1, + ) + rdr_writer.write_aggregate( + packaged_type, packaged_gran_iets[0], len(packaged_gran_iets) + ) + for gran_iet in packaged_gran_iets: + with file_mgr.process_file((packaged_type, gran_iet)) as pkt_file: + pkts = list(jpss_packet_stream(pkt_file)) + blob = build_rdr_blob(sat, pkts, packaged_type, gran_iet) + rdr_writer.write_granule(packaged_type, gran_iet, blob) + rdr_writer.close() + rdr_files.append(rdr_writer.file_name) + finally: + file_mgr.clean_up() return rdr_files @@ -217,7 +227,7 @@ class BinnedTemporaryFileManager(object): def clean_up(self): """Call after all files are processed to avoid leaving a dir sitting around""" - os.rmdir(self.dir) + shutil.rmtree(self.dir) class RdrWriter(object): @@ -791,7 +801,7 @@ def make_rdr_filename( def format_time(t): return t.strftime("%H%M%S") + str(t.microsecond // 100000) - return "{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5".format( + return "{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5".format( # noqa p=prod_ids, s=sat, d=aggr_begin, @@ -831,7 +841,7 @@ def get_aggregate_start(sat, gran_len, grans_per_aggr, iet): N = grans_per_aggr G_s = get_granule_start(sat, Y, iet) A_n = G_s // (N * Y) - O = G_s % Y + O = G_s % Y # noqa A_s = A_n * (Y * N) + O return A_s diff --git a/tests/P1570826VIIRSSCIENCE6T17270135400001.PDS b/tests/fixtures/P1570826VIIRSSCIENCE6T17270135400001.PDS similarity index 100% rename from tests/P1570826VIIRSSCIENCE6T17270135400001.PDS rename to tests/fixtures/P1570826VIIRSSCIENCE6T17270135400001.PDS diff --git a/tests/RCRIS-RNSCA_npp_d20171008_t0004096_e0012095_b30810_c20171008061237136301_nobu_ops.h5 b/tests/fixtures/RCRIS-RNSCA_npp_d20171008_t0004096_e0012095_b30810_c20171008061237136301_nobu_ops.h5 similarity index 100% rename from tests/RCRIS-RNSCA_npp_d20171008_t0004096_e0012095_b30810_c20171008061237136301_nobu_ops.h5 rename to tests/fixtures/RCRIS-RNSCA_npp_d20171008_t0004096_e0012095_b30810_c20171008061237136301_nobu_ops.h5 diff --git a/tests/fixtures/RCRIS-RNSCA_npp_d20190516_t1604012_e1617532_b00001_c20190516162005905000_all-_dev.h5 b/tests/fixtures/RCRIS-RNSCA_npp_d20190516_t1604012_e1617532_b00001_c20190516162005905000_all-_dev.h5 new file mode 100644 index 0000000000000000000000000000000000000000..6124b9c4af944a25090dbefff6d0ed78f36428f0 --- /dev/null +++ b/tests/fixtures/RCRIS-RNSCA_npp_d20190516_t1604012_e1617532_b00001_c20190516162005905000_all-_dev.h5 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b0a4819ad40332c6727ddd2d43341426a436413a3b4bf984e9b06cf128cd298c +size 112435478 diff --git a/tests/fixtures/RCRIS-RNSCA_npp_d20190516_t1609532_e1623131_b00001_c20190516162306506000_all-_dev.h5 b/tests/fixtures/RCRIS-RNSCA_npp_d20190516_t1609532_e1623131_b00001_c20190516162306506000_all-_dev.h5 new file mode 100644 index 0000000000000000000000000000000000000000..94093b233bcf73df2049d39364860b818832eefb --- /dev/null +++ b/tests/fixtures/RCRIS-RNSCA_npp_d20190516_t1609532_e1623131_b00001_c20190516162306506000_all-_dev.h5 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:1b778bc2daedf1e87d6057fea9f74e893ef8ea61a07bf97b05b8e9a86becb064 +size 106871892 diff --git a/tests/RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5 b/tests/fixtures/RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5 similarity index 100% rename from tests/RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5 rename to tests/fixtures/RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5 diff --git a/tests/test_rdrmerge.py b/tests/test_rdrmerge.py index 660afbe56aec95e263f3f86766522457be3836df..aa058663bd838541491bf134536447d1aa0350c4 100644 --- a/tests/test_rdrmerge.py +++ b/tests/test_rdrmerge.py @@ -1,12 +1,14 @@ import os - from glob import glob + from edosl0util.cli import rdrmerge def test_rdrmerge(tmpdir): - rdr = glob(os.path.join(os.getcwd(), 'tests/RCRIS*.h5'))[0] + rdr = glob( + os.path.join(os.getcwd(), "tests/fixtures/RCRIS-RNSCA_npp_d20190516_t16*.h5") + )[0] tmpdir.chdir() outputs = rdrmerge.merge_rdrs([rdr])