diff --git a/edosl0util/cli/rdrmerge.py b/edosl0util/cli/rdrmerge.py index 506a8e0de1b8ec08cff2db76a88030dc4c6f4b5e..6e17540e53e8bfffe6072e3032130bd32f428926 100644 --- a/edosl0util/cli/rdrmerge.py +++ b/edosl0util/cli/rdrmerge.py @@ -8,7 +8,12 @@ from tempfile import mkdtemp from ..jpssrdr import atms_sci_to_l0, cris_sci_to_l0, spacecraft_to_l0, viirs_sci_to_l0 from ..merge import merge -from ..rdrgen import build_rdr, filter_before, filter_group_orphans, iter_pkts +from ..rdrgen import ( +    build_rdr, +    filter_before, +    filter_group_orphans, +    iter_pkts, +) from ..stream import jpss_packet_stream from .util import configure_logging @@ -89,7 +94,7 @@ start_of_mission = { } -def merge_rdrs(inputs): +def merge_rdrs(inputs, not_before=None, not_after=None): to_process = rdrs_to_process(inputs) # do each set of files separately to handle case where different products @@ -111,7 +116,8 @@ def merge_rdrs(inputs): # have to pre-filter "orphans" to prevent OrphanPacketError packets = filter_group_orphans(iter_pkts([merged])) - packets = filter_before(packets, before=start_of_mission[sat]) + # always filter packets before start-of-mission + packets = filter_before(packets, before=not_before or start_of_mission[sat]) rdrs = build_rdr( sat, packets, aggr_type="full", output_dir=tmpdir, strict=False ) @@ -128,8 +134,29 @@ def main(): import argparse + def timestamp(v): + for fmt in [ + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d" + ]: + try: + return datetime.strptime(v, fmt) + except (TypeError, ValueError): + pass + parser = argparse.ArgumentParser() parser.add_argument("-v", "--verbose", action="store_true") + parser.add_argument( + "--not-before", + type=timestamp, + help=( + "Filter any data before this timestamp. Defaults to the start " + "of mission for the satellite. 
Format as timestamp " "<yyyy-mm-dd>[T<hh:mm:ss>[Z]]" ), ) parser.add_argument("rdr", nargs="+", help="Any science rdr (RVIRS, RCRIS, RATMS)") args = parser.parse_args() configure_logging(args) @@ -137,7 +164,7 @@ def main(): if not args.rdr: parser.exit(1, "no RDR's provided") - for o in merge_rdrs(args.rdr): + for o in merge_rdrs(args.rdr, not_before=args.not_before): LOG.info("created %s", o) diff --git a/edosl0util/rdrgen.py b/edosl0util/rdrgen.py index 62a66a54d87a10b06f305be1b58110f8b638cec2..4f281fd3f247bdeb121621f532e8ce8121ce689a 100644 --- a/edosl0util/rdrgen.py +++ b/edosl0util/rdrgen.py @@ -15,7 +15,7 @@ import edosl0util from edosl0util import compat from edosl0util.jpssrdr import Apid as ApidListItem from edosl0util.jpssrdr import PacketTracker, StaticHeader, decode_rdr_blob -from edosl0util.stream import jpss_packet_stream +from edosl0util.stream import jpss_packet_stream, collect_groups from edosl0util.timecode import cds_to_iet, iet_to_dt LOG = logging.getLogger(__name__) @@ -75,24 +75,15 @@ def filter_group_orphans(s): def filter_before(s, before): """ - Filter all packets until the first packet with a timestamp >= before. No - more filtering is performed after the first valid timestamp, any packets - that occur after the the first valid packet that are < before will be - passed on. + Filter all packets before the provided time. This handles grouped packets + by dropping the entire group if its first packet has a timestamp before + the provided time. 
""" - s = iter(s) - done = object() # sentinal to stop iter - p = next(s, done) - if p is done: - return - while p.stamp is None or p.stamp < before: - LOG.debug("dropping hanging/before[%s] %s", before, p) - p = next(s, done) - if p is done: - return # return early, iter is done - yield p - for p in s: - yield p + for grp in collect_groups(s): + if grp[0].stamp < before: + LOG.debug("dropping before[%s] %s", before, grp[0]) + continue + yield from grp def packets_to_rdrs(sat, l0_files, **kwargs): diff --git a/edosl0util/stream.py b/edosl0util/stream.py index f8d5349c1bbc2cad58454fc6554eba8d8a390337..889609b2cdd4d7fc1e162b0ccc4e67b7ecd5ac3d 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -1,18 +1,17 @@ -import os +import ctypes as c import errno import logging -import ctypes as c -from collections import deque, defaultdict, namedtuple +import os +from collections import defaultdict, deque, namedtuple +from edosl0util import headers from edosl0util.headers import ( - PrimaryHeader, - GROUP_FIRST, GROUP_CONTINUING, + GROUP_FIRST, GROUP_LAST, GROUP_STANDALONE, + PrimaryHeader, ) - -from edosl0util import headers from edosl0util.timecode import dt_to_cds LOG = logging.getLogger(__name__) @@ -300,6 +299,36 @@ class PacketStream(object): self.push_back(packet) +def collect_groups(stream): + """ + Collects packets into lists of grouped packets. If a packet is standalone + it will be a list of just a single packet. If a packet is grouped it will + be a list of packets in the group. There is no guarantee the group will be + complete. The first packet in a group will always have a timestamp. 
+ """ + done = object() # sentinal for stopping iter + group = deque() + while True: + # drop leading packets without timestamp + p = next(stream, done) + if p is done: + return + # just pass through any non-groupped packets + if p.is_standalone(): + yield [p] + continue + + # Yield and start a new group if either we get a first packet or the current + # packets APID does not match the current group. + if p.is_first() or (group and group[-1].apid != p.apid): + if group: + yield list(group) + group = deque([p]) + continue + + group.append(p) + + def jpss_packet_stream(fobj, **kwargs): stream = BasicStream(fobj, headers.jpss_header_lookup) return PacketStream(stream, **kwargs)