""" Code for reading/writing/manipulating JPSS Common RDR files as documented in: Joint Polar Satellite System (JPSS) Common Data Format Control Book - External (CDFCB-X) Volume II - RDR Formats http://jointmission.gsfc.nasa.gov/sciencedocs/2015-06/474-00001-02_JPSS-CDFCB-X-Vol-II_0123B.pdf """ import glob from edosl0util.stream import jpss_packet_stream __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved." import ctypes as c import logging import os from collections import namedtuple import numpy as np from h5py import File as H5File from .headers import BaseStruct from .merge import merge, VIIRS_APID_ORDER LOG = logging.getLogger(__name__) satellite_to_scid = { 'npp': 157, 'snpp': 157, 'j01': 159, 'jpss1': 159, } class StaticHeader(BaseStruct): """ Common RDR static header. """ _fields_ = [ ('satellite', c.c_char * 4), ('sensor', c.c_char * 16), ('type_id', c.c_char * 16), ('num_apids', c.c_uint32), ('apid_list_offset', c.c_uint32), ('pkt_tracker_offset', c.c_uint32), ('ap_storage_offset', c.c_uint32), ('next_pkt_pos', c.c_uint32), ('start_boundary', c.c_uint64), ('end_boundary', c.c_uint64), ] class Apid(BaseStruct): """ Entry in the ApidList storage area. """ _fields_ = [ ('name', c.c_char * 16), ('value', c.c_uint32), ('pkt_tracker_start_idx', c.c_uint32), ('pkts_reserved', c.c_uint32), ('pkts_received', c.c_uint32), ] class PacketTracker(BaseStruct): """ Entry in the PacketTracker storage area. """ _fields_ = [ ('obs_time', c.c_int64), ('sequence_number', c.c_int32), ('size', c.c_int32), ('offset', c.c_int32), ('fill_percent', c.c_int32), ] def _make_packet_impl(size): """ Create a struct for a CCSDS packet. The struct size is dynamic so we need a new class for each packet. """ # size minus the CCSDS primary header size data_size = size - 6 class PktImpl(BaseStruct): # noqa _fields_ = [ ('version', c.c_uint8, 3), ('type', c.c_uint8, 1), ('secondary_header', c.c_uint8, 1), ('apid', c.c_uint16, 11), ('sequence_grouping', c.c_uint8, 2), ('sequence_number', c.c_uint16, 14), ('length_minus1', c.c_uint16), ('data', c.c_uint8 * data_size) ] return PktImpl Packet = namedtuple('Packet', ('tracker', 'packet')) class CommonRdr(namedtuple('CommonRdr', ('buf', 'header', 'apids'))): """ A single common rdr as found in a RawApplicationPackets_X dataset. """ def packets_for_apid(self, apid): """ Return a generator that yields tuples of (tracker, packet) for the given `Apid`. """ for tracker, packet in _packets_for_apid(self.buf, self.header, apid): yield Packet(tracker, packet) def packets(self): """ Return a generator that yields `Packet`s for each apid in the apid list. """ for apid in self.apids: for packet in self.packets_for_apid(apid): yield packet def _packets_for_apid(buf, header, apid): """ Generate tuples of (PacketTracker, Packet) """ t_off = header.pkt_tracker_offset + apid.pkt_tracker_start_idx * c.sizeof(PacketTracker) for idx in range(apid.pkts_received): tracker = PacketTracker.from_buffer(buf, t_off) t_off += c.sizeof(PacketTracker) if tracker.offset < 0: # -1 == missing continue p_off = header.ap_storage_offset + tracker.offset pkt_impl = _make_packet_impl(tracker.size) pkt = pkt_impl.from_buffer(buf, p_off) assert c.sizeof(pkt) == tracker.size yield tracker, pkt def _read_apid_list(header, buf): """ Generate Apid-s """ offset = header.apid_list_offset for idx in range(header.num_apids): yield Apid.from_buffer(buf, offset) offset += c.sizeof(Apid) def _sorted_packet_dataset_names(names): names = [k for k in names if k.startswith('RawApplicationPackets_')] return sorted(names, key=lambda x: int(x.split('_')[-1])) def _generate_packet_datasets(group): dsnames = group.keys() for name in _sorted_packet_dataset_names(dsnames): ds = group[name] yield name, np.array(ds) def _find_data_group(fobj, name, sensors=None): sensors = sensors or ['viirs', 'cris', 'atms'] for sensor in sensors: group = fobj.get('/All_Data/{}-{}-RDR_All'.format(sensor.upper(), name.upper())) if group: return group def _find_science_group(fobj): return _find_data_group(fobj, 'SCIENCE') def _find_dwell_group(fobj): for name in ('HSKDWELL', 'IMDWELL', 'SSMDWELL'): group = _find_data_group(fobj, name, sensors=['cris']) if group: return group group = _find_data_group(fobj, 'DWELL', sensors=['atms']) if group: return group def _find_diag_group(fobj): return _find_data_group(fobj, 'DIAGNOSTIC', sensors=['cris', 'atms']) def _find_telemetry_group(fobj): return _find_data_group(fobj, 'TELEMETRY', sensors=['cris', 'atms']) def _find_spacecraft_group(fobj): return _find_data_group(fobj, 'DIARY', sensors=['SPACECRAFT']) def _rdrs_for_packet_dataset(group): rdrs = [] if group: for name, buf in _generate_packet_datasets(group): try: rdr = decode_rdr_blob(buf) except ValueError as e: LOG.warning('{} ({})'.format(e, name)) continue rdrs.append(rdr) return rdrs def decode_rdr_blob(buf): if buf.shape[0] < c.sizeof(StaticHeader): raise ValueError('Not enough bytes for static header') header = StaticHeader.from_buffer(buf) apids = _read_apid_list(header, buf) return CommonRdr(buf, header, list(apids)) def rdr_datasets(filepath): fobj = H5File(filepath) rdr = dict( telemetry=_rdrs_for_packet_dataset(_find_telemetry_group(fobj)), diagnostic=_rdrs_for_packet_dataset(_find_diag_group(fobj)), dwell=_rdrs_for_packet_dataset(_find_dwell_group(fobj)), science=_rdrs_for_packet_dataset(_find_science_group(fobj)), ancillary=_rdrs_for_packet_dataset(_find_spacecraft_group(fobj)), ) fobj.close() return rdr def rdr_ccsds_packets(rdr, skipfill=False): """ Get all CCSDS packets from an `CommonRdr` in packet tracker order. """ for packet in rdr.packets(): if packet.tracker.fill_percent != 0: LOG.debug('fill: %s', packet.tracker) if skipfill: continue yield packet.packet def sort_packets_by_obs_time(packets): """ Sort `Packet`s in-place by the PacketTracker obs_time. """ return sorted(packets, key=lambda p: p.tracker.obs_time) def sort_packets_by_apid(packets, order=None): """ Sort packets in-place by apid. An error will be raised if `order` is given and packet has an apid not in the list. """ if order: return sorted(packets, key=lambda p: order.index(p.packet.apid)) else: return sorted(packets, key=lambda p: p.packet.apid) def _write_packets(pkts, dest, skipfill): for pkt in pkts: if pkt.tracker.fill_percent != 0 and skipfill: continue dest.write(pkt.packet) def write_rdr_datasets(filepath, ancillary=False, skipfill=False): rdrname = os.path.basename(filepath) rdrs = rdr_datasets(filepath) for typekey in rdrs: if not rdrs[typekey]: continue if ancillary and typekey == 'ancillary': for idx, rdr in enumerate(rdrs['ancillary']): packets = {a.value: rdr.packets_for_apid(a) for a in rdr.apids} for apid, pkts in packets.items(): LOG.debug( 'writing ancillary gran %d %s-%s-%s %d', idx, rdr.header.satellite.decode(), rdr.header.sensor.decode(), rdr.header.type_id.decode(), apid) pktfile = '{}.{}{}.pkts'.format(rdrname, typekey, apid) with open(pktfile, 'ab') as dest: _write_packets(pkts, dest, skipfill) else: pktfile = '{}.{}.pkts'.format(rdrname, typekey) LOG.debug('writing %s', pktfile) with open(pktfile, 'wb') as dest: for idx, rdr in enumerate(rdrs[typekey]): LOG.debug( '... %s gran %d %s-%s-%s', typekey, idx, rdr.header.satellite.decode(), rdr.header.sensor.decode(), rdr.header.type_id.decode()) _write_packets(rdr.packets(), dest, skipfill) return rdrs def pdsfilename(product, created): if len(product) < 20: product = product + 'A' * (20 - len(product)) return '{}XT{:%y%j%H%M%S}001.PDS'.format(product, created) def _do_rdr_to_l0(filepat, satellite, product, rdrs, start, end): product = 'P{}{}'.format(satellite_to_scid[satellite], product) for filepath in rdrs: LOG.info('dumping %s', filepath) write_rdr_datasets(filepath, skipfill=True) # alphanumeric sorting to bootstrap final sort inputs = sorted(glob.glob(filepat)) streams = [jpss_packet_stream(open(f, 'rb')) for f in inputs] pdsname = pdsfilename(product, start) LOG.info('merging to %s', pdsname) order = VIIRS_APID_ORDER if 'VIIRSSCIENCE' in product else None with open(pdsname, 'wb') as dest: merge(streams, output=dest, trunc_to=[start, end], apid_order=order) return pdsname def cris_hsk_to_l0(satellite, rdrs, start, end): product = '1280CRISHSK' return _do_rdr_to_l0('*.telemetry.pkts', satellite, product, rdrs, start, end) def cris_dwell_to_l0(satellite, rdrs, start, end): product = '1291CRISDWELL' return _do_rdr_to_l0('*.dwell.pkts', satellite, product, rdrs, start, end) def cris_sci_to_l0(satellite, rdrs, start, end): product = '1289CRISSCIENCE' return _do_rdr_to_l0('*.science.pkts', satellite, product, rdrs, start, end) def atms_hsk_to_l0(satellite, rdrs, start, end): product = '0518ATMSHSK' return _do_rdr_to_l0('*.telemetry.pkts', satellite, product, rdrs, start, end) def atms_dwell_to_l0(satellite, rdrs, start, end): product = '0517ATMSDWELL' return _do_rdr_to_l0('*.dwell.pkts', satellite, product, rdrs, start, end) def atms_sci_to_l0(satellite, rdrs, start, end): product = '0515ATMSSCIENCE' return _do_rdr_to_l0('*.science.pkts', satellite, product, rdrs, start, end) def viirs_sci_to_l0(satellite, rdrs, start, end): product = '0826VIIRSSCIENCE' return _do_rdr_to_l0('*.science.pkts', satellite, product, rdrs, start, end) def spacecraft_to_l0(satellite, rdrs, start, end): for filepath in rdrs: LOG.info('dumping %s', filepath) write_rdr_datasets(filepath, ancillary=True, skipfill=True) filenames = [] scid = satellite_to_scid[satellite] for apid in (0, 8, 11): # alphanumeric sorting to bootstrap final sort inputs = sorted(glob.glob('*.ancillary{}.pkts'.format(apid))) files = [open(f, 'rb') for f in inputs] streams = [jpss_packet_stream(f) for f in files] product = 'P{}{:04d}'.format(scid, apid) pdsname = pdsfilename(product, start) LOG.info('merging to %s', pdsname) with open(pdsname, 'wb') as dest: merge(streams, output=dest, trunc_to=[start, end]) filenames.append(pdsname) # lots of files so make sure they're closed [f.close() for f in files] return filenames def rdr_to_l0(satellite, rdrs, start, end): filepaths = [] for product, filepath in {f.split('_', 1)[0]:f for f in rdrs}.items(): if 'RNSCA' in product: filepaths += spacecraft_to_l0(satellite, [filepath], start, end) if 'RVIRS' in product: filepaths.append(viirs_sci_to_l0(satellite, [filepath], start, end)) if 'RCRIS' in product: filepaths.append(cris_sci_to_l0(satellite, [filepath], start, end)) if 'RATMS' in product: filepaths.append(atms_sci_to_l0(satellite, [filepath], start, end)) return filepaths