Skip to content
Snippets Groups Projects
Commit c01d5aa4 authored by Greg Quinn's avatar Greg Quinn
Browse files

rdrgen: aggr_type='full'

parent c74ddfd0
No related branches found
No related tags found
No related merge requests found
...@@ -27,8 +27,13 @@ def packets_to_rdrs(sat, l0_files, **kwargs): ...@@ -27,8 +27,13 @@ def packets_to_rdrs(sat, l0_files, **kwargs):
build_rdr(sat, iter_pkts(l0_files), **kwargs) build_rdr(sat, iter_pkts(l0_files), **kwargs)
def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=10000000, def build_rdr(sat, pkt_iter, output_dir='.', aggr_type='idps', aggr_level=None,
attr_overrides={}): diary_cushion=10000000, attr_overrides={}):
"""Construct RDR file(s) from L0 packets
Default aggregation behavior uses file boundaries computed in the same way
as for IDPS assuming the default aggregation level depending on the instrument
(e.g. 85 sec for VIIRS, 8 min for CrIS).
# divy packets up into temp files organized by granule # divy packets up into temp files organized by granule
file_mgr = BinnedTemporaryFileManager() file_mgr = BinnedTemporaryFileManager()
get_jpss_packet_time = GetJpssPacketTime() get_jpss_packet_time = GetJpssPacketTime()
...@@ -45,10 +50,22 @@ def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=1000 ...@@ -45,10 +50,22 @@ def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=1000
rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos) rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos)
primary_type, packaged_type = process_rdr_types(rdr_types, force_packaging=False) primary_type, packaged_type = process_rdr_types(rdr_types, force_packaging=False)
rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2)) rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2))
aggr_level = aggr_level or primary_type.default_aggregation if aggr_type == 'idps':
primary_aggr_iets = sorted(set( aggr_level = aggr_level or primary_type.default_aggregation
get_aggregate_start(sat, primary_type.gran_len, aggr_level, gran_iet) primary_aggr_iets = sorted(set(
for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type)) get_aggregate_start(sat, primary_type.gran_len, aggr_level, gran_iet)
for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type))
elif aggr_type == 'full':
# produce a single output file, ignoring IDPS-style aggregation boundaries
assert aggr_level is None
first_gran_iet = min(gran_iet for (rdr_type, gran_iet) in gran_infos
if rdr_type is primary_type)
last_gran_iet = max(gran_iet for (rdr_type, gran_iet) in gran_infos
if rdr_type is primary_type)
aggr_level = (last_gran_iet - first_gran_iet) / primary_type.gran_len + 1
primary_aggr_iets = [first_gran_iet]
else:
raise ValueError('aggr_type must be idps or input')
# now generate the RDRs # now generate the RDRs
rdr_files = [] rdr_files = []
...@@ -56,8 +73,7 @@ def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=1000 ...@@ -56,8 +73,7 @@ def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=1000
rdr_writer = RdrWriter(sat, rdr_types, aggr_iet, aggr_level, output_dir, rdr_writer = RdrWriter(sat, rdr_types, aggr_iet, aggr_level, output_dir,
**attr_overrides) **attr_overrides)
rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level) rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level)
gran_iets = get_aggregate_granule_times( gran_iets = [aggr_iet + i * primary_type.gran_len for i in range(aggr_level)]
sat, primary_type.gran_len, aggr_level, aggr_iet)
for gran_iet in gran_iets: for gran_iet in gran_iets:
with file_mgr.process_file((primary_type, gran_iet)) as pkt_file: with file_mgr.process_file((primary_type, gran_iet)) as pkt_file:
pkts = list(jpss_packet_stream(pkt_file)) pkts = list(jpss_packet_stream(pkt_file))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment