# Skip to content
# Snippets Groups Projects
# test_rdrgen.py 6.23 KiB
import os
import subprocess
from io import BytesIO
from datetime import datetime

import h5py
import pytest

import edosl0util.rdrgen as m
from edosl0util.jpssrdr import decode_rdr_blob
from edosl0util.stream import jpss_packet_stream

# The RDR reproduction checks in this module shell out to ``h5diff``; when
# ``which`` cannot locate it (non-zero exit status) the whole module is skipped.
h5diff_available = not subprocess.call('which h5diff > /dev/null 2>&1', shell=True)
if not h5diff_available:
    pytest.skip(
        'skipping rdrgen tests (h5diff not available)',
        allow_module_level=True,
    )


def generate_rdr_packets(rdr_file):
    """Yield every packet stored in an RDR file

    Each group under ``All_Data`` is visited in turn, so for RDRs with packaged
    spacecraft diary (e.g. RCRIS-RNSCA) all packets of one RDR type come out
    before any packet of the other type. Within a single RDR type the packets
    are yielded in the order they were received when the RDR was constructed
    (i.e. storage order, not tracker order).

    The datasets in each group are looked up by name as
    ``RawApplicationPackets_<i>`` for ``i`` in ``0 .. len(group) - 1``.
    """
    with h5py.File(rdr_file, 'r') as rdr:
        for group in rdr['All_Data'].values():
            dataset_names = [
                'RawApplicationPackets_{}'.format(idx) for idx in range(len(group))
            ]
            for dataset_name in dataset_names:
                # [:] reads the full dataset contents into memory
                raw_blob = group[dataset_name][:]
                for packet in generate_rdr_blob_packets(raw_blob):
                    yield packet


def generate_rdr_blob_packets(blob):
    """Return a packet stream over an RDR blob, in storage (not tracker) order

    The blob is decoded only to find ``header.ap_storage_offset``; everything
    from that offset to the end of the blob is handed to ``jpss_packet_stream``.
    """
    # TODO: move to an edosl0util module?
    storage_start = decode_rdr_blob(blob).header.ap_storage_offset
    packet_bytes = BytesIO(blob[storage_start:])
    return jpss_packet_stream(packet_bytes)


def test_can_reproduce_cris_rdr(tmpdir):
    """Rebuild the sample RCRIS-RNSCA RDR and check it matches the original."""
    cris_rdr_file = (
        'RCRIS-RNSCA_npp_d20171008_t0004096_e0012095_b30810_c20171008061237136301_nobu_ops.h5')
    output_dir = str(tmpdir)
    verify_rdr_reproduction(cris_rdr_file, output_dir)


def verify_rdr_reproduction(orig_file_name, tmp_dir, **build_rdr_opts):
    """Rebuild an RDR from its own packets and assert h5diff sees no difference

    The packets of the original file (located next to this test module) are
    fed to ``m.build_rdr`` for satellite ``'snpp'``, which must produce exactly
    one output file in ``tmp_dir``. A fixed set of attributes is then copied
    from the original into the new file before the two are compared with
    ``h5diff``; the comparison must print nothing and exit with status 0.

    :param orig_file_name: name of the reference RDR in this module's directory
    :param tmp_dir: directory the rebuilt RDR is written to
    :param build_rdr_opts: extra keyword arguments passed through to
        ``m.build_rdr``
    """
    # NOTE(review): these attributes are presumably ones that cannot be
    # regenerated from the packets alone (creation times, versions, ...) --
    # confirm against rdrgen.
    root_attrs = ['Distributor', 'N_Dataset_Source', 'N_HDF_Creation_Date',
                  'N_HDF_Creation_Time']
    coll_attrs = ['N_Processing_Domain']
    aggr_attrs = ['AggregateBeginningOrbitNumber', 'AggregateEndingOrbitNumber']
    gran_attrs = ['N_Beginning_Orbit_Number', 'N_Creation_Date', 'N_Creation_Time',
                  'N_IDPS_Mode', 'N_Software_Version', 'N_Granule_Version',
                  'N_Reference_ID', 'N_JPSS_Document_Ref']

    test_dir = os.path.dirname(__file__)
    orig_file = os.path.join(test_dir, orig_file_name)
    # single-element unpacking: fails unless exactly one file was built
    [built_name] = m.build_rdr('snpp', generate_rdr_packets(orig_file),
                               output_dir=tmp_dir, **build_rdr_opts)
    new_file = os.path.join(tmp_dir, built_name)

    with h5py.File(orig_file, 'r') as orig_h5, h5py.File(new_file, 'a') as new_h5:

        def copy_attrs(get_obj, attrs):
            # get_obj picks the same object out of each file
            source, target = get_obj(orig_h5), get_obj(new_h5)
            for attr_name in attrs:
                target.attrs[attr_name] = source.attrs[attr_name]

        copy_attrs(lambda h5: h5, root_attrs)
        for coll in orig_h5['Data_Products']:
            aggr_name = coll + '_Aggr'
            copy_attrs(lambda h5: h5['Data_Products'][coll], coll_attrs)
            copy_attrs(lambda h5: h5['Data_Products'][coll][aggr_name], aggr_attrs)
            # one fewer granule than members of the collection group
            num_grans = len(orig_h5['Data_Products'][coll]) - 1
            for gran_idx in range(num_grans):
                gran_name = coll + '_Gran_' + str(gran_idx)
                copy_attrs(lambda h5: h5['Data_Products'][coll][gran_name], gran_attrs)

    h5diff_cmd = ['h5diff', '-c', '-p', '1e-5', orig_file, new_file]
    proc = subprocess.Popen(h5diff_cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    output = proc.communicate()[0]
    assert not output
    assert proc.returncode == 0


def test_can_reproduce_rdr_from_class(tmpdir):
    """Rebuild the sample RNSCA RDR with ``aggr_level=1`` and compare it."""
    verify_rdr_reproduction(
        'RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5',
        str(tmpdir),
        aggr_level=1)


class TestGranulation(object):
    """Test granule time computations using IETs from IDPS CrIS granules"""

    # Granule length passed to every rdrgen call below; the consecutive
    # granule times used in test_get_granule_start are exactly this far apart.
    cris_gran_len = 31997000

    def test_get_granule_start(self):
        """A time resolves to the start of the granule it falls in."""
        # some actual CrIS granule times
        prev_gran, gran, next_gran = (
            1880240261177000, 1880240293174000, 1880240325171000)
        gran_len = self.cris_gran_len

        def granule_start(iet):
            return m.get_granule_start('snpp', gran_len, iet)

        assert granule_start(gran) == gran
        assert granule_start(gran + 1) == gran
        assert granule_start(gran - 1) == prev_gran
        assert granule_start(gran + gran_len) == next_gran

    def test_get_aggregate_start(self):
        """A time resolves to the start of the aggregate it falls in."""
        aggr_level = 15
        aggr = 1880240037198000
        aggr_len = aggr_level * self.cris_gran_len

        def aggregate_start(iet):
            return m.get_aggregate_start('snpp', self.cris_gran_len, aggr_level, iet)

        assert aggregate_start(aggr - 1) == aggr - aggr_len
        assert aggregate_start(aggr) == aggr
        assert aggregate_start(aggr + aggr_len - 1) == aggr
        assert aggregate_start(aggr + aggr_len) == aggr + aggr_len


class TestViirsGroupedPacketTimeTracker(object):
    """Tests for ``m.ViirsGroupedPacketTimeTracker`` helper methods"""

    # Level-0 sample file, expected to sit next to this test module
    l0_file = 'P1570826VIIRSSCIENCE6T17270135400001.PDS'
    l0_path = os.path.join(os.path.dirname(__file__), l0_file)

    def test_check_sequence_number(self):
        """Only sequence numbers inside the group after the first are accepted."""
        group_size = 24

        def in_group(nonfirst_seq_num, first_seq_num):
            return m.ViirsGroupedPacketTimeTracker.check_sequence_number(
                nonfirst_seq_num, first_seq_num, group_size)

        def check_all(first_seq_num, cases):
            # cases: (non-first sequence number, whether it should be accepted)
            for seq_num, accepted in cases:
                assert bool(in_group(seq_num, first_seq_num)) == accepted

        check_all(4096, [
            (4095, False),
            (4097, True),
            (4119, True),
            (4120, False),
        ])

        # a group whose sequence numbers run past max_seq and restart at 0
        max_seq = 2**14
        check_all(max_seq - 16, [
            (max_seq - 17, False),
            (max_seq - 15, True),
            (max_seq - 1, True),
            (0, True),
            (7, True),
            (8, False),
        ])

    def test_get_viirs_iet(self):
        """Packet times are extracted for standalone, first and continuing packets."""
        tracker_cls = m.ViirsGroupedPacketTimeTracker

        def packet_time(pkt):
            return m.iet_to_datetime(tracker_cls.get_viirs_iet(pkt))

        with open(self.l0_path, 'rb') as l0_file:
            packets = jpss_packet_stream(l0_file)
            # The three searches share one stream, so each one resumes where
            # the previous search stopped.
            standalone_pkt = next(pkt for pkt in packets if pkt.is_standalone())
            first_pkt = next(pkt for pkt in packets if pkt.is_first())
            nonfirst_pkt = next(pkt for pkt in packets if pkt.is_continuing())

        # expected values haven't been independently verified, just looked at
        # to see that they at least make sense
        expected_times = [
            (standalone_pkt, datetime(2017, 9, 27, 13, 54, 1, 727765)),
            (first_pkt, datetime(2017, 9, 27, 13, 54, 1, 746898)),
            (nonfirst_pkt, datetime(2017, 9, 27, 13, 54, 1, 748328)),
        ]
        for pkt, expected in expected_times:
            assert packet_time(pkt) == expected