""" Code for reading/writing/manipulating JPSS Common RDR files as documented in: Joint Polar Satellite System (JPSS) Common Data Format Control Book - External (CDFCB-X) Volume II - RDR Formats http://jointmission.gsfc.nasa.gov/sciencedocs/2015-06/474-00001-02_JPSS-CDFCB-X-Vol-II_0123B.pdf """ import ctypes as c import glob import logging import os from collections import namedtuple import numpy as np from h5py import File as H5File from edosl0util.stream import jpss_packet_stream from .headers import BaseStruct from .merge import VIIRS_APID_ORDER, merge __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved." LOG = logging.getLogger(__name__) satellite_to_scid = {"npp": 157, "snpp": 157, "j01": 159, "jpss1": 159} class StaticHeader(BaseStruct): """ Common RDR static header. """ _fields_ = [ ("satellite", c.c_char * 4), ("sensor", c.c_char * 16), ("type_id", c.c_char * 16), ("num_apids", c.c_uint32), ("apid_list_offset", c.c_uint32), ("pkt_tracker_offset", c.c_uint32), ("ap_storage_offset", c.c_uint32), ("next_pkt_pos", c.c_uint32), ("start_boundary", c.c_uint64), ("end_boundary", c.c_uint64), ] class Apid(BaseStruct): """ Entry in the ApidList storage area. """ _fields_ = [ ("name", c.c_char * 16), ("value", c.c_uint32), ("pkt_tracker_start_idx", c.c_uint32), ("pkts_reserved", c.c_uint32), ("pkts_received", c.c_uint32), ] class PacketTracker(BaseStruct): """ Entry in the PacketTracker storage area. """ _fields_ = [ ("obs_time", c.c_int64), ("sequence_number", c.c_int32), ("size", c.c_int32), ("offset", c.c_int32), ("fill_percent", c.c_int32), ] def _make_packet_impl(size): """ Create a struct for a CCSDS packet. The struct size is dynamic so we need a new class for each packet. """ # size minus the CCSDS primary header size data_size = size - 6 class PktImpl(BaseStruct): # noqa _fields_ = [ ("version", c.c_uint8, 3), ("type", c.c_uint8, 1), ("secondary_header", c.c_uint8, 1), ("apid", c.c_uint16, 11), ("sequence_grouping", c.c_uint8, 2), ("sequence_number", c.c_uint16, 14), ("length_minus1", c.c_uint16), ("data", c.c_uint8 * data_size), ] return PktImpl Packet = namedtuple("Packet", ("tracker", "packet")) class CommonRdr(namedtuple("CommonRdr", ("buf", "header", "apids"))): """ A single common rdr as found in a RawApplicationPackets_X dataset. """ def packets_for_apid(self, apid): """ Return a generator that yields tuples of (tracker, packet) for the given `Apid`. """ for tracker, packet in _packets_for_apid(self.buf, self.header, apid): yield Packet(tracker, packet) def packets(self): """ Return a generator that yields `Packet`s for each apid in the apid list. 
""" for apid in self.apids: for packet in self.packets_for_apid(apid): yield packet def _packets_for_apid(buf, header, apid): """ Generate tuples of (PacketTracker, Packet) """ t_off = header.pkt_tracker_offset + apid.pkt_tracker_start_idx * c.sizeof( PacketTracker ) for idx in range(apid.pkts_received): tracker = PacketTracker.from_buffer(buf, t_off) t_off += c.sizeof(PacketTracker) if tracker.offset < 0: # -1 == missing continue p_off = header.ap_storage_offset + tracker.offset pkt_impl = _make_packet_impl(tracker.size) pkt = pkt_impl.from_buffer(buf, p_off) assert c.sizeof(pkt) == tracker.size yield tracker, pkt def _read_apid_list(header, buf): """ Generate Apid-s """ offset = header.apid_list_offset for idx in range(header.num_apids): yield Apid.from_buffer(buf, offset) offset += c.sizeof(Apid) def _sorted_packet_dataset_names(names): names = [k for k in names if k.startswith("RawApplicationPackets_")] return sorted(names, key=lambda x: int(x.split("_")[-1])) def _generate_packet_datasets(group): dsnames = group.keys() for name in _sorted_packet_dataset_names(dsnames): ds = group[name] yield name, np.array(ds) def _find_data_group(fobj, name, sensors=None): sensors = sensors or ["viirs", "cris", "atms"] for sensor in sensors: group = fobj.get("/All_Data/{}-{}-RDR_All".format(sensor.upper(), name.upper())) if group: return group # Some files have CrIS rather than CRIS if sensor == "cris": group = fobj.get("/All_Data/{}-{}-RDR_All".format("CrIS", name.upper())) if group: return group def _find_science_group(fobj): return _find_data_group(fobj, "SCIENCE") def _find_dwell_group(fobj): for name in ("HSKDWELL", "IMDWELL", "SSMDWELL"): group = _find_data_group(fobj, name, sensors=["cris"]) if group: return group group = _find_data_group(fobj, "DWELL", sensors=["atms"]) if group: return group def _find_diag_group(fobj): return _find_data_group(fobj, "DIAGNOSTIC", sensors=["cris", "atms"]) def _find_telemetry_group(fobj): return _find_data_group(fobj, "TELEMETRY", sensors=["cris", "atms"]) def _find_spacecraft_group(fobj): return _find_data_group(fobj, "DIARY", sensors=["SPACECRAFT"]) def _rdrs_for_packet_dataset(group): rdrs = [] if group: for name, buf in _generate_packet_datasets(group): try: rdr = decode_rdr_blob(buf) except ValueError as e: LOG.warning("{} ({})".format(e, name)) continue rdrs.append(rdr) return rdrs def decode_rdr_blob(buf): if buf.shape[0] < c.sizeof(StaticHeader): raise ValueError("Not enough bytes for static header") header = StaticHeader.from_buffer(buf) apids = _read_apid_list(header, buf) return CommonRdr(buf, header, list(apids)) def rdr_datasets(filepath): fobj = H5File(filepath, mode="r") rdr = dict( telemetry=_rdrs_for_packet_dataset(_find_telemetry_group(fobj)), diagnostic=_rdrs_for_packet_dataset(_find_diag_group(fobj)), dwell=_rdrs_for_packet_dataset(_find_dwell_group(fobj)), science=_rdrs_for_packet_dataset(_find_science_group(fobj)), ancillary=_rdrs_for_packet_dataset(_find_spacecraft_group(fobj)), ) fobj.close() return rdr def rdr_ccsds_packets(rdr, skipfill=False): """ Get all CCSDS packets from an `CommonRdr` in packet tracker order. """ for packet in rdr.packets(): if packet.tracker.fill_percent != 0: LOG.debug("fill: %s", packet.tracker) if skipfill: continue yield packet.packet def sort_packets_by_obs_time(packets): """ Sort `Packet`s in-place by the PacketTracker obs_time. """ return sorted(packets, key=lambda p: p.tracker.obs_time) def sort_packets_by_apid(packets, order=None): """ Sort packets in-place by apid. 
def sort_packets_by_apid(packets, order=None):
    """
    Return a new list of packets sorted by apid. An error will be raised if
    `order` is given and a packet has an apid not in the list.
    """
    if order:
        return sorted(packets, key=lambda p: order.index(p.packet.apid))
    else:
        return sorted(packets, key=lambda p: p.packet.apid)


def _write_packets(pkts, dest, skipfill):
    for pkt in pkts:
        if pkt.tracker.fill_percent != 0 and skipfill:
            continue
        dest.write(pkt.packet)


def write_rdr_datasets(filepath, ancillary=False, skipfill=False, workdir=None):
    workdir = workdir or "."
    rdrname = os.path.basename(filepath)
    rdrs = rdr_datasets(filepath)

    for typekey in rdrs:
        if not rdrs[typekey]:
            continue
        if ancillary and typekey == "ancillary":
            for idx, rdr in enumerate(rdrs["ancillary"]):
                packets = {a.value: rdr.packets_for_apid(a) for a in rdr.apids}
                for apid, pkts in packets.items():
                    LOG.debug(
                        "writing ancillary gran %d %s-%s-%s %d",
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                        apid,
                    )
                    pktfile = os.path.join(
                        workdir, "{}.{}{}.pkts".format(rdrname, typekey, apid)
                    )
                    with open(pktfile, "ab") as dest:
                        _write_packets(pkts, dest, skipfill)
        else:
            pktfile = os.path.join(workdir, "{}.{}.pkts".format(rdrname, typekey))
            LOG.debug("writing %s", pktfile)
            with open(pktfile, "ab") as dest:
                for idx, rdr in enumerate(rdrs[typekey]):
                    LOG.debug(
                        "... %s gran %d %s-%s-%s",
                        typekey,
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                    )
                    _write_packets(rdr.packets(), dest, skipfill)
    return rdrs


def pdsfilename(product, created):
    if len(product) < 20:
        product = product + "A" * (20 - len(product))
    return "{}XT{:%y%j%H%M%S}001.PDS".format(product, created)


def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end, workdir):
    workdir = workdir or "."
    product = "P{}{}".format(satellite_to_scid[satellite], product)
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, skipfill=True, workdir=workdir)

    # alphanumeric sorting to bootstrap final sort
    inputs = sorted(glob.glob(os.path.join(workdir, filepat)))
    streams = [jpss_packet_stream(open(f, "rb")) for f in inputs]
    pdsname = os.path.join(workdir, pdsfilename(product, start))
    LOG.info("merging to %s", pdsname)
    order = VIIRS_APID_ORDER if "VIIRSSCIENCE" in product else None
    with open(pdsname, "wb") as dest:
        merge(streams, output=dest, trunc_to=[start, end], apid_order=order)
    return pdsname
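
def _example_pdsname():
    """
    Sketch of the PDS naming convention; the product ID and timestamp below
    are assumptions for illustration. The name is the 20 character product
    ID (SCID + APID product), "XT", the creation time as %y%j%H%M%S, and a
    fixed "001" suffix.
    """
    from datetime import datetime

    name = pdsfilename("P1570826VIIRSSCIENCE", datetime(2023, 1, 2, 3, 4, 5))
    assert name == "P1570826VIIRSSCIENCEXT23002030405001.PDS"
    return name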
def cris_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1280CRISHSK"
    return _do_rdr_to_l0(
        "*.telemetry.pkts", satellite, product, rdrs, start, end, workdir
    )


def cris_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1291CRISDWELL"
    return _do_rdr_to_l0("*.dwell.pkts", satellite, product, rdrs, start, end, workdir)


def cris_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "1289CRISSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def atms_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0518ATMSHSK"
    return _do_rdr_to_l0(
        "*.telemetry.pkts", satellite, product, rdrs, start, end, workdir
    )


def atms_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0517ATMSDWELL"
    return _do_rdr_to_l0("*.dwell.pkts", satellite, product, rdrs, start, end, workdir)


def atms_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0515ATMSSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def viirs_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    product = "0826VIIRSSCIENCE"
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, product, rdrs, start, end, workdir
    )


def spacecraft_to_l0(satellite, rdrs, start, end, workdir=None):
    workdir = workdir or "."
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, ancillary=True, skipfill=True, workdir=workdir)

    filenames = []
    scid = satellite_to_scid[satellite]
    for apid in (0, 8, 11):
        # alphanumeric sorting to bootstrap final sort
        inputs = sorted(
            glob.glob(os.path.join(workdir, "*.ancillary{}.pkts".format(apid)))
        )
        files = [open(f, "rb") for f in inputs]
        streams = [jpss_packet_stream(f) for f in files]
        product = "P{}{:04d}".format(scid, apid)
        pdsname = pdsfilename(product, start)
        LOG.info("merging to %s", pdsname)
        with open(os.path.join(workdir, pdsname), "wb") as dest:
            merge(streams, output=dest, trunc_to=[start, end])
        filenames.append(os.path.join(workdir, pdsname).replace("./", ""))
        # lots of files so make sure they're closed
        for f in files:
            f.close()
    return filenames


def rdr_to_l0(satellite, rdrs, start, end):
    filepaths = []
    for product, filepath in {f.split("_", 1)[0]: f for f in rdrs}.items():
        if "RNSCA" in product:
            filepaths += spacecraft_to_l0(satellite, [filepath], start, end)
        if "RVIRS" in product:
            filepaths.append(viirs_sci_to_l0(satellite, [filepath], start, end))
        if "RCRIS" in product:
            filepaths.append(cris_sci_to_l0(satellite, [filepath], start, end))
        if "RATMS" in product:
            filepaths.append(atms_sci_to_l0(satellite, [filepath], start, end))
    return filepaths
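
# An end-to-end usage sketch; the granule file names and time bounds are
# placeholders, not real data. rdr_to_l0 routes each file on its RDR product
# prefix (RNSCA/RVIRS/RCRIS/RATMS) and writes PDS files to the current
# directory.
def _example_rdr_to_l0():
    from datetime import datetime

    rdrs = [
        "RVIRS_npp_d20230102_t0304050_e0310000_b00001.h5",  # hypothetical name
        "RNSCA_npp_d20230102_t0304050_e0310000_b00001.h5",  # hypothetical name
    ]
    start = datetime(2023, 1, 2, 3, 4, 5)
    end = datetime(2023, 1, 2, 3, 10, 0)
    return rdr_to_l0("snpp", rdrs, start, end)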