import os
import subprocess
from io import BytesIO
from datetime import datetime

import h5py

import edosl0util.rdrgen as m
from edosl0util.jpssrdr import decode_rdr_blob
from edosl0util.stream import jpss_packet_stream


def generate_rdr_packets(rdr_file):
    """Iterate over all packets in an RDR

    For RDRs with packaged spacecraft diary (e.g. RCRIS-RNSCA), all packets from
    one RDR type will be provided before the other. Within each RDR type, packets
    will be provided in the order in which they were received when the RDR was
    constructed (i.e. storage order, not tracker order).
    """
    with h5py.File(rdr_file, 'r') as h5_file:
        for grp in h5_file['All_Data'].values():
            for gran_idx in range(len(grp)):
                blob = grp['RawApplicationPackets_{}'.format(gran_idx)][:]
                for pkt in generate_rdr_blob_packets(blob):
                    yield pkt
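
# Usage sketch ('some_rdr.h5' is a placeholder, not a real fixture): the
# packet stream from this generator feeds straight into rdrgen, e.g.
#
#     m.build_rdr('snpp', generate_rdr_packets('some_rdr.h5'), output_dir='/tmp')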


def generate_rdr_blob_packets(blob):
    """Iterate packets in an RDR blob in storage (not tracker) order"""
    # TODO: move to an edosl0util module?
    info = decode_rdr_blob(blob)
    pkt_buf = blob[info.header.ap_storage_offset:]
    return jpss_packet_stream(BytesIO(pkt_buf))
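

# For reference, a sketch of trimming an RDR blob to just its occupied packet
# bytes, as done in the CLASS test below. This assumes (per the usage there)
# that header.ap_storage_offset locates the start of packet storage and that
# header.next_pkt_pos is the number of storage bytes in use.
def _example_occupied_packet_bytes(blob):
    info = decode_rdr_blob(blob)
    ini = info.header.ap_storage_offset
    return blob[ini:ini + info.header.next_pkt_pos]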


def test_can_reproduce_cris_rdr(tmpdir):
    verify_rdr_reproduction(
        'RCRIS-RNSCA_npp_d20171008_t0004096_e0012095_b30810_c20171008061237136301_nobu_ops.h5',
        str(tmpdir))


def verify_rdr_reproduction(orig_file_name, tmp_dir):
    orig_file = os.path.join(os.path.dirname(__file__), orig_file_name)
    new_file, = m.build_rdr('snpp', generate_rdr_packets(orig_file), output_dir=tmp_dir)
    new_file = os.path.join(tmp_dir, new_file)
    # these attributes hold creation-time metadata that legitimately differs
    # between the original and the rebuilt file; copy them across before diffing
    with h5py.File(orig_file, 'r') as orig_h5, h5py.File(new_file, 'a') as new_h5:
        root_attrs = ['Distributor', 'N_Dataset_Source', 'N_HDF_Creation_Date',
                      'N_HDF_Creation_Time']
        coll_attrs = ['N_Processing_Domain']
        aggr_attrs = ['AggregateBeginningOrbitNumber', 'AggregateEndingOrbitNumber']
        gran_attrs = ['N_Beginning_Orbit_Number', 'N_Creation_Date', 'N_Creation_Time',
                      'N_IDPS_Mode', 'N_Software_Version', 'N_Granule_Version',
                      'N_Reference_ID']

        def copy_attrs(get_obj, attrs):
            orig_obj = get_obj(orig_h5)
            new_obj = get_obj(new_h5)
            for attr in attrs:
                new_obj.attrs[attr] = orig_obj.attrs[attr]

        copy_attrs(lambda h5: h5, root_attrs)
        for coll in orig_h5['Data_Products']:
            copy_attrs(lambda h5: h5['Data_Products'][coll], coll_attrs)
            copy_attrs(lambda h5: h5['Data_Products'][coll][coll + '_Aggr'], aggr_attrs)
            # the - 1 skips the _Aggr dataset, leaving just the _Gran_N datasets
            for gran_idx in range(len(orig_h5['Data_Products'][coll]) - 1):
                copy_attrs(
                    lambda h5: h5['Data_Products'][coll][coll + '_Gran_' + str(gran_idx)],
                    gran_attrs)
    # -p allows relative slop in float comparisons (e.g. N_Percent_Missing_Data)
    p = subprocess.Popen(
        ['h5diff', '-c', '-p', '1e-5', orig_file, new_file],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout, _ = p.communicate()
    assert not stdout
    assert p.returncode == 0


def test_can_reproduce_rdr_from_class():
    class_rdr_file = 'RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5'
    class_rdr_path = os.path.join(os.path.dirname(__file__), class_rdr_file)
    # read buffer of raw packets from the RDR
    with h5py.File(class_rdr_path, 'r') as class_h5_file:
        class_blob = (
            class_h5_file['All_Data/SPACECRAFT-DIARY-RDR_All/RawApplicationPackets_0'][:])
    rdr_info = decode_rdr_blob(class_blob)
    ini = rdr_info.header.ap_storage_offset
    fin = ini + rdr_info.header.next_pkt_pos
    pkt_buf = class_blob[ini:fin]
    # generate new RDR from packets, injecting matching metadata from the CLASS file
    rdr_type = m.SpacecraftDiaryRdrType
    gran_iet = 1883865714000000
    aggr_level = 1
    pkt_stream = jpss_packet_stream(BytesIO(pkt_buf.tobytes()))
    blob = m.build_rdr_blob('snpp', pkt_stream, rdr_type, gran_iet)
    tmp_dir = '/tmp'
    writer = m.RdrWriter(
        'snpp', [rdr_type], gran_iet, aggr_level, tmp_dir,
        distributor='arch', origin='nob-', domain='ops',
        creation_time=datetime(2017, 9, 13, 22, 3, 40, 173580),
        orbit_num=30441, software_ver='I2.0.03.00')
    writer.write_aggregate(rdr_type, gran_iet, aggr_level)
    writer.write_granule(rdr_type, gran_iet, blob,
                         creation_time=datetime(2017, 9, 12, 1, 37, 43, 474383))
    writer.close()
    # file names should be identical
    assert writer.file_name == class_rdr_file
    # use h5diff to verify files match. -p option is needed to allow some slop
    # in comparing N_Percent_Missing_Data
    p = subprocess.Popen(
        ['h5diff', '-c', '-p', '1e-6',
         class_rdr_path, os.path.join(tmp_dir, writer.file_name)],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    assert not p.communicate()[0]
    assert p.returncode == 0
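

# For reference, the expected CLASS file name above decomposes into the writer
# arguments: b30441 matches orbit_num=30441, c20170913220340173580 matches
# creation_time down to the microsecond, the trailing 'ops' matches domain,
# and the d/t/e date/time fields follow from gran_iet and aggr_level.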


class TestGranulation(object):
    """Test granule time computations using IETs from IDPS CrIS granules"""

    cris_gran_len = 31997000

    def test_get_granule_start(self):
        gran = 1880240293174000  # some actual CrIS granule times
        prev_gran = 1880240261177000
        next_gran = 1880240325171000

        def run(t):
            return m.get_granule_start('snpp', self.cris_gran_len, t)

        assert run(gran) == gran
        assert run(gran + 1) == gran
        assert run(gran - 1) == prev_gran
        assert run(gran + self.cris_gran_len) == next_gran

    def test_get_aggregate_start(self):
        aggr = 1880240037198000
        aggr_level = 15

        def run(t):
            return m.get_aggregate_start('snpp', self.cris_gran_len, aggr_level, t)

        aggr_len = aggr_level * self.cris_gran_len
        assert run(aggr - 1) == aggr - aggr_len
        assert run(aggr) == aggr
        assert run(aggr + aggr_len - 1) == aggr
        assert run(aggr + aggr_len) == aggr + aggr_len
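

# A minimal sketch (an assumption, not rdrgen's actual implementation) of the
# fixed-length granulation scheme the tests above are consistent with: granule
# starts are spaced gran_len microseconds of IET apart from a common boundary,
# and an aggregate spans aggr_level consecutive granules.
def _example_granule_start(t_iet, gran_len, base_iet):
    # base_iet is hypothetical: any IET known to fall on a granule boundary
    return base_iet + (t_iet - base_iet) // gran_len * gran_len


def _example_aggregate_start(t_iet, gran_len, aggr_level, base_iet):
    # same floor arithmetic with the aggregate length; here base_iet must
    # fall on an aggregate boundary
    return _example_granule_start(t_iet, gran_len * aggr_level, base_iet)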


class TestViirsGroupedPacketTimeTracker(object):
    l0_file = 'P1570826VIIRSSCIENCE6T17270135400001.PDS'
    l0_path = os.path.join(os.path.dirname(__file__), l0_file)

    def test_check_sequence_number(self):
        group_size = 24

        def run(nonfirst_seq_num, first_seq_num):
            return m.ViirsGroupedPacketTimeTracker.check_sequence_number(
                nonfirst_seq_num, first_seq_num, group_size)

        first_seq = 4096
        assert not run(4095, first_seq)
        assert run(4097, first_seq)
        assert run(4119, first_seq)
        assert not run(4120, first_seq)

        # verify wrap-around of the 14-bit sequence counter is handled
        max_seq = 2 ** 14
        first_seq = max_seq - 16
        assert not run(max_seq - 17, first_seq)
        assert run(max_seq - 15, first_seq)
        assert run(max_seq - 1, first_seq)
        assert run(0, first_seq)
        assert run(7, first_seq)
        assert not run(8, first_seq)

    def test_get_viirs_iet(self):
        def run(pkt):
            return m.iet_to_datetime(m.ViirsGroupedPacketTimeTracker.get_viirs_iet(pkt))

        with open(self.l0_path, 'rb') as l0_file:
            stream = jpss_packet_stream(l0_file)
            standalone_pkt = next(p for p in stream if p.is_standalone())
            first_pkt = next(p for p in stream if p.is_first())
            nonfirst_pkt = next(p for p in stream if p.is_continuing())
            # expected values haven't been independently verified, just looked at
            # to see that they at least make sense
            assert run(standalone_pkt) == datetime(2017, 9, 27, 13, 54, 1, 727765)
            assert run(first_pkt) == datetime(2017, 9, 27, 13, 54, 1, 746898)
            assert run(nonfirst_pkt) == datetime(2017, 9, 27, 13, 54, 1, 748328)
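

# A minimal sketch (an assumption, not the tracker's actual implementation) of
# the modular check the sequence-number assertions above are consistent with:
# a non-first packet belongs to a group when its 14-bit CCSDS sequence counter
# is between 1 and group_size - 1 steps past the first packet's, mod 2**14.
def _example_check_sequence_number(nonfirst_seq_num, first_seq_num, group_size):
    max_seq = 2 ** 14  # CCSDS sequence counters are 14 bits wide
    offset = (nonfirst_seq_num - first_seq_num) % max_seq
    return 1 <= offset <= group_size - 1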