Skip to content
Snippets Groups Projects
Commit 0d3e40d1 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

rdrmerge: allow setting filter_before time

Added collect_groups
parent 6af17f17
No related branches found
Tags 0.14.2
No related merge requests found
...@@ -8,7 +8,12 @@ from tempfile import mkdtemp ...@@ -8,7 +8,12 @@ from tempfile import mkdtemp
from ..jpssrdr import atms_sci_to_l0, cris_sci_to_l0, spacecraft_to_l0, viirs_sci_to_l0 from ..jpssrdr import atms_sci_to_l0, cris_sci_to_l0, spacecraft_to_l0, viirs_sci_to_l0
from ..merge import merge from ..merge import merge
from ..rdrgen import build_rdr, filter_before, filter_group_orphans, iter_pkts from ..rdrgen import (
build_rdr,
filter_before,
filter_group_orphans,
iter_pkts,
)
from ..stream import jpss_packet_stream from ..stream import jpss_packet_stream
from .util import configure_logging from .util import configure_logging
...@@ -89,7 +94,7 @@ start_of_mission = { ...@@ -89,7 +94,7 @@ start_of_mission = {
} }
def merge_rdrs(inputs): def merge_rdrs(inputs, not_before=None, not_after=None):
to_process = rdrs_to_process(inputs) to_process = rdrs_to_process(inputs)
# do each set of files separately to handle case where different products # do each set of files separately to handle case where different products
...@@ -111,7 +116,8 @@ def merge_rdrs(inputs): ...@@ -111,7 +116,8 @@ def merge_rdrs(inputs):
# have to pre-filter "orphans" to prevent OrphanPacketError # have to pre-filter "orphans" to prevent OrphanPacketError
packets = filter_group_orphans(iter_pkts([merged])) packets = filter_group_orphans(iter_pkts([merged]))
packets = filter_before(packets, before=start_of_mission[sat]) # always filter packets before start-of-mission
packets = filter_before(packets, before=not_before or start_of_mission[sat])
rdrs = build_rdr( rdrs = build_rdr(
sat, packets, aggr_type="full", output_dir=tmpdir, strict=False sat, packets, aggr_type="full", output_dir=tmpdir, strict=False
) )
...@@ -128,8 +134,29 @@ def merge_rdrs(inputs): ...@@ -128,8 +134,29 @@ def merge_rdrs(inputs):
def main(): def main():
import argparse import argparse
def timestamp(v):
    """
    Parse a timestamp string for argparse.

    Accepts <yyyy-mm-dd>[T<hh:mm:ss>[Z]] (and the space-separated
    variant) and returns a naive ``datetime``.

    :raises argparse.ArgumentTypeError: if *v* matches none of the
        supported formats, so argparse reports a proper usage error
        instead of silently storing ``None``.
    """
    for fmt in [
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
    ]:
        try:
            return datetime.strptime(v, fmt)
        except (TypeError, ValueError):
            pass
    raise argparse.ArgumentTypeError("invalid timestamp: %r" % (v,))
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", action="store_true") parser.add_argument("-v", "--verbose", action="store_true")
# NOTE(review): help text previously said "this many seconds ago" but the
# option takes an absolute timestamp (see type=timestamp), not an offset.
parser.add_argument(
    "--not-before",
    type=timestamp,
    help=(
        "Drop any packets timestamped before this time. Defaults to the "
        "start of mission for the satellite. Format as timestamp "
        "<yyyy-mm-dd>[T<hh:mm:ss>[Z]]"
    ),
)
parser.add_argument("rdr", nargs="+", help="Any science rdr (RVIRS, RCRIS, RATMS)") parser.add_argument("rdr", nargs="+", help="Any science rdr (RVIRS, RCRIS, RATMS)")
args = parser.parse_args() args = parser.parse_args()
configure_logging(args) configure_logging(args)
...@@ -137,7 +164,7 @@ def main(): ...@@ -137,7 +164,7 @@ def main():
if not args.rdr: if not args.rdr:
parser.exit(1, "no RDR's provided") parser.exit(1, "no RDR's provided")
for o in merge_rdrs(args.rdr): for o in merge_rdrs(args.rdr, not_before=args.not_before):
LOG.info("created %s", o) LOG.info("created %s", o)
......
...@@ -15,7 +15,7 @@ import edosl0util ...@@ -15,7 +15,7 @@ import edosl0util
from edosl0util import compat from edosl0util import compat
from edosl0util.jpssrdr import Apid as ApidListItem from edosl0util.jpssrdr import Apid as ApidListItem
from edosl0util.jpssrdr import PacketTracker, StaticHeader, decode_rdr_blob from edosl0util.jpssrdr import PacketTracker, StaticHeader, decode_rdr_blob
from edosl0util.stream import jpss_packet_stream from edosl0util.stream import jpss_packet_stream, collect_groups
from edosl0util.timecode import cds_to_iet, iet_to_dt from edosl0util.timecode import cds_to_iet, iet_to_dt
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
...@@ -75,24 +75,15 @@ def filter_group_orphans(s): ...@@ -75,24 +75,15 @@ def filter_group_orphans(s):
def filter_before(s, before): def filter_before(s, before):
""" """
Filter all packets until the first packet with a timestamp >= before. No Filter all packets before the provided time. This handles groupped packets
more filtering is performed after the first valid timestamp, any packets by dropping the entire group if its first packet has a timestamp before
that occur after the the first valid packet that are < before will be the provided time.
passed on.
""" """
s = iter(s) for grp in collect_groups(s):
done = object() # sentinal to stop iter if grp[0].stamp < before:
p = next(s, done) LOG.debug("dropping before[%s] %s", before, grp[0])
if p is done: continue
return yield from grp
while p.stamp is None or p.stamp < before:
LOG.debug("dropping hanging/before[%s] %s", before, p)
p = next(s, done)
if p is done:
return # return early, iter is done
yield p
for p in s:
yield p
def packets_to_rdrs(sat, l0_files, **kwargs): def packets_to_rdrs(sat, l0_files, **kwargs):
......
import os import ctypes as c
import errno import errno
import logging import logging
import ctypes as c import os
from collections import deque, defaultdict, namedtuple from collections import defaultdict, deque, namedtuple
from edosl0util import headers
from edosl0util.headers import ( from edosl0util.headers import (
PrimaryHeader,
GROUP_FIRST,
GROUP_CONTINUING, GROUP_CONTINUING,
GROUP_FIRST,
GROUP_LAST, GROUP_LAST,
GROUP_STANDALONE, GROUP_STANDALONE,
PrimaryHeader,
) )
from edosl0util import headers
from edosl0util.timecode import dt_to_cds from edosl0util.timecode import dt_to_cds
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
...@@ -300,6 +299,36 @@ class PacketStream(object): ...@@ -300,6 +299,36 @@ class PacketStream(object):
self.push_back(packet) self.push_back(packet)
def collect_groups(stream):
    """
    Collect packets into lists of grouped packets.

    A standalone packet is yielded as a single-element list. Grouped packets
    are accumulated and yielded as one list per group. There is no guarantee
    a group is complete. Leading continuation packets that arrive before any
    group-first packet are dropped, so the first packet of every yielded
    group is a group-first packet (which always has a timestamp).
    """
    stream = iter(stream)
    group = deque()
    for p in stream:
        # pass through any non-grouped packets untouched
        if p.is_standalone():
            yield [p]
            continue
        # Yield and start a new group when we see a first packet, or when
        # the packet's APID does not match the current group.
        if p.is_first() or (group and group[-1].apid != p.apid):
            if group:
                yield list(group)
            group = deque([p])
            continue
        # Drop continuation packets that arrive with no group in progress;
        # they have no group-first packet (and hence no timestamp) to
        # anchor a group.
        if not group:
            continue
        group.append(p)
    # Don't lose the final (possibly partial) group when the stream ends.
    if group:
        yield list(group)
def jpss_packet_stream(fobj, **kwargs): def jpss_packet_stream(fobj, **kwargs):
stream = BasicStream(fobj, headers.jpss_header_lookup) stream = BasicStream(fobj, headers.jpss_header_lookup)
return PacketStream(stream, **kwargs) return PacketStream(stream, **kwargs)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment