Skip to content
Snippets Groups Projects
test_rdrgen.py 7.54 KiB
import os
import subprocess
from io import BytesIO
from datetime import datetime

import h5py

import edosl0util.rdrgen as m
from edosl0util.jpssrdr import decode_rdr_blob
from edosl0util.stream import jpss_packet_stream


def generate_rdr_packets(rdr_file):
    """Iterate over all packets in an RDR

    For RDRs with packaged spacecraft diary (e.g. RCRIS-RNSCA), all packets from
    one RDR type will be provided before the other. Within each RDR type packets
    will be provided in the order in which they were received when the RDR was
    constructed (i.e. storage order not tracker order).
    """
    with h5py.File(rdr_file, 'r') as h5_file:
        for grp in h5_file['All_Data'].values():
            for gran_idx in range(len(grp)):
                blob = grp['RawApplicationPackets_{}'.format(gran_idx)][:]
                for pkt in generate_rdr_blob_packets(blob):
                    yield pkt


def generate_rdr_blob_packets(blob):
    """Iterate packets in an RDR blob in storage (not tracker) order"""
    # TODO: move to an edosl0util module?
    info = decode_rdr_blob(blob)
    pkt_buf = blob[info.header.ap_storage_offset:]
    return jpss_packet_stream(BytesIO(pkt_buf))


def test_can_reproduce_cris_rdr(tmpdir):
    verify_rdr_reproduction(
        'RCRIS-RNSCA_npp_d20171008_t0004096_e0012095_b30810_c20171008061237136301_nobu_ops.h5',
        str(tmpdir))


def verify_rdr_reproduction(orig_file_name, tmp_dir):
    orig_file = os.path.join(os.path.dirname(__file__), orig_file_name)
    new_file, = m.build_rdr('snpp', generate_rdr_packets(orig_file), output_dir=tmp_dir)
    new_file = os.path.join(tmp_dir, new_file)
    with h5py.File(orig_file, 'r') as orig_h5, h5py.File(new_file, 'a') as new_h5:
        root_attrs = ['Distributor', 'N_Dataset_Source', 'N_HDF_Creation_Date',
                      'N_HDF_Creation_Time']
        coll_attrs = ['N_Processing_Domain']
        aggr_attrs = ['AggregateBeginningOrbitNumber', 'AggregateEndingOrbitNumber']
        gran_attrs = ['N_Beginning_Orbit_Number', 'N_Creation_Date', 'N_Creation_Time',
                      'N_IDPS_Mode', 'N_Software_Version', 'N_Granule_Version',
                      'N_Reference_ID']
        def copy_attrs(get_obj, attrs):
            orig_obj = get_obj(orig_h5)
            new_obj = get_obj(new_h5)
            for attr in attrs:
                new_obj.attrs[attr] = orig_obj.attrs[attr]
        copy_attrs(lambda h5: h5, root_attrs)
        for coll in orig_h5['Data_Products']:
            copy_attrs(lambda h5: h5['Data_Products'][coll], coll_attrs)
            copy_attrs(lambda h5: h5['Data_Products'][coll][coll + '_Aggr'], aggr_attrs)
            for gran_idx in range(len(orig_h5['Data_Products'][coll]) - 1):
                copy_attrs(
                    lambda h5: h5['Data_Products'][coll][coll + '_Gran_' + str(gran_idx)],
                    gran_attrs)
    p = subprocess.Popen(
        ['h5diff', '-c', '-p', '1e-5', orig_file, new_file],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout, _ = p.communicate()
    assert not stdout
    assert p.returncode == 0



def test_can_reproduce_rdr_from_class():
    class_rdr_file = 'RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5'
    class_rdr_path = os.path.join(os.path.dirname(__file__), class_rdr_file)

    # read buffer of raw packets from the RDR
    with h5py.File(class_rdr_path, 'r') as class_h5_file:
        class_blob = (
            class_h5_file['All_Data/SPACECRAFT-DIARY-RDR_All/RawApplicationPackets_0'][:])
        rdr_info = decode_rdr_blob(class_blob)
        ini = rdr_info.header.ap_storage_offset
        fin = ini + rdr_info.header.next_pkt_pos
        pkt_buf = class_blob[ini:fin]

    # generate new RDR from packets, injecting matching metadata from CLASS file
    rdr_type = m.SpacecraftDiaryRdrType
    gran_iet = 1883865714000000
    aggr_level = 1
    pkt_stream = jpss_packet_stream(BytesIO(pkt_buf.tobytes()))
    blob = m.build_rdr_blob('snpp', pkt_stream, rdr_type, gran_iet)
    tmp_dir = '/tmp'
    writer = m.RdrWriter(
        'snpp', [rdr_type], gran_iet, aggr_level, tmp_dir,
        distributor='arch', origin='nob-', domain='ops',
        creation_time=datetime(2017, 9, 13, 22, 3, 40, 173580),
        orbit_num=30441, software_ver='I2.0.03.00')
    writer.write_aggregate(rdr_type, gran_iet, aggr_level)
    writer.write_granule(rdr_type, gran_iet, blob,
                         creation_time=datetime(2017, 9, 12, 1, 37, 43, 474383))
    writer.close()


    # file names should be identical
    assert writer.file_name == class_rdr_file

    # use h5diff to verify files match. -p option is needed to allow some slop
    # in comparing N_Percent_Missing_Data
    p = subprocess.Popen(
        ['h5diff', '-c', '-p', '1e-6',
            class_rdr_path, os.path.join(tmp_dir, writer.file_name)],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    assert p.communicate()[0] == ''
    assert p.returncode == 0


class TestGranulation(object):
    """Test granule time computations using IETs from IDPS CrIS granules"""

    def test_get_granule_start(self):
        gran = 1880240293174000  # some actual CrIS granule times
        prev_gran = 1880240261177000
        next_gran = 1880240325171000
        def run(t):
            return m.get_granule_start('snpp', self.cris_gran_len, t)
        assert run(gran) == gran
        assert run(gran + 1) == gran
        assert run(gran - 1) == prev_gran
        assert run(gran + self.cris_gran_len) == next_gran

    def test_get_aggregate_start(self):
        aggr = 1880240037198000
        aggr_level = 15
        def run(t):
            return m.get_aggregate_start('snpp', self.cris_gran_len, aggr_level, t)
        aggr_len = aggr_level * self.cris_gran_len
        assert run(aggr - 1) == aggr - aggr_len
        assert run(aggr) == aggr
        assert run(aggr + aggr_len - 1) == aggr
        assert run(aggr + aggr_len) == aggr + aggr_len

    cris_gran_len = 31997000


class TestViirsGroupedPacketTimeTracker(object):

    def test_check_sequence_number(self):
        group_size = 24
        def run(nonfirst_seq_num, first_seq_num):
            return m.ViirsGroupedPacketTimeTracker.check_sequence_number(
                nonfirst_seq_num, first_seq_num, group_size)

        first_seq = 4096
        assert not run(4095, first_seq)
        assert run(4097, first_seq)
        assert run(4119, first_seq)
        assert not run(4120, first_seq)

        max_seq = 2**14
        first_seq = max_seq - 16
        assert not run(max_seq - 17, first_seq)
        assert run(max_seq - 15, first_seq)
        assert run(max_seq - 1, first_seq)
        assert run(0, first_seq)
        assert run(7, first_seq)
        assert not run(8, first_seq)

    def test_get_viirs_iet(self):
        def run(pkt):
            return m.iet_to_datetime(m.ViirsGroupedPacketTimeTracker.get_viirs_iet(pkt))

        with open(self.l0_path, 'rb') as l0_file:
            stream = jpss_packet_stream(l0_file)
            standalone_pkt = next(p for p in stream if p.is_standalone())
            first_pkt = next(p for p in stream if p.is_first())
            nonfirst_pkt = next(p for p in stream if p.is_continuing())

        # expected values haven't been independently verified, just looked at
        # to see that they at least make sense
        assert run(standalone_pkt) == datetime(2017, 9, 27, 13, 54, 1, 727765)
        assert run(first_pkt) == datetime(2017, 9, 27, 13, 54, 1, 746898)
        assert run(nonfirst_pkt) == datetime(2017, 9, 27, 13, 54, 1, 748328)

    l0_file = 'P1570826VIIRSSCIENCE6T17270135400001.PDS'
    l0_path = os.path.join(os.path.dirname(__file__), l0_file)