Skip to content
Snippets Groups Projects
  • Bruce Flynn's avatar
    ec676e24
    initial impl. Needs testing and verification. · ec676e24
    Bruce Flynn authored
    This seems to work both through rdr2l0 and rdrgen in that it produces
    valid RDRs.
    
    I was not able to compare them against what RT-STPS v7 is putting out
    because rdrgen seems to put out 5 additional spacecraft granules over what
    RT-STPS is producing for a VIIRS RDR. I'm guessing that RT-STPS is just dropping
    spacecraft granules that don't overlap the science data or something.
    
    So, this needs a bit more thought and verification
    ec676e24
    History
    initial impl. Needs testing and verification.
    Bruce Flynn authored
    This seems to work both through rdr2l0 and rdrgen in that it produces
    valid RDRs.
    
    I was not able to compare them against what RT-STPS v7 is putting out
    because rdrgen seems to put out 5 additional spacecraft granules over what
    RT-STPS is producing for a VIIRS RDR. I'm guessing that RT-STPS is just dropping
    spacecraft granules that don't overlap the science data or something.
    
    So, this needs a bit more thought and verification
jpssrdr.py 13.37 KiB
"""
Code for reading/writing/manipulating JPSS Common RDR files as documented in:

    Joint Polar Satellite System (JPSS)
    Common Data Format Control Book - External (CDFCB-X)
    Volume II - RDR Formats

    http://jointmission.gsfc.nasa.gov/sciencedocs/2015-06/474-00001-02_JPSS-CDFCB-X-Vol-II_0123B.pdf
"""
import ctypes as c
import glob
import logging
import os
from collections import namedtuple

import numpy as np
from h5py import File as H5File

from edosl0util.stream import jpss_packet_stream

from .headers import BaseStruct
from .merge import VIIRS_APID_ORDER, merge

__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."


LOG = logging.getLogger(__name__)

satellite_to_scid = {"npp": 157, "snpp": 157, "j01": 159, "jpss1": 159, "j02": 177, "jpss2": 177}


class StaticHeader(BaseStruct):
    """
    Common RDR static header.
    """

    _fields_ = [
        ("satellite", c.c_char * 4),
        ("sensor", c.c_char * 16),
        ("type_id", c.c_char * 16),
        ("num_apids", c.c_uint32),
        ("apid_list_offset", c.c_uint32),
        ("pkt_tracker_offset", c.c_uint32),
        ("ap_storage_offset", c.c_uint32),
        ("next_pkt_pos", c.c_uint32),
        ("start_boundary", c.c_uint64),
        ("end_boundary", c.c_uint64),
    ]


class Apid(BaseStruct):
    """
    Entry in the ApidList storage area.
    """

    _fields_ = [
        ("name", c.c_char * 16),
        ("value", c.c_uint32),
        ("pkt_tracker_start_idx", c.c_uint32),
        ("pkts_reserved", c.c_uint32),
        ("pkts_received", c.c_uint32),
    ]


class PacketTracker(BaseStruct):
    """
    Entry in the PacketTracker storage area.
    """

    _fields_ = [
        ("obs_time", c.c_int64),
        ("sequence_number", c.c_int32),
        ("size", c.c_int32),
        ("offset", c.c_int32),
        ("fill_percent", c.c_int32),
    ]


def _make_packet_impl(size):
    """
    Create a struct for a CCSDS packet. The struct size is dynamic so we need
    a new class for each packet.
    """
    # size minus the CCSDS primary header size
    data_size = size - 6

    class PktImpl(BaseStruct):  # noqa
        _fields_ = [
            ("version", c.c_uint8, 3),
            ("type", c.c_uint8, 1),
            ("secondary_header", c.c_uint8, 1),
            ("apid", c.c_uint16, 11),
            ("sequence_grouping", c.c_uint8, 2),
            ("sequence_number", c.c_uint16, 14),
            ("length_minus1", c.c_uint16),
            ("data", c.c_uint8 * data_size),
        ]

    return PktImpl


Packet = namedtuple("Packet", ("tracker", "packet"))


class CommonRdr(namedtuple("CommonRdr", ("buf", "header", "apids"))):
    """
    A single common rdr as found in a RawApplicationPackets_X dataset.
    """

    def packets_for_apid(self, apid):
        """
        Return a generator that yields tuples of (tracker, packet) for the
        given `Apid`.
        """
        for tracker, packet in _packets_for_apid(self.buf, self.header, apid):
            yield Packet(tracker, packet)

    def packets(self):
        """
        Return a generator that yields `Packet`s for each apid in the
        apid list.
        """
        for apid in self.apids:
            for packet in self.packets_for_apid(apid):
                yield packet


def _packets_for_apid(buf, header, apid):
    """
    Generate tuples of (PacketTracker, Packet)
    """
    t_off = header.pkt_tracker_offset + apid.pkt_tracker_start_idx * c.sizeof(
        PacketTracker
    )
    for idx in range(apid.pkts_received):
        tracker = PacketTracker.from_buffer(buf, t_off)
        t_off += c.sizeof(PacketTracker)
        if tracker.offset < 0:  # -1 == missing
            continue

        p_off = header.ap_storage_offset + tracker.offset
        pkt_impl = _make_packet_impl(tracker.size)

        pkt = pkt_impl.from_buffer(buf, p_off)
        assert c.sizeof(pkt) == tracker.size
        yield tracker, pkt


def _read_apid_list(header, buf):
    """
    Generate Apid-s
    """
    offset = header.apid_list_offset
    for idx in range(header.num_apids):
        yield Apid.from_buffer(buf, offset)
        offset += c.sizeof(Apid)


def _sorted_packet_dataset_names(names):
    names = [k for k in names if k.startswith("RawApplicationPackets_")]
    return sorted(names, key=lambda x: int(x.split("_")[-1]))


def _generate_packet_datasets(group):
    dsnames = group.keys()
    for name in _sorted_packet_dataset_names(dsnames):
        ds = group[name]
        yield name, np.array(ds)


def _find_data_group(fobj, name, sensors=None):
    sensors = sensors or ["viirs", "cris", "atms"]
    for sensor in sensors:
        group = fobj.get("/All_Data/{}-{}-RDR_All".format(sensor.upper(), name.upper()))
        if group:
            return group

        # Some files have CrIS rather than CRIS
        if sensor == "cris":
            group = fobj.get("/All_Data/{}-{}-RDR_All".format("CrIS", name.upper()))
            if group:
                return group


def _find_science_group(fobj):
    return _find_data_group(fobj, "SCIENCE")


def _find_dwell_group(fobj):
    for name in ("HSKDWELL", "IMDWELL", "SSMDWELL"):
        group = _find_data_group(fobj, name, sensors=["cris"])
        if group:
            return group
    group = _find_data_group(fobj, "DWELL", sensors=["atms"])
    if group:
        return group


def _find_diag_group(fobj):
    return _find_data_group(fobj, "DIAGNOSTIC", sensors=["cris", "atms"])


def _find_telemetry_group(fobj):
    return _find_data_group(fobj, "TELEMETRY", sensors=["cris", "atms"])


def _find_spacecraft_group(fobj):
    return _find_data_group(fobj, "DIARY", sensors=["SPACECRAFT"])


def _rdrs_for_packet_dataset(group):
    rdrs = []
    if group:
        for name, buf in _generate_packet_datasets(group):
            try:
                rdr = decode_rdr_blob(buf)
            except ValueError as e:
                LOG.warning("{} ({})".format(e, name))
                continue
            rdrs.append(rdr)
    return rdrs


def decode_rdr_blob(buf):
    if buf.shape[0] < c.sizeof(StaticHeader):
        raise ValueError("Not enough bytes for static header")
    header = StaticHeader.from_buffer(buf)
    apids = _read_apid_list(header, buf)
    return CommonRdr(buf, header, list(apids))


def rdr_datasets(filepath):
    fobj = H5File(filepath, mode="r")
    rdr = dict(
        telemetry=_rdrs_for_packet_dataset(_find_telemetry_group(fobj)),
        diagnostic=_rdrs_for_packet_dataset(_find_diag_group(fobj)),
        dwell=_rdrs_for_packet_dataset(_find_dwell_group(fobj)),
        science=_rdrs_for_packet_dataset(_find_science_group(fobj)),
        ancillary=_rdrs_for_packet_dataset(_find_spacecraft_group(fobj)),
    )
    fobj.close()
    return rdr


def rdr_ccsds_packets(rdr, skipfill=False):
    """
    Get all CCSDS packets from an `CommonRdr` in packet tracker order.
    """
    for packet in rdr.packets():
        if packet.tracker.fill_percent != 0:
            LOG.debug("fill: %s", packet.tracker)
            if skipfill:
                continue
        yield packet.packet


def sort_packets_by_obs_time(packets):
    """
    Sort `Packet`s in-place by the PacketTracker obs_time.
    """
    return sorted(packets, key=lambda p: p.tracker.obs_time)


def sort_packets_by_apid(packets, order=None):
    """
    Sort packets in-place by apid. An error will be raised if `order` is given
    and packet has an apid not in the list.
    """
    if order:
        return sorted(packets, key=lambda p: order.index(p.packet.apid))
    else:
        return sorted(packets, key=lambda p: p.packet.apid)

def _write_packets(pkts, dest, skipfill):
    for pkt in pkts:
        if pkt.tracker.fill_percent != 0 and skipfill:
            continue
        dest.write(pkt.packet)


def write_rdr_datasets(filepath, ancillary=False, skipfill=False, workdir=None):
    workdir = workdir or "."
    rdrname = os.path.basename(filepath)
    rdrs = rdr_datasets(filepath)

    for typekey in rdrs:
        if not rdrs[typekey]:
            continue
        if ancillary and typekey == "ancillary":
            for idx, rdr in enumerate(rdrs["ancillary"]):
                packets = {a.value: rdr.packets_for_apid(a) for a in rdr.apids}
                for apid, pkts in packets.items():
                    LOG.debug(
                        "writing ancillary gran %d %s-%s-%s %d",
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                        apid,
                    )
                    pktfile = os.path.join(
                        workdir, "{}.{}{}.pkts".format(rdrname, typekey, apid)
                    )
                    with open(pktfile, "ab") as dest:
                        _write_packets(pkts, dest, skipfill)
        else:
            pktfile = os.path.join(workdir, "{}.{}.pkts".format(rdrname, typekey))
            LOG.debug("writing %s", pktfile)
            with open(pktfile, "ab") as dest:
                for idx, rdr in enumerate(rdrs[typekey]):
                    LOG.debug(
                        "... %s gran %d %s-%s-%s",
                        typekey,
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                    )
                    _write_packets(rdr.packets(), dest, skipfill)
    return rdrs


def pdsfilename(product, created):
    if len(product) < 20:
        product = product + "A" * (20 - len(product))
    return "{}XT{:%y%j%H%M%S}001.PDS".format(product, created)


def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end, workdir):
    workdir = workdir or "."
    product = "P{}{}".format(satellite_to_scid[satellite], product)
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, skipfill=True, workdir=workdir)

    # alphanumeric sorting to bootstrap final sort
    inputs = sorted(glob.glob(os.path.join(workdir, filepat)))
    streams = [jpss_packet_stream(open(f, "rb")) for f in inputs]

    pdsname = os.path.join(workdir, pdsfilename(product, start))
    LOG.info("merging to %s", pdsname)
    order = VIIRS_APID_ORDER if "VIIRSSCIENCE" in product else None
    with open(pdsname, "wb") as dest:
        merge(streams, output=dest, trunc_to=[start, end], apid_order=order)
    return pdsname if workdir != "." else pdsname


def cris_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1280CRISHSK"
    return _do_rdr_to_l0(
        "*.telemetry.pkts", satellite, product, rdrs, start, end, workdir
    )


def cris_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1291CRISDWELL"
    return _do_rdr_to_l0("*.dwell.pkts", satellite, product, rdrs, start, end, workdir)


def cris_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1289CRISSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def atms_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0518ATMSHSK"
    return _do_rdr_to_l0(
        "*.telemetry.pkts", satellite, product, rdrs, start, end, workdir
    )


def atms_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0517ATMSDWELL"
    return _do_rdr_to_l0("*.dwell.pkts", satellite, product, rdrs, start, end, workdir)


def atms_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0515ATMSSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def viirs_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0826VIIRSSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def spacecraft_to_l0(satellite, rdrs, start, end, workdir=None):
    workdir = workdir or "."
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, ancillary=True, skipfill=True, workdir=workdir)

    filenames = []
    scid = satellite_to_scid[satellite]
    if scid < 177:
        apids = (0, 8, 11)
    else:
        apids = (11, 30, 37)
    for apid in apids:
        # alphanumeric sorting to bootstrap final sort
        inputs = sorted(
            glob.glob(os.path.join(workdir, "*.ancillary{}.pkts".format(apid)))
        )
        files = [open(f, "rb") for f in inputs]
        streams = [jpss_packet_stream(f) for f in files]

        product = "P{}{:04d}".format(scid, apid)
        pdsname = pdsfilename(product, start)
        LOG.info("merging to %s", pdsname)
        with open(os.path.join(workdir, pdsname), "wb") as dest:
            merge(streams, output=dest, trunc_to=[start, end])
        filenames.append(os.path.join(workdir, pdsname).replace("./", ""))

        # lots of files so make sure they're closed
        [f.close() for f in files]
    return filenames


def rdr_to_l0(satellite, rdrs, start, end):
    filepaths = []
    for product, filepath in {f.split("_", 1)[0]: f for f in rdrs}.items():
        if "RNSCA" in product:
            filepaths += spacecraft_to_l0(satellite, [filepath], start, end)
        if "RVIRS" in product:
            filepaths.append(viirs_sci_to_l0(satellite, [filepath], start, end))
        if "RCRIS" in product:
            filepaths.append(cris_sci_to_l0(satellite, [filepath], start, end))
        if "RATMS" in product:
            filepaths.append(atms_sci_to_l0(satellite, [filepath], start, end))
    return filepaths