diff --git a/edosl0util/cli/rdrmerge.py b/edosl0util/cli/rdrmerge.py index 9f7bfbc2eaa8322bdeee2503d4c1043f24c4275c..0a12727a75a36ad9cd0215b83a8da134390e4afc 100644 --- a/edosl0util/cli/rdrmerge.py +++ b/edosl0util/cli/rdrmerge.py @@ -7,9 +7,9 @@ from os.path import basename, join from tempfile import mkdtemp from ..jpssrdr import atms_sci_to_l0, cris_sci_to_l0, spacecraft_to_l0, viirs_sci_to_l0 -from ..rdrgen import build_rdr, filter_group_orphans, filter_before, iter_pkts -from ..stream import jpss_packet_stream from ..merge import merge +from ..rdrgen import build_rdr, filter_before, filter_group_orphans, iter_pkts +from ..stream import jpss_packet_stream from .util import configure_logging LOG = logging.getLogger("edosl0util.cli.rdrmerge") @@ -22,7 +22,7 @@ def TemporaryDirectory(**kwds): try: yield dirpath finally: - if not os.environ.get('DEBUG', False): + if not os.environ.get("DEBUG", False): shutil.rmtree(dirpath) @@ -80,10 +80,7 @@ def extract_packets(sat, product, fpath, workdir="."): return outputs -start_of_mission = { - 'snpp': datetime(2011, 10, 28), - 'noaa20': datetime(2017, 11, 18), -} +start_of_mission = {"snpp": datetime(2011, 10, 28), "noaa20": datetime(2017, 11, 18)} def merge_rdrs(inputs): @@ -101,15 +98,17 @@ def merge_rdrs(inputs): LOG.debug("extracting %s", fpath) pds += extract_packets(sat, product, fpath, tmpdir) - merged = join(tmpdir, basename(fpath) + '.merge.pds') + merged = join(tmpdir, basename(fpath) + ".merge.pds") LOG.debug("merging packet files to %s", merged) - with open(merged, 'wb') as fp: - merge([jpss_packet_stream(open(p, 'rb')) for p in pds], fp) + with open(merged, "wb") as fp: + merge([jpss_packet_stream(open(p, "rb")) for p in pds], fp) # have to pre-filter "orphans" to prevent OrphanPacketError packets = filter_group_orphans(iter_pkts([merged])) packets = filter_before(packets, before=start_of_mission[sat]) - rdrs = build_rdr(sat, packets, aggr_type="full", output_dir=tmpdir) + rdrs = build_rdr( + sat, packets, aggr_type="full", output_dir=tmpdir, strict=False + ) assert len(rdrs) == 1, "Should have gotten a single RDR" rdr = rdrs[0] diff --git a/edosl0util/rdrgen.py b/edosl0util/rdrgen.py index 55613568e8b56497349f7bf2597f591ecffd1489..62a66a54d87a10b06f305be1b58110f8b638cec2 100644 --- a/edosl0util/rdrgen.py +++ b/edosl0util/rdrgen.py @@ -1,4 +1,5 @@ import ctypes +import logging import os import shutil import tempfile @@ -17,6 +18,27 @@ from edosl0util.jpssrdr import PacketTracker, StaticHeader, decode_rdr_blob from edosl0util.stream import jpss_packet_stream from edosl0util.timecode import cds_to_iet, iet_to_dt +LOG = logging.getLogger(__name__) + + +class Error(Exception): + """RDR generation base error. + """ + + +class InvalidStream(Error): + """ Packet stream is not valid for RDR + """ + + +class InvalidPacket(Error): + """ Invalid packet for the current RDR. + """ + + def __init__(self, msg, packet): + self.msg = msg + self.packet = packet + def iter_pkts(l0_files): for l0_file in l0_files: @@ -46,18 +68,25 @@ def filter_group_orphans(s): try: viirs_tracker.get_iet(p) except OrphanedViirsPacket: + LOG.debug("dropping orphan %s", p) continue yield p def filter_before(s, before): """ - Filter all packets that occur before ... before. + Filter all packets until the first packet with a timestamp >= before. No + more filtering is performed after the first valid timestamp, any packets + that occur after the the first valid packet that are < before will be + passed on. """ s = iter(s) - p = next(s) done = object() # sentinal to stop iter + p = next(s, done) + if p is done: + return while p.stamp is None or p.stamp < before: + LOG.debug("dropping hanging/before[%s] %s", before, p) p = next(s, done) if p is done: return # return early, iter is done @@ -78,6 +107,7 @@ def build_rdr( aggr_level=None, diary_cushion=10000000, attr_overrides={}, + strict=True, ): """Construct RDR file(s) from L0 packets @@ -151,7 +181,7 @@ def build_rdr( for gran_iet in gran_iets: with file_mgr.process_file((primary_type, gran_iet)) as pkt_file: pkts = list(jpss_packet_stream(pkt_file)) - blob = build_rdr_blob(sat, pkts, primary_type, gran_iet) + blob = build_rdr_blob(sat, pkts, primary_type, gran_iet, strict=strict) rdr_writer.write_granule(primary_type, gran_iet, blob) if packaged_type: packaged_gran_iets = get_overlapping_granules( @@ -166,7 +196,9 @@ def build_rdr( for gran_iet in packaged_gran_iets: with file_mgr.process_file((packaged_type, gran_iet)) as pkt_file: pkts = list(jpss_packet_stream(pkt_file)) - blob = build_rdr_blob(sat, pkts, packaged_type, gran_iet) + blob = build_rdr_blob( + sat, pkts, packaged_type, gran_iet, strict=strict + ) rdr_writer.write_granule(packaged_type, gran_iet, blob) rdr_writer.close() rdr_files.append(rdr_writer.file_name) @@ -464,7 +496,23 @@ def _encodeall(v): return v -def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet): +def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet, strict=True): + """ Build an RDR blob of data for a single RDR native resolution granule. + + :param str sat: snpp or noaa20 + :param .stream.PacketStream pkt_stream: + Stream of packets for a single RDR native resolution granule. + :param RdrType rdr_type: + One of the *RDRType classes that indicates the type of packets contained + in the stream. + :param int granule_iet: Granule start time in IET. + :param bool strict: + When True any packet stream issues, duplicates, unexpected APIDs, etc..., + will result in an error subclass of Error. + + :raise InvalidStream: If strict and the stream looks hokey + :raise InvalidPacket: If strict and got a bunk packet + """ get_jpss_packet_time = GetJpssPacketTime() granule_iet_end = granule_iet + rdr_type.gran_len @@ -484,13 +532,28 @@ def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet): for pkt in pkt_stream: if pkt.apid not in apid_info: - raise ValueError( - "APID {} not expected for {}".format(pkt.apid, rdr_type.short_name) - ) + if strict: + raise InvalidPacket( + "APID {} not expected for {}".format(pkt.apid, rdr_type.short_name), + pkt, + ) + else: + LOG.warning("dropping unexpected APID: %s", pkt) + continue pkt_iet = get_jpss_packet_time(pkt) if not granule_iet <= pkt_iet < granule_iet_end: - raise ValueError("packet stream crosses granule boundary") + if strict: + raise InvalidStream("packet stream crosses granule boundary") + else: + LOG.warning("dropping packet that crosses granule boundry: %s", pkt) + continue info = apid_info[pkt.apid] + if info["pkts_received"] > len(info["pkt_info"]) - 1: + if strict: + raise InvalidPacket("possible duplicate packet", pkt) + else: + LOG.warning("dropping possible duplicate packet: %s", pkt) + continue pkt_info = info["pkt_info"][info["pkts_received"]] pkt_info["obs_time"] = pkt_iet pkt_info["seq_num"] = pkt.seqid diff --git a/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1945475_e1952543_b00001_c20191202195439734000_all-_dev.h5 b/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1945475_e1952543_b00001_c20191202195439734000_all-_dev.h5 new file mode 100644 index 0000000000000000000000000000000000000000..66cf421d0003baf0c95e3a64cfa72dc561e47ede --- /dev/null +++ b/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1945475_e1952543_b00001_c20191202195439734000_all-_dev.h5 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8251afd7127a74511fcf1904914474ca116b37e07141eeda1a8429fed53c2734 +size 374460102 diff --git a/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1950036_e2001264_b00001_c20191202200217562000_all-_dev.h5 b/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1950036_e2001264_b00001_c20191202200217562000_all-_dev.h5 new file mode 100644 index 0000000000000000000000000000000000000000..efef50da55e59f82f2aed175a187f5844f1db6b1 --- /dev/null +++ b/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1950036_e2001264_b00001_c20191202200217562000_all-_dev.h5 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6ee91727e12b029767cbcd3b303d0224abee960f63c2449bff8673663fc236f2 +size 614977107 diff --git a/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1950036_e2002517_b00001_c20191202200130753000_all-_dev.h5 b/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1950036_e2002517_b00001_c20191202200130753000_all-_dev.h5 new file mode 100644 index 0000000000000000000000000000000000000000..a4de8362f5d2e9c7da4ac09d76d60bbb16cfcf34 --- /dev/null +++ b/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1950036_e2002517_b00001_c20191202200130753000_all-_dev.h5 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:43bd09f7de06a1a670aa5c4e47f4d55ea1658756447d00fa12480a03511325d0 +size 657161282 diff --git a/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1954196_e1957103_b00001_c20191202195614020000_all-_dev.h5 b/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1954196_e1957103_b00001_c20191202195614020000_all-_dev.h5 new file mode 100644 index 0000000000000000000000000000000000000000..81876ce90d245516486b3cdd21212f1f6bca8dfa --- /dev/null +++ b/tests/fixtures/merge/RNSCA-RVIRS_npp_d20191202_t1954196_e1957103_b00001_c20191202195614020000_all-_dev.h5 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:fca0bc2bcc3c3f94f609b2a7f9ee7663a410f578e4911289d86c747df1d991ed +size 42311433 diff --git a/tests/test_rdrmerge.py b/tests/test_rdrmerge.py index aa058663bd838541491bf134536447d1aa0350c4..7d7105fef29e6a0d97a8740932101ac0b6ae8e30 100644 --- a/tests/test_rdrmerge.py +++ b/tests/test_rdrmerge.py @@ -3,14 +3,35 @@ from glob import glob from edosl0util.cli import rdrmerge +import pytest -def test_rdrmerge(tmpdir): +@pytest.fixture +def testdir(tmpdir): + cwd = os.getcwd() + yield tmpdir + os.chdir(cwd) + + +@pytest.mark.slow +def test_rdrmerge(testdir): rdr = glob( os.path.join(os.getcwd(), "tests/fixtures/RCRIS-RNSCA_npp_d20190516_t16*.h5") )[0] - tmpdir.chdir() + testdir.chdir() outputs = rdrmerge.merge_rdrs([rdr]) assert len(outputs) == 1 assert os.path.exists(outputs[0]) + + +@pytest.mark.slow +def test_rdrmerge_viirs(testdir): + rdrs = glob( + os.path.join(os.getcwd(), "tests/fixtures/merge/RNSCA-RVIRS_npp_d2019*.h5") + ) + testdir.chdir() + + outputs = rdrmerge.merge_rdrs(rdrs) + assert len(outputs) == 1 + assert os.path.exists(outputs[0])