Skip to content
Snippets Groups Projects
rdrgen.py 12.36 KiB
import ctypes
import itertools
import os
from collections import OrderedDict
from datetime import datetime

import attr
import h5py
import numpy as np
from astropy.time import Time, TimeDelta

import edosl0util
from edosl0util.jpssrdr import (
    StaticHeader, Apid as ApidListItem, PacketTracker, decode_rdr_blob)


def write_rdr(sat, blob, dir_path='.',
              distributor=None, origin=None, domain=None, compressed=False, orbit_num=0,
              creation_time=None, gran_creation_time=None, software_ver=None):
    # TODO: write out the user block too??
    distributor = distributor or origin or default_origin
    origin = origin or default_origin
    domain = domain or default_domain
    creation_time = creation_time or datetime.now()
    gran_creation_time = gran_creation_time or creation_time
    software_ver = (software_ver
                    or edosl0util.__name__ + '-' + edosl0util.__version__)
    blob_info = decode_rdr_blob(blob)
    rdr_type = SpacecraftDiaryRdrType()  # TODO: select via APID
    gran_iet = blob_info.header.start_boundary
    gran_end_iet = blob_info.header.end_boundary
    gran_time = iet_to_datetime(gran_iet)
    gran_end_time = iet_to_datetime(gran_end_iet)
    gran_id = make_granule_id(sat, gran_iet)
    gran_ver = 'A1'
    format_date = lambda t: t.strftime('%Y%m%d')
    format_time = lambda t: t.strftime('%H%M%S.%fZ')
    file_name = make_rdr_filename(rdr_type.product_id, sat, gran_time, gran_end_time,
                                  orbit_num, creation_time, origin, domain, compressed)
    with h5py.File(os.path.join(dir_path, file_name), 'w') as h5_file:
        set_h5_attrs(h5_file, {
            'Distributor': distributor,
            'Mission_Name': 'S-NPP/JPSS',
            'N_Dataset_Source': origin,
            'N_HDF_Creation_Date': format_date(creation_time),
            'N_HDF_Creation_Time': format_time(creation_time),
            'Platform_Short_Name': platform_short_names[sat]})
        all_grp = h5_file.create_group('All_Data')
        raw_grp = all_grp.create_group(rdr_type.short_name + '_All')
        raw_ds = raw_grp.create_dataset(
            'RawApplicationPackets_0', data=blob, maxshape=[None])
        prod_grp = h5_file.create_group('Data_Products')
        gran_grp = prod_grp.create_group(rdr_type.short_name)
        set_h5_attrs(gran_grp, {
            'Instrument_Short_Name': instrument_short_names[rdr_type.sensor],
            'N_Collection_Short_Name': rdr_type.short_name,
            'N_Dataset_Type_Tag': 'RDR',
            'N_Processing_Domain': domain})
        gran_ds = gran_grp.create_dataset(
            rdr_type.short_name + '_Gran_0', [1], h5_regionref_dtype, maxshape=[None])
        gran_ds[0] = raw_ds.regionref[:]
        set_h5_attrs(gran_ds, {
            'Beginning_Date': format_date(gran_time),
            'Beginning_Time': format_time(gran_time),
            'Ending_Date': format_date(gran_end_time),
            'Ending_Time': format_time(gran_end_time),
            'N_Beginning_Orbit_Number': np.uint64(orbit_num),  # FIXME: detect?
            'N_Beginning_Time_IET': np.uint64(gran_iet),
            'N_Creation_Date': format_date(gran_creation_time),
            'N_Creation_Time': format_time(gran_creation_time),
            'N_Ending_Time_IET': np.uint64(gran_end_iet),
            'N_Granule_ID': gran_id,
            'N_Granule_Status': 'N/A',
            'N_Granule_Version': gran_ver,
            'N_IDPS_Mode': domain,
            'N_JPSS_Document_Ref': rdr_type.document,
            'N_LEOA_Flag': 'Off',
            'N_Packet_Type': [a.name for a in blob_info.apids],
            'N_Packet_Type_Count': [np.uint64(a.pkts_received) for a in blob_info.apids],
            'N_Percent_Missing_Data': np.float32(calc_percent_missing(blob_info)),
            'N_Primary_Label': 'Primary',  # TODO: find out what this is
            'N_Reference_ID': ':'.join([rdr_type.short_name, gran_id, gran_ver]),
            'N_Software_Version': software_ver})
        aggr_ds = gran_grp.create_dataset(
            rdr_type.short_name + '_Aggr', [1], h5_ref_dtype, maxshape=[None])
        aggr_ds[0] = raw_grp.ref
        set_h5_attrs(aggr_ds, {
            'AggregateBeginningDate': format_date(gran_time),
            'AggregateBeginningGranuleID': gran_id,
            'AggregateBeginningOrbitNumber': np.uint64(orbit_num),
            'AggregateBeginningTime': format_time(gran_time),
            'AggregateEndingDate': format_date(gran_end_time),
            'AggregateEndingGranuleID': gran_id,
            'AggregateEndingOrbitNumber': np.uint64(orbit_num),
            'AggregateEndingTime': format_time(gran_end_time),
            'AggregateNumberGranules': np.uint64(1)})
    return file_name


def set_h5_attrs(h5_obj, attrs):
    # for some reason all the H5 attributes are in 2-D arrays in IDPS files
    for name, value in attrs.items():
        if isinstance(value, list):
            value = [[x] for x in value]
        else:
            value = [[value]]
        h5_obj.attrs[name] = value


def build_rdr_blob(sat, pkt_stream):
    first_pkt = next(pkt_stream)  # FIXME: what if there are no packets?
    rdr_type = SpacecraftDiaryRdrType()  # TODO: select based on first packet APID
    granule_iet = calc_iet_granule(satellite_base_times[sat], rdr_type.gran_len,
                                   timecode_to_iet(first_pkt.secondary_header.timecode))
    granule_iet_end = granule_iet + rdr_type.gran_len

    total_pkt_size = 0
    apid_info = OrderedDict()
    total_trackers = 0
    all_pkts = []
    for apid in rdr_type.apids:
        apid_info[apid.num] = {
            'name': apid.name,
            'pkts_reserved': apid.max_expected, 'pkts_received': 0,
            'first_tracker_index': total_trackers,
            'pkt_info': [{} for _ in range(apid.max_expected)]}
        total_trackers += apid.max_expected

    for pkt in itertools.chain([first_pkt], pkt_stream):
        if pkt.apid not in apid_info:
            raise ValueError(
                'APID {} not expected for {}'.format(pkt.apid, rdr_type.short_name))
        pkt_iet = timecode_to_iet(pkt.secondary_header.timecode)
        if not granule_iet <= pkt_iet < granule_iet_end:
            raise ValueError('packet stream crosses granule boundary')
        info = apid_info[pkt.apid]
        pkt_info = info['pkt_info'][info['pkts_received']]
        pkt_info['obs_time'] = pkt_iet
        pkt_info['seq_num'] = pkt.seqid
        pkt_info['size'] = pkt.size
        pkt_info['offset'] = total_pkt_size
        info['pkts_received'] += 1
        total_pkt_size += pkt.size
        all_pkts.append(pkt)

    apid_list_offset = ctypes.sizeof(StaticHeader)
    pkt_tracker_offset = apid_list_offset + len(apid_info) * ctypes.sizeof(ApidListItem)
    ap_storage_offset = pkt_tracker_offset + total_trackers * ctypes.sizeof(PacketTracker)
    buf_size = ap_storage_offset + total_pkt_size
    buf = np.zeros([buf_size], np.uint8)  # zeros needed to null-pad strings

    header = StaticHeader.from_buffer(buf)
    header.satellite = platform_short_names[sat]
    header.sensor = instrument_short_names[rdr_type.sensor]
    header.type_id = rdr_type.type_id
    header.num_apids = len(apid_info)
    header.apid_list_offset = apid_list_offset
    header.pkt_tracker_offset = pkt_tracker_offset
    header.ap_storage_offset = ap_storage_offset
    header.next_pkt_pos = total_pkt_size
    header.start_boundary = granule_iet
    header.end_boundary = granule_iet_end

    for i, (apid, info) in enumerate(apid_info.items()):
        offset = header.apid_list_offset + i * ctypes.sizeof(ApidListItem)
        item = ApidListItem.from_buffer(buf, offset)
        item.name = info['name']
        item.value = apid
        item.pkt_tracker_start_idx = info['first_tracker_index']
        item.pkts_reserved = info['pkts_reserved']
        item.pkts_received = info['pkts_received']

        for j, pkt_info in enumerate(info['pkt_info']):
            offset = (header.pkt_tracker_offset
                      + (info['first_tracker_index'] + j) * ctypes.sizeof(PacketTracker))
            tracker = PacketTracker.from_buffer(buf, offset)
            if pkt_info:
                tracker.obs_time = pkt_info['obs_time']
                tracker.sequence_number = pkt_info['seq_num']
                tracker.size = pkt_info['size']
                tracker.offset = pkt_info['offset']
                tracker.fill_percent = 0
            else:
                tracker.offset = -1

    buf[ap_storage_offset:] = bytearray().join(pkt.bytes() for pkt in all_pkts)

    return buf


def get_cris_science_apids():
    return ([ApidSpec(1289, 'EIGHT_S_SCI', 5), ApidSpec(1290, 'ENG', 1)]
            + get_cris_obs_apids())


def get_cris_obs_apids():
    view_types = ['N', 'S', 'C']  # "normal", "space", "calibration"
    bands = ['LW', 'MW', 'SW']
    num_fovs = 9
    base_apid = 1315
    apids = []
    for i in range(len(view_types) * len(bands) * num_fovs):
        apid = base_apid + i
        view_type = view_types[i // (num_fovs * len(bands))]
        band = bands[i // num_fovs % len(bands)]
        fov = str(i % num_fovs + 1)
        apid_name = view_type + band + fov
        max_expected = get_max_expected_cris_packets(apid_name)
        apids.append(ApidSpec(apid, apid_name, max_expected))
    return apids


def get_max_expected_cris_packets(apid_name):
    if apid_name == 'EIGHT_S_SCI':
        return 5
    elif apid_name == 'ENG':
        return 1
    else:
        view_type = apid_name[0]
        if view_type == 'N':
            return 121
        else:
            return 9


@attr.s
class ApidSpec(object):
    num = attr.ib()
    name = attr.ib()
    max_expected = attr.ib()


class CrisScienceRdrType(object):
    product_id = 'RCRIS'
    short_name = 'CRIS-SCIENCE-RDR'
    gran_len = 31997000
    sensor = 'cris'
    type_id = 'SCIENCE'
    document = '474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf'
    apids = get_cris_science_apids()


class SpacecraftDiaryRdrType(object):
    product_id = 'RNSCA'
    short_name = 'SPACECRAFT-DIARY-RDR'
    gran_len = 20000000
    sensor = None
    type_id = 'DIARY'
    document = '474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200B.pdf'
    apids = [ApidSpec(0, 'CRITICAL', max_expected=21),
             ApidSpec(8, 'ADCS_HKH', max_expected=21),
             ApidSpec(11, 'DIARY', max_expected=21)]


def make_rdr_filename(prod_id, sat, gran_time, gran_end_time, orbit_num, creation_time,
                      origin, domain, compressed):
    sat_strs = {'snpp': 'npp'}
    def format_gran_time(t):
        return t.strftime('%H%M%S') + str(t.microsecond / 100000)
    def format_origin(o):
        if o.endswith('-'):
            return o[:-1] + ('c' if compressed else 'u')
        else:
            return o
    return '{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5'.format(
        p=prod_id, s=sat_strs[sat], d=gran_time, b=format_gran_time(gran_time),
        e=format_gran_time(gran_end_time), n=orbit_num, c=creation_time,
        o=format_origin(origin), m=domain)


def timecode_to_iet(timecode):
    # FIXME: move to timecode.py
    ccsds_epoch = Time('1958-01-01', scale='tai')
    day = Time(ccsds_epoch.jd + timecode.days, scale='utc', format='jd') - iet_epoch
    return int(day.sec * 1e6 + timecode.milliseconds * 1e3 + timecode.microseconds)


def iet_to_datetime(iet):
    return (iet_epoch + TimeDelta(iet * 1e-6, format='sec')).utc.datetime


def calc_iet_granule(base_time, gran_len, iet):
    return (iet - base_time) // gran_len * gran_len + base_time


def calc_iet_aggregate(base_time, gran_len, grans_per_aggr, iet):
    # see "DDS Aggregation Methodology" in CDFCB vol I
    Y = gran_len
    N = grans_per_aggr
    G_s = calc_iet_granule(base_time, Y, iet)
    A_n = G_s // (N * Y)
    O = G_s % Y
    A_s = A_n * (Y * N) + O
    return A_s


def make_granule_id(sat, gran_iet):
    tenths = (gran_iet - satellite_base_times[sat]) // 100000
    return '{}{:012d}'.format(platform_short_names[sat], tenths)


def calc_percent_missing(common_rdr):
    total_received = sum(a.pkts_received for a in common_rdr.apids)
    total_reserved = sum(a.pkts_reserved for a in common_rdr.apids)
    return 100. * (total_reserved - total_received) / total_reserved


iet_epoch = Time('1958-01-01', scale='tai')
satellite_base_times = {'snpp': 1698019234000000}
platform_short_names = {'snpp': 'NPP'}
instrument_short_names = {'cris': 'CrIS', None: 'SPACECRAFT'}
default_origin = 'ssec'
default_domain = 'dev'

h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference)
h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference)