Skip to content
Snippets Groups Projects
Commit 2aafd00b authored by Greg Quinn's avatar Greg Quinn
Browse files

Add RCRIS-RNSCA rdrgen test

parent 9dfa022d
No related branches found
No related tags found
No related merge requests found
...@@ -16,22 +16,28 @@ from edosl0util.jpssrdr import ( ...@@ -16,22 +16,28 @@ from edosl0util.jpssrdr import (
from edosl0util.stream import jpss_packet_stream from edosl0util.stream import jpss_packet_stream
def packets_to_rdrs(sat, pkt_files, aggr_level=None, attr_overrides={}): def packets_to_rdrs(sat, l0_files, **kwargs):
def iter_pkts(l0_files):
for l0_file in l0_files:
with open(l0_file) as l0_file_obj:
for pkt in jpss_packet_stream(l0_file_obj):
yield pkt
build_rdr(sat, iter_pkts(l0_files), **kwargs)
def build_rdr(sat, pkt_iter, aggr_level=None, diary_cushion=10000000, attr_overrides={}):
# divy packets up into temp files organized by granule # divy packets up into temp files organized by granule
file_mgr = BinnedTemporaryFileManager() file_mgr = BinnedTemporaryFileManager()
get_jpss_packet_time = GetJpssPacketTime() get_jpss_packet_time = GetJpssPacketTime()
gran_infos = set() # (rdr_type, gran_iet) pairs gran_infos = set() # (rdr_type, gran_iet) pairs
for pkt_file in pkt_files: for pkt in pkt_iter:
with open(pkt_file) as file_obj: rdr_type = get_rdr_type(pkt.apid)
stream = jpss_packet_stream(file_obj) pkt_iet = get_jpss_packet_time(pkt)
for pkt in stream: gran_iet = get_granule_start(sat, rdr_type.gran_len, pkt_iet)
rdr_type = get_rdr_type(pkt.apid) gran_info = (rdr_type, gran_iet)
pkt_iet = get_jpss_packet_time(pkt) gran_infos.add(gran_info)
gran_iet = get_granule_start(sat, rdr_type.gran_len, pkt_iet) file_mgr.add_data(gran_info, pkt.bytes())
gran_info = (rdr_type, gran_iet)
gran_infos.add(gran_info)
file_mgr.add_data(gran_info, pkt.bytes())
# determine what RDR files we'll be producing based on the packets we've seen # determine what RDR files we'll be producing based on the packets we've seen
rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos) rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos)
...@@ -43,6 +49,7 @@ def packets_to_rdrs(sat, pkt_files, aggr_level=None, attr_overrides={}): ...@@ -43,6 +49,7 @@ def packets_to_rdrs(sat, pkt_files, aggr_level=None, attr_overrides={}):
for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type)) for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type))
# now generate the RDRs # now generate the RDRs
rdr_files = []
for aggr_iet in primary_aggr_iets: for aggr_iet in primary_aggr_iets:
rdr_writer = RdrWriter(sat, rdr_types, aggr_iet, aggr_level, **attr_overrides) rdr_writer = RdrWriter(sat, rdr_types, aggr_iet, aggr_level, **attr_overrides)
rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level) rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level)
...@@ -54,10 +61,9 @@ def packets_to_rdrs(sat, pkt_files, aggr_level=None, attr_overrides={}): ...@@ -54,10 +61,9 @@ def packets_to_rdrs(sat, pkt_files, aggr_level=None, attr_overrides={}):
blob = build_rdr_blob(sat, pkts, primary_type, gran_iet) blob = build_rdr_blob(sat, pkts, primary_type, gran_iet)
rdr_writer.write_granule(primary_type, gran_iet, blob) rdr_writer.write_granule(primary_type, gran_iet, blob)
if packaged_type: if packaged_type:
cushion = 1000000 # 1 second; is this consistent with IDPS?
packaged_gran_iets = get_overlapping_granules( packaged_gran_iets = get_overlapping_granules(
sat, packaged_type.gran_len, aggr_iet - cushion, sat, packaged_type.gran_len, aggr_iet - diary_cushion,
aggr_iet + aggr_level * primary_type.gran_len + cushion + 1) aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1)
rdr_writer.write_aggregate( rdr_writer.write_aggregate(
packaged_type, packaged_gran_iets[0], len(packaged_gran_iets)) packaged_type, packaged_gran_iets[0], len(packaged_gran_iets))
for gran_iet in packaged_gran_iets: for gran_iet in packaged_gran_iets:
...@@ -66,6 +72,9 @@ def packets_to_rdrs(sat, pkt_files, aggr_level=None, attr_overrides={}): ...@@ -66,6 +72,9 @@ def packets_to_rdrs(sat, pkt_files, aggr_level=None, attr_overrides={}):
blob = build_rdr_blob(sat, pkts, packaged_type, gran_iet) blob = build_rdr_blob(sat, pkts, packaged_type, gran_iet)
rdr_writer.write_granule(packaged_type, gran_iet, blob) rdr_writer.write_granule(packaged_type, gran_iet, blob)
rdr_writer.close() rdr_writer.close()
rdr_files.append(rdr_writer.file_name)
return rdr_files
def process_rdr_types(given_rdr_types, force_packaging): def process_rdr_types(given_rdr_types, force_packaging):
...@@ -214,7 +223,7 @@ class RdrWriter(object): ...@@ -214,7 +223,7 @@ class RdrWriter(object):
'AggregateEndingGranuleID': make_granule_id(self._sat, last_gran_iet), 'AggregateEndingGranuleID': make_granule_id(self._sat, last_gran_iet),
'AggregateEndingOrbitNumber': np.uint64(self._orbit_num), 'AggregateEndingOrbitNumber': np.uint64(self._orbit_num),
'AggregateEndingTime': self._format_time_attr(aggr_end_iet), 'AggregateEndingTime': self._format_time_attr(aggr_end_iet),
'AggregateNumberGranules': np.uint64(1)}) 'AggregateNumberGranules': np.uint64(num_grans)})
def write_granule(self, rdr_type, gran_iet, blob, creation_time=None): def write_granule(self, rdr_type, gran_iet, blob, creation_time=None):
raw_grp = self._h5_file['All_Data/{}_All'.format(rdr_type.short_name)] raw_grp = self._h5_file['All_Data/{}_All'.format(rdr_type.short_name)]
......
File added
import os import os
import subprocess import subprocess
from StringIO import StringIO from io import BytesIO
from datetime import datetime from datetime import datetime
import h5py import h5py
...@@ -10,6 +10,58 @@ from edosl0util.jpssrdr import decode_rdr_blob ...@@ -10,6 +10,58 @@ from edosl0util.jpssrdr import decode_rdr_blob
from edosl0util.stream import jpss_packet_stream from edosl0util.stream import jpss_packet_stream
def generate_rdr_packets(rdr_file):
"""Iterate over all packets in an RDR
For RDRs with packaged spacecraft diary (e.g. RCRIS-RNSCA), all packets from
one RDR type will be provided before the other. Within each RDR type packets
will be provided in the order in which they were received when the RDR was
constructed (i.e. storage order not tracker order).
"""
with h5py.File(rdr_file, 'r') as h5_file:
for grp in h5_file['All_Data'].values():
for gran_idx in range(len(grp)):
blob = grp['RawApplicationPackets_{}'.format(gran_idx)][:]
for pkt in generate_rdr_blob_packets(blob):
yield pkt
def generate_rdr_blob_packets(blob):
"""Iterate packets in an RDR blob in storage (not tracker) order"""
# TODO: move to an edosl0util module?
info = decode_rdr_blob(blob)
pkt_buf = blob[info.header.ap_storage_offset:]
return jpss_packet_stream(BytesIO(pkt_buf))
def test_can_reproduce_cris_rdr():
orig_file_name = 'RCRIS-RNSCA_npp_d20171008_t0004096_e0012095_b30810_c20171008061237136301_nobu_ops.h5'
orig_file = os.path.join(os.path.dirname(__file__), orig_file_name)
new_file, = m.build_rdr('snpp', generate_rdr_packets(orig_file))
with h5py.File(orig_file, 'r') as orig_h5, h5py.File(new_file, 'a') as new_h5:
root_attrs = ['Distributor', 'N_Dataset_Source', 'N_HDF_Creation_Date',
'N_HDF_Creation_Time']
coll_attrs = ['N_Processing_Domain']
aggr_attrs = ['AggregateBeginningOrbitNumber', 'AggregateEndingOrbitNumber']
gran_attrs = ['N_Beginning_Orbit_Number', 'N_Creation_Date', 'N_Creation_Time',
'N_IDPS_Mode', 'N_Software_Version', 'N_Granule_Version',
'N_Reference_ID']
def copy_attrs(get_obj, attrs):
orig_obj = get_obj(orig_h5)
new_obj = get_obj(new_h5)
for attr in attrs:
new_obj.attrs[attr] = orig_obj.attrs[attr]
copy_attrs(lambda h5: h5, root_attrs)
for coll in orig_h5['Data_Products']:
copy_attrs(lambda h5: h5['Data_Products'][coll], coll_attrs)
copy_attrs(lambda h5: h5['Data_Products'][coll][coll + '_Aggr'], aggr_attrs)
for gran_idx in range(len(orig_h5['Data_Products'][coll]) - 1):
copy_attrs(
lambda h5: h5['Data_Products'][coll][coll + '_Gran_' + str(gran_idx)],
gran_attrs)
def test_can_reproduce_rdr_from_class(): def test_can_reproduce_rdr_from_class():
class_rdr_file = 'RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5' class_rdr_file = 'RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5'
class_rdr_path = os.path.join(os.path.dirname(__file__), class_rdr_file) class_rdr_path = os.path.join(os.path.dirname(__file__), class_rdr_file)
...@@ -27,7 +79,7 @@ def test_can_reproduce_rdr_from_class(): ...@@ -27,7 +79,7 @@ def test_can_reproduce_rdr_from_class():
rdr_type = m.SpacecraftDiaryRdrType rdr_type = m.SpacecraftDiaryRdrType
gran_iet = 1883865714000000 gran_iet = 1883865714000000
aggr_level = 1 aggr_level = 1
pkt_stream = jpss_packet_stream(StringIO(pkt_buf.tobytes())) pkt_stream = jpss_packet_stream(BytesIO(pkt_buf.tobytes()))
blob = m.build_rdr_blob('snpp', pkt_stream, rdr_type, gran_iet) blob = m.build_rdr_blob('snpp', pkt_stream, rdr_type, gran_iet)
tmp_dir = '/tmp' tmp_dir = '/tmp'
writer = m.RdrWriter( writer = m.RdrWriter(
...@@ -35,9 +87,9 @@ def test_can_reproduce_rdr_from_class(): ...@@ -35,9 +87,9 @@ def test_can_reproduce_rdr_from_class():
distributor='arch', origin='nob-', domain='ops', distributor='arch', origin='nob-', domain='ops',
creation_time=datetime(2017, 9, 13, 22, 3, 40, 173580), creation_time=datetime(2017, 9, 13, 22, 3, 40, 173580),
orbit_num=30441, software_ver='I2.0.03.00') orbit_num=30441, software_ver='I2.0.03.00')
writer.write_aggregate(rdr_type, gran_iet, aggr_level)
writer.write_granule(rdr_type, gran_iet, blob, writer.write_granule(rdr_type, gran_iet, blob,
creation_time=datetime(2017, 9, 12, 1, 37, 43, 474383)) creation_time=datetime(2017, 9, 12, 1, 37, 43, 474383))
writer.write_aggregate(rdr_type, gran_iet, aggr_level)
writer.close() writer.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment