Something went wrong on our end
-
Bruce Flynn authored
Squashed commit of the following: commit 76ad8de8 Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Thu May 6 15:37:10 2021 -0500 test, then push commit e3a51b48 Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Thu May 6 15:31:48 2021 -0500 rdrgen Fix attr comparison issues, support additional mission names commit bb6b5e6d Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Thu May 6 14:11:49 2021 -0500 modis merging working and tested commit 58143956 Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Wed May 5 13:13:23 2021 -0500 fix bad attr commit f3bdd86d Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Wed May 5 11:21:20 2021 -0500 add module for EOS utils commit b18d65bb Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Wed May 5 11:20:59 2021 -0500 merge: support aqua by using seqid in pointer
Bruce Flynn authoredSquashed commit of the following: commit 76ad8de8 Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Thu May 6 15:37:10 2021 -0500 test, then push commit e3a51b48 Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Thu May 6 15:31:48 2021 -0500 rdrgen Fix attr comparison issues, support additional mission names commit bb6b5e6d Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Thu May 6 14:11:49 2021 -0500 modis merging working and tested commit 58143956 Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Wed May 5 13:13:23 2021 -0500 fix bad attr commit f3bdd86d Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Wed May 5 11:21:20 2021 -0500 add module for EOS utils commit b18d65bb Author: Bruce Flynn <brucef@ssec.wisc.edu> Date: Wed May 5 11:20:59 2021 -0500 merge: support aqua by using seqid in pointer
jpssrdr.py 13.27 KiB
"""
Code for reading/writing/manipulating JPSS Common RDR files as documented in:
Joint Polar Satellite System (JPSS)
Common Data Format Control Book - External (CDFCB-X)
Volume II - RDR Formats
http://jointmission.gsfc.nasa.gov/sciencedocs/2015-06/474-00001-02_JPSS-CDFCB-X-Vol-II_0123B.pdf
"""
import ctypes as c
import glob
import logging
import os
from collections import namedtuple
import numpy as np
from h5py import File as H5File
from edosl0util.stream import jpss_packet_stream
from .headers import BaseStruct
from .merge import VIIRS_APID_ORDER, merge
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."

LOG = logging.getLogger(__name__)

# Map satellite name aliases to the numeric spacecraft id used in
# L0 PDS product names (e.g. P157...).
satellite_to_scid = {"npp": 157, "snpp": 157, "j01": 159, "jpss1": 159}
class StaticHeader(BaseStruct):
    """
    Common RDR static header.

    ctypes mapping of the header found at the start of every
    RawApplicationPackets_X blob. The *_offset fields are byte offsets
    into that blob (see _packets_for_apid / _read_apid_list for use).
    """
    _fields_ = [
        ("satellite", c.c_char * 4),
        ("sensor", c.c_char * 16),
        ("type_id", c.c_char * 16),
        ("num_apids", c.c_uint32),           # number of entries in the APID list
        ("apid_list_offset", c.c_uint32),    # offset of ApidList storage area
        ("pkt_tracker_offset", c.c_uint32),  # offset of PacketTracker storage area
        ("ap_storage_offset", c.c_uint32),   # offset of raw packet storage area
        ("next_pkt_pos", c.c_uint32),
        ("start_boundary", c.c_uint64),      # NOTE(review): presumably granule start time; confirm units in CDFCB-X
        ("end_boundary", c.c_uint64),
    ]
class Apid(BaseStruct):
    """
    Entry in the ApidList storage area.

    Describes one APID present in the RDR and where its packet trackers
    start within the PacketTracker storage area.
    """
    _fields_ = [
        ("name", c.c_char * 16),
        ("value", c.c_uint32),                 # the numeric APID
        ("pkt_tracker_start_idx", c.c_uint32), # index of first tracker for this APID
        ("pkts_reserved", c.c_uint32),
        ("pkts_received", c.c_uint32),         # trackers actually populated
    ]
class PacketTracker(BaseStruct):
    """
    Entry in the PacketTracker storage area.

    One tracker per packet; `offset` is relative to the blob's
    ap_storage_offset, and -1 marks a missing packet.
    """
    _fields_ = [
        ("obs_time", c.c_int64),
        ("sequence_number", c.c_int32),
        ("size", c.c_int32),          # full CCSDS packet size in bytes
        ("offset", c.c_int32),        # -1 == missing
        ("fill_percent", c.c_int32),  # non-zero means packet contains fill
    ]
def _make_packet_impl(size):
    """
    Create a struct for a CCSDS packet. The struct size is dynamic so we need
    a new class for each packet.

    :param size: total packet size in bytes, including the 6-byte CCSDS
        primary header
    :returns: a new BaseStruct subclass whose `data` member holds
        everything after the primary header
    """
    # size minus the CCSDS primary header size
    data_size = size - 6

    class PktImpl(BaseStruct):  # noqa
        # CCSDS primary header bit-fields followed by the raw remainder.
        # NOTE(review): assumes BaseStruct's packing lays these bit-fields
        # out as the 6-byte big-endian CCSDS header -- confirm against
        # BaseStruct's definition.
        _fields_ = [
            ("version", c.c_uint8, 3),
            ("type", c.c_uint8, 1),
            ("secondary_header", c.c_uint8, 1),
            ("apid", c.c_uint16, 11),
            ("sequence_grouping", c.c_uint8, 2),
            ("sequence_number", c.c_uint16, 14),
            ("length_minus1", c.c_uint16),
            ("data", c.c_uint8 * data_size),
        ]

    return PktImpl
# Pairing of a PacketTracker struct with its corresponding CCSDS packet struct.
Packet = namedtuple("Packet", ("tracker", "packet"))
class CommonRdr(namedtuple("CommonRdr", ("buf", "header", "apids"))):
    """
    A single common rdr as found in a RawApplicationPackets_X dataset.

    Fields: buf (raw blob), header (StaticHeader), apids (list of Apid).
    """

    def packets_for_apid(self, apid):
        """
        Yield `Packet` tuples of (tracker, packet) for the given `Apid`.
        """
        yield from (
            Packet(tracker, packet)
            for tracker, packet in _packets_for_apid(self.buf, self.header, apid)
        )

    def packets(self):
        """
        Yield `Packet`s for every apid in the apid list, in list order.
        """
        for apid in self.apids:
            yield from self.packets_for_apid(apid)
def _packets_for_apid(buf, header, apid):
    """
    Generate (PacketTracker, packet struct) tuples for one APID.

    Trackers whose offset is negative (-1 == missing packet) are skipped.
    """
    tracker_size = c.sizeof(PacketTracker)
    offset = header.pkt_tracker_offset + apid.pkt_tracker_start_idx * tracker_size
    for _ in range(apid.pkts_received):
        tracker = PacketTracker.from_buffer(buf, offset)
        offset += tracker_size
        if tracker.offset < 0:  # -1 == missing
            continue
        pkt_cls = _make_packet_impl(tracker.size)
        packet = pkt_cls.from_buffer(buf, header.ap_storage_offset + tracker.offset)
        assert c.sizeof(packet) == tracker.size
        yield tracker, packet
def _read_apid_list(header, buf):
    """
    Yield `Apid` structs from the blob's ApidList storage area.
    """
    entry_size = c.sizeof(Apid)
    base = header.apid_list_offset
    for idx in range(header.num_apids):
        yield Apid.from_buffer(buf, base + idx * entry_size)
def _sorted_packet_dataset_names(names):
names = [k for k in names if k.startswith("RawApplicationPackets_")]
return sorted(names, key=lambda x: int(x.split("_")[-1]))
def _generate_packet_datasets(group):
    """
    Yield (name, array copy) for every RawApplicationPackets_X dataset in
    the h5 group, in granule order.
    """
    for name in _sorted_packet_dataset_names(group.keys()):
        yield name, np.array(group[name])
def _find_data_group(fobj, name, sensors=None):
sensors = sensors or ["viirs", "cris", "atms"]
for sensor in sensors:
group = fobj.get("/All_Data/{}-{}-RDR_All".format(sensor.upper(), name.upper()))
if group:
return group
# Some files have CrIS rather than CRIS
if sensor == "cris":
group = fobj.get("/All_Data/{}-{}-RDR_All".format("CrIS", name.upper()))
if group:
return group
def _find_science_group(fobj):
    """Locate the SCIENCE RDR data group for any supported sensor."""
    return _find_data_group(fobj, "SCIENCE")
def _find_dwell_group(fobj):
    """
    Locate a dwell-data group, trying each CrIS dwell type first and
    falling back to the ATMS DWELL group.
    """
    candidates = (
        ("HSKDWELL", ["cris"]),
        ("IMDWELL", ["cris"]),
        ("SSMDWELL", ["cris"]),
        ("DWELL", ["atms"]),
    )
    for name, sensors in candidates:
        group = _find_data_group(fobj, name, sensors=sensors)
        if group:
            return group
def _find_diag_group(fobj):
    """Locate the DIAGNOSTIC data group (CrIS or ATMS)."""
    return _find_data_group(fobj, "DIAGNOSTIC", sensors=["cris", "atms"])
def _find_telemetry_group(fobj):
    """Locate the TELEMETRY data group (CrIS or ATMS)."""
    return _find_data_group(fobj, "TELEMETRY", sensors=["cris", "atms"])
def _find_spacecraft_group(fobj):
    """Locate the spacecraft DIARY (ancillary) data group."""
    return _find_data_group(fobj, "DIARY", sensors=["SPACECRAFT"])
def _rdrs_for_packet_dataset(group):
    """
    Decode every packet dataset in `group` to a `CommonRdr`.

    Blobs too small to decode are logged and skipped. Returns an empty
    list when `group` is None or empty.
    """
    rdrs = []
    if not group:
        return rdrs
    for name, buf in _generate_packet_datasets(group):
        try:
            rdrs.append(decode_rdr_blob(buf))
        except ValueError as e:
            LOG.warning("{} ({})".format(e, name))
    return rdrs
def decode_rdr_blob(buf):
    """
    Parse a RawApplicationPackets_X blob into a `CommonRdr`.

    :raises ValueError: when `buf` is too small to hold a StaticHeader
    """
    if buf.shape[0] < c.sizeof(StaticHeader):
        raise ValueError("Not enough bytes for static header")
    header = StaticHeader.from_buffer(buf)
    return CommonRdr(buf, header, list(_read_apid_list(header, buf)))
def rdr_datasets(filepath):
    """
    Read all common RDR datasets from an h5 RDR file.

    :param filepath: path to the RDR h5 file
    :returns: dict mapping dataset type (telemetry, diagnostic, dwell,
        science, ancillary) to a list of `CommonRdr`; types not present
        in the file map to empty lists
    """
    # Use the file as a context manager so it is closed even if decoding
    # raises; the original explicit close() was skipped on error. The
    # decoded blobs are numpy copies, so they stay valid after close.
    with H5File(filepath, mode="r") as fobj:
        return dict(
            telemetry=_rdrs_for_packet_dataset(_find_telemetry_group(fobj)),
            diagnostic=_rdrs_for_packet_dataset(_find_diag_group(fobj)),
            dwell=_rdrs_for_packet_dataset(_find_dwell_group(fobj)),
            science=_rdrs_for_packet_dataset(_find_science_group(fobj)),
            ancillary=_rdrs_for_packet_dataset(_find_spacecraft_group(fobj)),
        )
def rdr_ccsds_packets(rdr, skipfill=False):
    """
    Yield every CCSDS packet struct from a `CommonRdr` in packet tracker
    order. With skipfill=True, packets whose tracker reports fill are
    dropped (they are logged either way).
    """
    for pkt in rdr.packets():
        if pkt.tracker.fill_percent != 0:
            LOG.debug("fill: %s", pkt.tracker)
            if skipfill:
                continue
        yield pkt.packet
def sort_packets_by_obs_time(packets):
    """
    Return a new list of `Packet`s ordered by their PacketTracker
    obs_time (the input is not modified).
    """
    def obs_time(pkt):
        return pkt.tracker.obs_time

    return sorted(packets, key=obs_time)
def sort_packets_by_apid(packets, order=None):
    """
    Return a new list of packets sorted by APID (the input is not
    modified). When a non-empty `order` list is given, packets sort by
    their APID's position in it; a ValueError is raised if a packet has
    an APID not in the list.
    """
    def by_position(pkt):
        return order.index(pkt.packet.apid)

    def by_value(pkt):
        return pkt.packet.apid

    return sorted(packets, key=by_position if order else by_value)
def _write_packets(pkts, dest, skipfill):
for pkt in pkts:
if pkt.tracker.fill_percent != 0 and skipfill:
continue
dest.write(pkt.packet)
def write_rdr_datasets(filepath, ancillary=False, skipfill=False, workdir=None):
    """
    Dump the packets of every dataset in an RDR file to .pkts files.

    :param filepath: path to the RDR h5 file
    :param ancillary: when True, ancillary (spacecraft diary) datasets are
        written to one file per APID (<rdr>.ancillary<apid>.pkts) instead
        of a single combined file
    :param skipfill: drop packets whose tracker reports fill
    :param workdir: output directory, defaults to the current directory
    :returns: the dict of rdrs as produced by `rdr_datasets`

    NOTE(review): output files are opened in append mode ("ab"), so
    re-running in the same workdir grows any existing files.
    """
    workdir = workdir or "."
    rdrname = os.path.basename(filepath)
    rdrs = rdr_datasets(filepath)
    for typekey in rdrs:
        if not rdrs[typekey]:
            continue
        if ancillary and typekey == "ancillary":
            # one output file per APID so ancillary streams can be merged
            # into per-APID PDS files later (see spacecraft_to_l0)
            for idx, rdr in enumerate(rdrs["ancillary"]):
                packets = {a.value: rdr.packets_for_apid(a) for a in rdr.apids}
                for apid, pkts in packets.items():
                    LOG.debug(
                        "writing ancillary gran %d %s-%s-%s %d",
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                        apid,
                    )
                    pktfile = os.path.join(
                        workdir, "{}.{}{}.pkts".format(rdrname, typekey, apid)
                    )
                    with open(pktfile, "ab") as dest:
                        _write_packets(pkts, dest, skipfill)
        else:
            # all APIDs for this dataset type go to one combined file
            pktfile = os.path.join(workdir, "{}.{}.pkts".format(rdrname, typekey))
            LOG.debug("writing %s", pktfile)
            with open(pktfile, "ab") as dest:
                for idx, rdr in enumerate(rdrs[typekey]):
                    LOG.debug(
                        "... %s gran %d %s-%s-%s",
                        typekey,
                        idx,
                        rdr.header.satellite.decode(),
                        rdr.header.sensor.decode(),
                        rdr.header.type_id.decode(),
                    )
                    _write_packets(rdr.packets(), dest, skipfill)
    return rdrs
def pdsfilename(product, created):
    """
    Build a PDS file name: the product id padded to 20 chars with "A",
    then "XT", a %y%j%H%M%S timestamp from `created`, and "001.PDS".
    """
    padded = product.ljust(20, "A")
    return "{}XT{:%y%j%H%M%S}001.PDS".format(padded, created)
def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end, workdir):
    """
    Dump RDR packet datasets and merge them into a single L0 PDS file.

    :param filepat: glob pattern matching the dumped .pkts files to merge
    :param satellite: key into satellite_to_scid (npp/snpp/j01/jpss1)
    :param product: product id, without the leading P<scid> prefix
    :param rdrs: sequence of RDR file paths to dump
    :param start, end: datetimes used for truncation and the PDS name
    :param workdir: output directory, None means the current directory
    :returns: path to the created PDS file
    """
    workdir = workdir or "."
    product = "P{}{}".format(satellite_to_scid[satellite], product)
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, skipfill=True, workdir=workdir)
    # alphanumeric sorting to bootstrap final sort
    inputs = sorted(glob.glob(os.path.join(workdir, filepat)))
    files = [open(f, "rb") for f in inputs]
    try:
        streams = [jpss_packet_stream(f) for f in files]
        pdsname = os.path.join(workdir, pdsfilename(product, start))
        LOG.info("merging to %s", pdsname)
        # VIIRS science merges require an explicit APID ordering
        order = VIIRS_APID_ORDER if "VIIRSSCIENCE" in product else None
        with open(pdsname, "wb") as dest:
            merge(streams, output=dest, trunc_to=[start, end], apid_order=order)
    finally:
        # the original leaked these handles; close them even if merge fails
        for f in files:
            f.close()
    # the original returned `pdsname if workdir != "." else pdsname` --
    # both arms identical, so just return the path
    return pdsname
def cris_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    """Merge CrIS housekeeping telemetry RDR packets into an L0 PDS file."""
    return _do_rdr_to_l0(
        "*.telemetry.pkts", satellite, "1280CRISHSK", rdrs, start, end, workdir
    )
def cris_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    """Merge CrIS dwell RDR packets into an L0 PDS file."""
    return _do_rdr_to_l0(
        "*.dwell.pkts", satellite, "1291CRISDWELL", rdrs, start, end, workdir
    )
def cris_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    """Merge CrIS science RDR packets into an L0 PDS file."""
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, "1289CRISSCIENCE", rdrs, start, end, workdir
    )
def atms_hsk_to_l0(satellite, rdrs, start, end, workdir=None):
    """Merge ATMS housekeeping telemetry RDR packets into an L0 PDS file."""
    return _do_rdr_to_l0(
        "*.telemetry.pkts", satellite, "0518ATMSHSK", rdrs, start, end, workdir
    )
def atms_dwell_to_l0(satellite, rdrs, start, end, workdir=None):
    """Merge ATMS dwell RDR packets into an L0 PDS file."""
    return _do_rdr_to_l0(
        "*.dwell.pkts", satellite, "0517ATMSDWELL", rdrs, start, end, workdir
    )
def atms_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    """Merge ATMS science RDR packets into an L0 PDS file."""
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, "0515ATMSSCIENCE", rdrs, start, end, workdir
    )
def viirs_sci_to_l0(satellite, rdrs, start, end, workdir=None):
    """Merge VIIRS science RDR packets into an L0 PDS file."""
    return _do_rdr_to_l0(
        "*.science.pkts", satellite, "0826VIIRSSCIENCE", rdrs, start, end, workdir
    )
def spacecraft_to_l0(satellite, rdrs, start, end, workdir=None):
    """
    Dump spacecraft (diary) ancillary packets and merge per-APID PDS files.

    Creates one PDS file per APID in (0, 8, 11) and returns the list of
    created file paths (with any leading "./" stripped).

    :param satellite: key into satellite_to_scid (npp/snpp/j01/jpss1)
    :param rdrs: sequence of RDR file paths to dump
    :param start, end: datetimes used for truncation and the PDS names
    :param workdir: output directory, defaults to the current directory
    """
    workdir = workdir or "."
    for filepath in rdrs:
        LOG.info("dumping %s", filepath)
        write_rdr_datasets(filepath, ancillary=True, skipfill=True, workdir=workdir)
    filenames = []
    scid = satellite_to_scid[satellite]
    for apid in (0, 8, 11):
        # alphanumeric sorting to bootstrap final sort
        inputs = sorted(
            glob.glob(os.path.join(workdir, "*.ancillary{}.pkts".format(apid)))
        )
        files = [open(f, "rb") for f in inputs]
        try:
            streams = [jpss_packet_stream(f) for f in files]
            product = "P{}{:04d}".format(scid, apid)
            pdsname = pdsfilename(product, start)
            LOG.info("merging to %s", pdsname)
            with open(os.path.join(workdir, pdsname), "wb") as dest:
                merge(streams, output=dest, trunc_to=[start, end])
            filenames.append(os.path.join(workdir, pdsname).replace("./", ""))
        finally:
            # lots of files so make sure they're closed, even if merge
            # fails (the original only closed them on success, via a
            # side-effect list comprehension)
            for f in files:
                f.close()
    return filenames
def rdr_to_l0(satellite, rdrs, start, end):
    """
    Convert RDR files to L0 PDS files, dispatching on the product prefix
    (text before the first "_") of each file name.

    NOTE(review): only the last file seen for each product prefix is
    converted -- earlier duplicates are dropped by the dict; confirm
    this is intended.

    :returns: list of created PDS file paths
    """
    filepaths = []
    one_per_product = {f.split("_", 1)[0]: f for f in rdrs}
    for product, filepath in one_per_product.items():
        args = (satellite, [filepath], start, end)
        if "RNSCA" in product:
            filepaths.extend(spacecraft_to_l0(*args))
        if "RVIRS" in product:
            filepaths.append(viirs_sci_to_l0(*args))
        if "RCRIS" in product:
            filepaths.append(cris_sci_to_l0(*args))
        if "RATMS" in product:
            filepaths.append(atms_sci_to_l0(*args))
    return filepaths