# jpssrdr.py
"""
Code for reading/writing/manipulating JPSS Common RDR files as documented in:

    Joint Polar Satellite System (JPSS)
    Common Data Format Control Book - External (CDFCB-X)
    Volume II - RDR Formats

    http://jointmission.gsfc.nasa.gov/sciencedocs/2015-06/474-00001-02_JPSS-CDFCB-X-Vol-II_0123B.pdf
"""
import ctypes as c
Bruce Flynn's avatar
Bruce Flynn committed
import glob
Bruce Flynn's avatar
Bruce Flynn committed
import logging
import os
Bruce Flynn's avatar
Bruce Flynn committed
from collections import namedtuple

import numpy as np
from h5py import File as H5File

Bruce Flynn's avatar
Bruce Flynn committed
from edosl0util.stream import jpss_packet_stream

from .headers import BaseStruct
Bruce Flynn's avatar
Bruce Flynn committed
from .merge import VIIRS_APID_ORDER, merge

__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."

Bruce Flynn's avatar
Bruce Flynn committed

Bruce Flynn's avatar
Bruce Flynn committed
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
Bruce Flynn's avatar
Bruce Flynn committed

Bruce Flynn's avatar
Bruce Flynn committed
# Satellite name aliases -> numeric id that the *_to_l0 functions embed in the
# "P{id}..." product names (presumably the spacecraft id -- confirm against the
# CDFCB-X).
satellite_to_scid = {"npp": 157, "snpp": 157, "j01": 159, "jpss1": 159}
Bruce Flynn's avatar
Bruce Flynn committed

class StaticHeader(BaseStruct):
    """
    Common RDR static header.

    This module reads the header from the start of an RDR blob and uses
    `apid_list_offset`, `pkt_tracker_offset` and `ap_storage_offset` as byte
    offsets into that same blob.
    """

    _fields_ = [
        ("satellite", c.c_char * 4),
        ("sensor", c.c_char * 16),
        ("type_id", c.c_char * 16),
        # number of Apid entries stored at apid_list_offset
        ("num_apids", c.c_uint32),
        ("apid_list_offset", c.c_uint32),
        ("pkt_tracker_offset", c.c_uint32),
        ("ap_storage_offset", c.c_uint32),
        ("next_pkt_pos", c.c_uint32),
        ("start_boundary", c.c_uint64),
        ("end_boundary", c.c_uint64),
    ]


class Apid(BaseStruct):
    """
    Entry in the ApidList storage area.
    """

    _fields_ = [
        ("name", c.c_char * 16),
        # numeric APID; used to name per-APID ancillary packet files
        ("value", c.c_uint32),
        # index (not byte offset) of this APID's first PacketTracker entry
        ("pkt_tracker_start_idx", c.c_uint32),
        ("pkts_reserved", c.c_uint32),
        # number of PacketTracker entries read for this APID
        ("pkts_received", c.c_uint32),
    ]


class PacketTracker(BaseStruct):
    """
    Entry in the PacketTracker storage area.
    """

    _fields_ = [
        ("obs_time", c.c_int64),
        ("sequence_number", c.c_int32),
        # total packet size in bytes, including the CCSDS primary header
        ("size", c.c_int32),
        # byte offset relative to the AP storage area; negative == missing
        ("offset", c.c_int32),
        # non-zero marks a packet this module treats as containing fill
        ("fill_percent", c.c_int32),
    ]
def _make_packet_impl(size):
    """
    Create a struct for a CCSDS packet. The struct size is dynamic so we need
    a new class for each packet.

    `size` is the total packet size in bytes, including the 6 byte CCSDS
    primary header. Returns the newly created struct class.
    """
    # size minus the CCSDS primary header size
    data_size = size - 6

    class PktImpl(BaseStruct):  # noqa
        _fields_ = [
            ("version", c.c_uint8, 3),
            ("type", c.c_uint8, 1),
            ("secondary_header", c.c_uint8, 1),
            ("apid", c.c_uint16, 11),
            ("sequence_grouping", c.c_uint8, 2),
            ("sequence_number", c.c_uint16, 14),
            ("length_minus1", c.c_uint16),
            ("data", c.c_uint8 * data_size),
        ]

    # callers instantiate the class with `from_buffer`, so it must be returned
    return PktImpl
Bruce Flynn's avatar
Bruce Flynn committed

Bruce Flynn's avatar
Bruce Flynn committed
# Pairs a PacketTracker entry with the packet struct it describes.
Packet = namedtuple("Packet", ("tracker", "packet"))
Bruce Flynn's avatar
Bruce Flynn committed
class CommonRdr(namedtuple("CommonRdr", ("buf", "header", "apids"))):
    """
    A single common rdr as found in a RawApplicationPackets_X dataset.

    `buf` is the raw blob, `header` the static header read from it and
    `apids` the sequence of `Apid` entries read from its apid list.
    """

    def packets_for_apid(self, apid):
        """
        Return a generator that yields tuples of (tracker, packet) for the
        given `Apid`.
        """
        for tracker, packet in _packets_for_apid(self.buf, self.header, apid):
            yield Packet(tracker, packet)

    def packets(self):
        """
        Return a generator that yields `Packet`s for each apid in the
        apid list, one apid after another.
        """
        for apid in self.apids:
            for packet in self.packets_for_apid(apid):
                yield packet
Bruce Flynn's avatar
Bruce Flynn committed


def _packets_for_apid(buf, header, apid):
    """
    Generate tuples of (PacketTracker, Packet) for one `Apid`.

    Tracker entries are read from `buf` starting at the apid's first tracker
    index; entries with a negative offset are skipped as missing.
    """
    tracker_size = c.sizeof(PacketTracker)
    t_off = header.pkt_tracker_offset + apid.pkt_tracker_start_idx * tracker_size
    for _ in range(apid.pkts_received):
        tracker = PacketTracker.from_buffer(buf, t_off)
        t_off += tracker_size
        if tracker.offset < 0:  # -1 == missing
            continue

        # packet offsets are relative to the start of the AP storage area
        p_off = header.ap_storage_offset + tracker.offset
        pkt_impl = _make_packet_impl(tracker.size)

        pkt = pkt_impl.from_buffer(buf, p_off)
        # sanity check that the generated struct matches the tracked size
        assert c.sizeof(pkt) == tracker.size
        yield tracker, pkt
Bruce Flynn's avatar
Bruce Flynn committed


def _read_apid_list(header, buf):
    """
Bruce Flynn's avatar
Bruce Flynn committed
    Generate Apid-s
Bruce Flynn's avatar
Bruce Flynn committed
    """
    offset = header.apid_list_offset
    for idx in range(header.num_apids):
Bruce Flynn's avatar
Bruce Flynn committed
        yield Apid.from_buffer(buf, offset)
        offset += c.sizeof(Apid)
def _sorted_packet_dataset_names(names):
Bruce Flynn's avatar
Bruce Flynn committed
    names = [k for k in names if k.startswith("RawApplicationPackets_")]
    return sorted(names, key=lambda x: int(x.split("_")[-1]))
def _generate_packet_datasets(group):
    """
    Generate (name, array) for each RawApplicationPackets_N dataset in
    `group`, in numeric order of N. Each dataset is converted with `np.array`.
    """
    dsnames = group.keys()
    for name in _sorted_packet_dataset_names(dsnames):
        ds = group[name]
        yield name, np.array(ds)
def _find_data_group(fobj, name, sensors=None):
Bruce Flynn's avatar
Bruce Flynn committed
    sensors = sensors or ["viirs", "cris", "atms"]
    for sensor in sensors:
Bruce Flynn's avatar
Bruce Flynn committed
        group = fobj.get("/All_Data/{}-{}-RDR_All".format(sensor.upper(), name.upper()))
        if group:
            return group

        # Some files have CrIS rather than CRIS
Bruce Flynn's avatar
Bruce Flynn committed
        if sensor == "cris":
            group = fobj.get("/All_Data/{}-{}-RDR_All".format("CrIS", name.upper()))
def _find_science_group(fobj):
    """
    Return the SCIENCE RDR group for the first default sensor found in
    `fobj`, or None.
    """
    return _find_data_group(fobj, "SCIENCE")


def _find_dwell_group(fobj):
    """
    Return the first dwell RDR group found in `fobj`, or None.

    The CrIS dwell names (HSKDWELL, IMDWELL, SSMDWELL) are tried first, then
    the ATMS DWELL group.
    """
    for name in ("HSKDWELL", "IMDWELL", "SSMDWELL"):
        group = _find_data_group(fobj, name, sensors=["cris"])
        # return as soon as a group is found so later lookups do not replace it
        if group:
            return group
    group = _find_data_group(fobj, "DWELL", sensors=["atms"])
    if group:
        return group
    return None


def _find_diag_group(fobj):
    """
    Return the CrIS or ATMS DIAGNOSTIC RDR group in `fobj`, or None.
    """
    return _find_data_group(fobj, "DIAGNOSTIC", sensors=["cris", "atms"])


def _find_telemetry_group(fobj):
    """
    Return the CrIS or ATMS TELEMETRY RDR group in `fobj`, or None.
    """
    return _find_data_group(fobj, "TELEMETRY", sensors=["cris", "atms"])


def _find_spacecraft_group(fobj):
    """
    Return the SPACECRAFT DIARY RDR group in `fobj`, or None.
    """
    return _find_data_group(fobj, "DIARY", sensors=["SPACECRAFT"])


def _rdrs_for_packet_dataset(group):
Bruce Flynn's avatar
Bruce Flynn committed
    rdrs = []
Bruce Flynn's avatar
Bruce Flynn committed
        for name, buf in _generate_packet_datasets(group):
Greg Quinn's avatar
Greg Quinn committed
                rdr = decode_rdr_blob(buf)
            except ValueError as e:
Bruce Flynn's avatar
Bruce Flynn committed
                LOG.warning("{} ({})".format(e, name))
Bruce Flynn's avatar
Bruce Flynn committed
                continue
            rdrs.append(rdr)
Bruce Flynn's avatar
Bruce Flynn committed
    return rdrs
Greg Quinn's avatar
Greg Quinn committed
def decode_rdr_blob(buf):
    """
    Decode a raw RDR blob into a `CommonRdr`.

    `buf` must expose `shape` and the writable buffer interface (the callers
    in this module pass numpy arrays). Raises ValueError if `buf` is smaller
    than the static header.
    """
    if buf.shape[0] < c.sizeof(StaticHeader):
        raise ValueError("Not enough bytes for static header")
    header = StaticHeader.from_buffer(buf)
    apids = _read_apid_list(header, buf)
    # materialise the generator so the apid list can be iterated repeatedly
    return CommonRdr(buf, header, list(apids))


def rdr_datasets(filepath):
    """
    Read all RDRs from the HDF5 file at `filepath`.

    Returns a dict with the keys telemetry, diagnostic, dwell, science and
    ancillary, each mapping to a (possibly empty) list of `CommonRdr`.
    """
    # context manager so the file is closed even if decoding raises
    with H5File(filepath, mode="r") as fobj:
        rdr = dict(
            telemetry=_rdrs_for_packet_dataset(_find_telemetry_group(fobj)),
            diagnostic=_rdrs_for_packet_dataset(_find_diag_group(fobj)),
            dwell=_rdrs_for_packet_dataset(_find_dwell_group(fobj)),
            science=_rdrs_for_packet_dataset(_find_science_group(fobj)),
            ancillary=_rdrs_for_packet_dataset(_find_spacecraft_group(fobj)),
        )
    return rdr
Bruce Flynn's avatar
Bruce Flynn committed


def rdr_ccsds_packets(rdr, skipfill=False):
    """
    Get all CCSDS packets from a `CommonRdr`, in the order produced by
    `rdr.packets()`.

    Packets whose tracker has a non-zero fill_percent are logged at debug
    level and, if `skipfill` is set, left out.
    """
    for packet in rdr.packets():
        if packet.tracker.fill_percent != 0:
            LOG.debug("fill: %s", packet.tracker)
            if skipfill:
                continue
        yield packet.packet
Bruce Flynn's avatar
Bruce Flynn committed


def sort_packets_by_obs_time(packets):
    """
    Return a new list of `Packet`s sorted by the PacketTracker obs_time.

    The input is not modified.
    """
    return sorted(packets, key=lambda p: p.tracker.obs_time)


def sort_packets_by_apid(packets, order=None):
    """
    Return a new list of packets sorted by apid; the input is left untouched.

    With a non-empty `order`, packets are ranked by the position of their
    apid in `order`, and a ValueError is raised for an apid that is not
    listed. Otherwise packets are sorted by numeric apid.
    """
    if order:

        def sort_key(pkt):
            return order.index(pkt.packet.apid)

    else:

        def sort_key(pkt):
            return pkt.packet.apid

    return sorted(packets, key=sort_key)

def _write_packets(pkts, dest, skipfill):
    for pkt in pkts:
        if pkt.tracker.fill_percent != 0 and skipfill:
            continue
        dest.write(pkt.packet)


Bruce Flynn's avatar
Bruce Flynn committed
def write_rdr_datasets(filepath, ancillary=False, skipfill=False, workdir=None):
    """
    Dump the packets of every RDR in `filepath` to packet files in `workdir`
    (default: the current directory).

    For each non-empty RDR type one file "<rdrname>.<type>.pkts" is written.
    If `ancillary` is set, the ancillary type is instead split per apid into
    "<rdrname>.ancillary<apid>.pkts". Files are opened in append mode, so
    existing files with the same name are extended, not replaced. Fill
    packets are left out when `skipfill` is set.
    """
    workdir = workdir or "."
    rdrname = os.path.basename(filepath)
    rdrs = rdr_datasets(filepath)
    for typekey, granules in rdrs.items():
        if not granules:
            continue

        if ancillary and typekey == "ancillary":
            for idx, rdr in enumerate(granules):
                packets = {a.value: rdr.packets_for_apid(a) for a in rdr.apids}
                for apid, pkts in packets.items():
                    LOG.debug(
                        "writing ancillary gran %d %s-%s-%s %d",
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                        apid,
                    )
                    pktfile = os.path.join(
                        workdir, "{}.{}{}.pkts".format(rdrname, typekey, apid)
                    )
                    # append: every granule adds to the same per-apid file
                    with open(pktfile, "ab") as dest:
                        _write_packets(pkts, dest, skipfill)
        else:
            pktfile = os.path.join(workdir, "{}.{}.pkts".format(rdrname, typekey))
            LOG.debug("writing %s", pktfile)
            with open(pktfile, "ab") as dest:
                for idx, rdr in enumerate(granules):
                    LOG.debug(
                        "... %s gran %d %s-%s-%s",
                        typekey,
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                    )
                    _write_packets(rdr.packets(), dest, skipfill)


def pdsfilename(product, created):
    """
    Return the PDS file name for `product` and the `created` timestamp.

    `product` is right-padded with "A" to 20 characters (longer values are
    kept as they are). `created` is formatted with "%y%j%H%M%S", so it must
    support that format spec, as datetime objects do.
    """
    product = product.ljust(20, "A")
    return "{}XT{:%y%j%H%M%S}001.PDS".format(product, created)
Bruce Flynn's avatar
Bruce Flynn committed
def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end, workdir):
    """
    Dump the packets of each file in `rdrs` to `workdir`, then merge every
    packet file in `workdir` matching the glob `filepat` into a single PDS
    file truncated to [start, end].

    The PDS name is built from the id for `satellite`, `product` and `start`.
    Returns the path of the PDS file, joined with `workdir`.
    """
    workdir = workdir or "."
    product = "P{}{}".format(satellite_to_scid[satellite], product)
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, skipfill=True, workdir=workdir)

    # alphanumeric sorting to bootstrap final sort
    inputs = sorted(glob.glob(os.path.join(workdir, filepat)))
    pdsname = os.path.join(workdir, pdsfilename(product, start))
    LOG.info("merging to %s", pdsname)
    order = VIIRS_APID_ORDER if "VIIRSSCIENCE" in product else None

    files = []
    try:
        for path in inputs:
            files.append(open(path, "rb"))
        streams = [jpss_packet_stream(f) for f in files]
        with open(pdsname, "wb") as dest:
            merge(streams, output=dest, trunc_to=[start, end], apid_order=order)
    finally:
        # lots of files so make sure they're closed, even if the merge fails
        for f in files:
            f.close()
    return pdsname
Bruce Flynn's avatar
Bruce Flynn committed
def cris_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Merge the telemetry packets dumped from `rdrs` into a 1280CRISHSK PDS
    file and return what `_do_rdr_to_l0` returns.
    """
    pattern = "*.telemetry.pkts"
    return _do_rdr_to_l0(pattern, satellite, "1280CRISHSK", rdrs, start, end, workdir)
Bruce Flynn's avatar
Bruce Flynn committed
def cris_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Merge the dwell packets dumped from `rdrs` into a 1291CRISDWELL PDS file
    and return what `_do_rdr_to_l0` returns.
    """
    pattern = "*.dwell.pkts"
    return _do_rdr_to_l0(
        pattern, satellite, "1291CRISDWELL", rdrs, start, end, workdir
    )
Bruce Flynn's avatar
Bruce Flynn committed
def cris_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Merge the science packets dumped from `rdrs` into a 1289CRISSCIENCE PDS
    file and return what `_do_rdr_to_l0` returns.
    """
    pattern = "*.science.pkts"
    return _do_rdr_to_l0(
        pattern, satellite, "1289CRISSCIENCE", rdrs, start, end, workdir
    )
Bruce Flynn's avatar
Bruce Flynn committed
def atms_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Merge the telemetry packets dumped from `rdrs` into a 0518ATMSHSK PDS
    file and return what `_do_rdr_to_l0` returns.
    """
    pattern = "*.telemetry.pkts"
    return _do_rdr_to_l0(pattern, satellite, "0518ATMSHSK", rdrs, start, end, workdir)
Bruce Flynn's avatar
Bruce Flynn committed
def atms_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Merge the dwell packets dumped from `rdrs` into a 0517ATMSDWELL PDS file
    and return what `_do_rdr_to_l0` returns.
    """
    pattern = "*.dwell.pkts"
    return _do_rdr_to_l0(
        pattern, satellite, "0517ATMSDWELL", rdrs, start, end, workdir
    )
Bruce Flynn's avatar
Bruce Flynn committed
def atms_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Merge the science packets dumped from `rdrs` into a 0515ATMSSCIENCE PDS
    file and return what `_do_rdr_to_l0` returns.
    """
    pattern = "*.science.pkts"
    return _do_rdr_to_l0(
        pattern, satellite, "0515ATMSSCIENCE", rdrs, start, end, workdir
    )
Bruce Flynn's avatar
Bruce Flynn committed
def viirs_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Merge the science packets dumped from `rdrs` into a 0826VIIRSSCIENCE PDS
    file and return what `_do_rdr_to_l0` returns.
    """
    pattern = "*.science.pkts"
    return _do_rdr_to_l0(
        pattern, satellite, "0826VIIRSSCIENCE", rdrs, start, end, workdir
    )
Bruce Flynn's avatar
Bruce Flynn committed
def spacecraft_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Dump the ancillary packets of each file in `rdrs` to `workdir`, then
    merge them into one PDS file per apid (0, 8 and 11), truncated to
    [start, end].

    Returns the list of PDS file paths written, with a leading "./" removed.
    """
    workdir = workdir or "."
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, ancillary=True, skipfill=True, workdir=workdir)

    filenames = []
    scid = satellite_to_scid[satellite]
    for apid in (0, 8, 11):
        # alphanumeric sorting to bootstrap final sort
        inputs = sorted(
            glob.glob(os.path.join(workdir, "*.ancillary{}.pkts".format(apid)))
        )

        product = "P{}{:04d}".format(scid, apid)
        pdsname = pdsfilename(product, start)
        pdspath = os.path.join(workdir, pdsname)
        LOG.info("merging to %s", pdsname)

        files = []
        try:
            for path in inputs:
                files.append(open(path, "rb"))
            streams = [jpss_packet_stream(f) for f in files]
            with open(pdspath, "wb") as dest:
                merge(streams, output=dest, trunc_to=[start, end])
        finally:
            # lots of files so make sure they're closed, even if the merge fails
            for f in files:
                f.close()

        # strip only a leading "./"; replacing every "./" would corrupt
        # paths such as "../out/<name>"
        if pdspath.startswith("./"):
            pdspath = pdspath[2:]
        filenames.append(pdspath)
    return filenames


def rdr_to_l0(satellite, rdrs, start, end):
    """
    Convert each RDR file in `rdrs` to level-0 PDS files and return the list
    of output paths.

    The product is the part of each path before the first "_". Only one file
    per product is processed: when several inputs share a product, the last
    one wins. A product naming several RDR types (RNSCA, RVIRS, RCRIS, RATMS)
    is run through each matching conversion.
    """
    filepaths = []
    for product, filepath in {f.split("_", 1)[0]: f for f in rdrs}.items():
        if "RNSCA" in product:
            filepaths += spacecraft_to_l0(satellite, [filepath], start, end)
        if "RVIRS" in product:
            filepaths.append(viirs_sci_to_l0(satellite, [filepath], start, end))
        if "RCRIS" in product:
            filepaths.append(cris_sci_to_l0(satellite, [filepath], start, end))
        if "RATMS" in product:
            filepaths.append(atms_sci_to_l0(satellite, [filepath], start, end))
    return filepaths