Skip to content
Snippets Groups Projects
  • Bruce Flynn's avatar
    341356fa
    MODIS merging support · 341356fa
    Bruce Flynn authored
    Squashed commit of the following:
    
    commit 76ad8de8
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Thu May 6 15:37:10 2021 -0500
    
        test, then push
    
    commit e3a51b48
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Thu May 6 15:31:48 2021 -0500
    
        rdrgen Fix attr comparison issues, support additional mission names
    
    commit bb6b5e6d
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Thu May 6 14:11:49 2021 -0500
    
        modis merging working and tested
    
    commit 58143956
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Wed May 5 13:13:23 2021 -0500
    
        fix bad attr
    
    commit f3bdd86d
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Wed May 5 11:21:20 2021 -0500
    
        add module for EOS utils
    
    commit b18d65bb
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Wed May 5 11:20:59 2021 -0500
    
        merge: support aqua by using seqid in pointer
    341356fa
    History
    MODIS merging support
    Bruce Flynn authored
    Squashed commit of the following:
    
    commit 76ad8de8
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Thu May 6 15:37:10 2021 -0500
    
        test, then push
    
    commit e3a51b48
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Thu May 6 15:31:48 2021 -0500
    
        rdrgen Fix attr comparison issues, support additional mission names
    
    commit bb6b5e6d
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Thu May 6 14:11:49 2021 -0500
    
        modis merging working and tested
    
    commit 58143956
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Wed May 5 13:13:23 2021 -0500
    
        fix bad attr
    
    commit f3bdd86d
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Wed May 5 11:21:20 2021 -0500
    
        add module for EOS utils
    
    commit b18d65bb
    Author: Bruce Flynn <brucef@ssec.wisc.edu>
    Date:   Wed May 5 11:20:59 2021 -0500
    
        merge: support aqua by using seqid in pointer
jpssrdr.py 13.27 KiB
"""
Code for reading/writing/manipulating JPSS Common RDR files as documented in:

    Joint Polar Satellite System (JPSS)
    Common Data Format Control Book - External (CDFCB-X)
    Volume II - RDR Formats

    http://jointmission.gsfc.nasa.gov/sciencedocs/2015-06/474-00001-02_JPSS-CDFCB-X-Vol-II_0123B.pdf
"""
import ctypes as c
import glob
import logging
import os
from collections import namedtuple

import numpy as np
from h5py import File as H5File

from edosl0util.stream import jpss_packet_stream

from .headers import BaseStruct
from .merge import VIIRS_APID_ORDER, merge

__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."


LOG = logging.getLogger(__name__)

satellite_to_scid = {"npp": 157, "snpp": 157, "j01": 159, "jpss1": 159}


class StaticHeader(BaseStruct):
    """
    Common RDR static header.
    """

    _fields_ = [
        ("satellite", c.c_char * 4),
        ("sensor", c.c_char * 16),
        ("type_id", c.c_char * 16),
        ("num_apids", c.c_uint32),
        ("apid_list_offset", c.c_uint32),
        ("pkt_tracker_offset", c.c_uint32),
        ("ap_storage_offset", c.c_uint32),
        ("next_pkt_pos", c.c_uint32),
        ("start_boundary", c.c_uint64),
        ("end_boundary", c.c_uint64),
    ]


class Apid(BaseStruct):
    """
    Entry in the ApidList storage area.
    """

    _fields_ = [
        ("name", c.c_char * 16),
        ("value", c.c_uint32),
        ("pkt_tracker_start_idx", c.c_uint32),
        ("pkts_reserved", c.c_uint32),
        ("pkts_received", c.c_uint32),
    ]


class PacketTracker(BaseStruct):
    """
    Entry in the PacketTracker storage area.
    """

    _fields_ = [
        ("obs_time", c.c_int64),
        ("sequence_number", c.c_int32),
        ("size", c.c_int32),
        ("offset", c.c_int32),
        ("fill_percent", c.c_int32),
    ]


def _make_packet_impl(size):
    """
    Create a struct for a CCSDS packet. The struct size is dynamic so we need
    a new class for each packet.
    """
    # size minus the CCSDS primary header size
    data_size = size - 6

    class PktImpl(BaseStruct):  # noqa
        _fields_ = [
            ("version", c.c_uint8, 3),
            ("type", c.c_uint8, 1),
            ("secondary_header", c.c_uint8, 1),
            ("apid", c.c_uint16, 11),
            ("sequence_grouping", c.c_uint8, 2),
            ("sequence_number", c.c_uint16, 14),
            ("length_minus1", c.c_uint16),
            ("data", c.c_uint8 * data_size),
        ]

    return PktImpl


Packet = namedtuple("Packet", ("tracker", "packet"))


class CommonRdr(namedtuple("CommonRdr", ("buf", "header", "apids"))):
    """
    A single common rdr as found in a RawApplicationPackets_X dataset.
    """

    def packets_for_apid(self, apid):
        """
        Return a generator that yields tuples of (tracker, packet) for the
        given `Apid`.
        """
        for tracker, packet in _packets_for_apid(self.buf, self.header, apid):
            yield Packet(tracker, packet)

    def packets(self):
        """
        Return a generator that yields `Packet`s for each apid in the
        apid list.
        """
        for apid in self.apids:
            for packet in self.packets_for_apid(apid):
                yield packet


def _packets_for_apid(buf, header, apid):
    """
    Generate tuples of (PacketTracker, Packet)
    """
    t_off = header.pkt_tracker_offset + apid.pkt_tracker_start_idx * c.sizeof(
        PacketTracker
    )
    for idx in range(apid.pkts_received):
        tracker = PacketTracker.from_buffer(buf, t_off)
        t_off += c.sizeof(PacketTracker)
        if tracker.offset < 0:  # -1 == missing
            continue

        p_off = header.ap_storage_offset + tracker.offset
        pkt_impl = _make_packet_impl(tracker.size)

        pkt = pkt_impl.from_buffer(buf, p_off)
        assert c.sizeof(pkt) == tracker.size
        yield tracker, pkt


def _read_apid_list(header, buf):
    """
    Generate Apid-s
    """
    offset = header.apid_list_offset
    for idx in range(header.num_apids):
        yield Apid.from_buffer(buf, offset)
        offset += c.sizeof(Apid)


def _sorted_packet_dataset_names(names):
    names = [k for k in names if k.startswith("RawApplicationPackets_")]
    return sorted(names, key=lambda x: int(x.split("_")[-1]))


def _generate_packet_datasets(group):
    dsnames = group.keys()
    for name in _sorted_packet_dataset_names(dsnames):
        ds = group[name]
        yield name, np.array(ds)


def _find_data_group(fobj, name, sensors=None):
    sensors = sensors or ["viirs", "cris", "atms"]
    for sensor in sensors:
        group = fobj.get("/All_Data/{}-{}-RDR_All".format(sensor.upper(), name.upper()))
        if group:
            return group

        # Some files have CrIS rather than CRIS
        if sensor == "cris":
            group = fobj.get("/All_Data/{}-{}-RDR_All".format("CrIS", name.upper()))
            if group:
                return group


def _find_science_group(fobj):
    return _find_data_group(fobj, "SCIENCE")


def _find_dwell_group(fobj):
    for name in ("HSKDWELL", "IMDWELL", "SSMDWELL"):
        group = _find_data_group(fobj, name, sensors=["cris"])
        if group:
            return group
    group = _find_data_group(fobj, "DWELL", sensors=["atms"])
    if group:
        return group


def _find_diag_group(fobj):
    return _find_data_group(fobj, "DIAGNOSTIC", sensors=["cris", "atms"])


def _find_telemetry_group(fobj):
    return _find_data_group(fobj, "TELEMETRY", sensors=["cris", "atms"])


def _find_spacecraft_group(fobj):
    return _find_data_group(fobj, "DIARY", sensors=["SPACECRAFT"])


def _rdrs_for_packet_dataset(group):
    rdrs = []
    if group:
        for name, buf in _generate_packet_datasets(group):
            try:
                rdr = decode_rdr_blob(buf)
            except ValueError as e:
                LOG.warning("{} ({})".format(e, name))
                continue
            rdrs.append(rdr)
    return rdrs


def decode_rdr_blob(buf):
    if buf.shape[0] < c.sizeof(StaticHeader):
        raise ValueError("Not enough bytes for static header")
    header = StaticHeader.from_buffer(buf)
    apids = _read_apid_list(header, buf)
    return CommonRdr(buf, header, list(apids))


def rdr_datasets(filepath):
    fobj = H5File(filepath, mode="r")
    rdr = dict(
        telemetry=_rdrs_for_packet_dataset(_find_telemetry_group(fobj)),
        diagnostic=_rdrs_for_packet_dataset(_find_diag_group(fobj)),
        dwell=_rdrs_for_packet_dataset(_find_dwell_group(fobj)),
        science=_rdrs_for_packet_dataset(_find_science_group(fobj)),
        ancillary=_rdrs_for_packet_dataset(_find_spacecraft_group(fobj)),
    )
    fobj.close()
    return rdr


def rdr_ccsds_packets(rdr, skipfill=False):
    """
    Get all CCSDS packets from an `CommonRdr` in packet tracker order.
    """
    for packet in rdr.packets():
        if packet.tracker.fill_percent != 0:
            LOG.debug("fill: %s", packet.tracker)
            if skipfill:
                continue
        yield packet.packet


def sort_packets_by_obs_time(packets):
    """
    Sort `Packet`s in-place by the PacketTracker obs_time.
    """
    return sorted(packets, key=lambda p: p.tracker.obs_time)


def sort_packets_by_apid(packets, order=None):
    """
    Sort packets in-place by apid. An error will be raised if `order` is given
    and packet has an apid not in the list.
    """
    if order:
        return sorted(packets, key=lambda p: order.index(p.packet.apid))
    else:
        return sorted(packets, key=lambda p: p.packet.apid)

def _write_packets(pkts, dest, skipfill):
    for pkt in pkts:
        if pkt.tracker.fill_percent != 0 and skipfill:
            continue
        dest.write(pkt.packet)


def write_rdr_datasets(filepath, ancillary=False, skipfill=False, workdir=None):
    workdir = workdir or "."
    rdrname = os.path.basename(filepath)
    rdrs = rdr_datasets(filepath)

    for typekey in rdrs:
        if not rdrs[typekey]:
            continue
        if ancillary and typekey == "ancillary":
            for idx, rdr in enumerate(rdrs["ancillary"]):
                packets = {a.value: rdr.packets_for_apid(a) for a in rdr.apids}
                for apid, pkts in packets.items():
                    LOG.debug(
                        "writing ancillary gran %d %s-%s-%s %d",
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                        apid,
                    )
                    pktfile = os.path.join(
                        workdir, "{}.{}{}.pkts".format(rdrname, typekey, apid)
                    )
                    with open(pktfile, "ab") as dest:
                        _write_packets(pkts, dest, skipfill)
        else:
            pktfile = os.path.join(workdir, "{}.{}.pkts".format(rdrname, typekey))
            LOG.debug("writing %s", pktfile)
            with open(pktfile, "ab") as dest:
                for idx, rdr in enumerate(rdrs[typekey]):
                    LOG.debug(
                        "... %s gran %d %s-%s-%s",
                        typekey,
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                    )
                    _write_packets(rdr.packets(), dest, skipfill)
    return rdrs


def pdsfilename(product, created):
    if len(product) < 20:
        product = product + "A" * (20 - len(product))
    return "{}XT{:%y%j%H%M%S}001.PDS".format(product, created)


def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end, workdir):
    workdir = workdir or "."
    product = "P{}{}".format(satellite_to_scid[satellite], product)
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, skipfill=True, workdir=workdir)

    # alphanumeric sorting to bootstrap final sort
    inputs = sorted(glob.glob(os.path.join(workdir, filepat)))
    streams = [jpss_packet_stream(open(f, "rb")) for f in inputs]

    pdsname = os.path.join(workdir, pdsfilename(product, start))
    LOG.info("merging to %s", pdsname)
    order = VIIRS_APID_ORDER if "VIIRSSCIENCE" in product else None
    with open(pdsname, "wb") as dest:
        merge(streams, output=dest, trunc_to=[start, end], apid_order=order)
    return pdsname if workdir != "." else pdsname


def cris_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1280CRISHSK"
    return _do_rdr_to_l0(
        "*.telemetry.pkts", satellite, product, rdrs, start, end, workdir
    )


def cris_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1291CRISDWELL"
    return _do_rdr_to_l0("*.dwell.pkts", satellite, product, rdrs, start, end, workdir)


def cris_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1289CRISSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def atms_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0518ATMSHSK"
    return _do_rdr_to_l0(
        "*.telemetry.pkts", satellite, product, rdrs, start, end, workdir
    )


def atms_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0517ATMSDWELL"
    return _do_rdr_to_l0("*.dwell.pkts", satellite, product, rdrs, start, end, workdir)


def atms_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0515ATMSSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def viirs_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0826VIIRSSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def spacecraft_to_l0(satellite, rdrs, start, end, workdir=None):
    workdir = workdir or "."
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, ancillary=True, skipfill=True, workdir=workdir)

    filenames = []
    scid = satellite_to_scid[satellite]
    for apid in (0, 8, 11):
        # alphanumeric sorting to bootstrap final sort
        inputs = sorted(
            glob.glob(os.path.join(workdir, "*.ancillary{}.pkts".format(apid)))
        )
        files = [open(f, "rb") for f in inputs]
        streams = [jpss_packet_stream(f) for f in files]

        product = "P{}{:04d}".format(scid, apid)
        pdsname = pdsfilename(product, start)
        LOG.info("merging to %s", pdsname)
        with open(os.path.join(workdir, pdsname), "wb") as dest:
            merge(streams, output=dest, trunc_to=[start, end])
        filenames.append(os.path.join(workdir, pdsname).replace("./", ""))

        # lots of files so make sure they're closed
        [f.close() for f in files]
    return filenames


def rdr_to_l0(satellite, rdrs, start, end):
    filepaths = []
    for product, filepath in {f.split("_", 1)[0]: f for f in rdrs}.items():
        if "RNSCA" in product:
            filepaths += spacecraft_to_l0(satellite, [filepath], start, end)
        if "RVIRS" in product:
            filepaths.append(viirs_sci_to_l0(satellite, [filepath], start, end))
        if "RCRIS" in product:
            filepaths.append(cris_sci_to_l0(satellite, [filepath], start, end))
        if "RATMS" in product:
            filepaths.append(atms_sci_to_l0(satellite, [filepath], start, end))
    return filepaths