Something went wrong on our end
-
Greg Quinn authoredGreg Quinn authored
rdrgen.py 12.36 KiB
import ctypes
import itertools
import os
from collections import OrderedDict
from datetime import datetime
import attr
import h5py
import numpy as np
from astropy.time import Time, TimeDelta
import edosl0util
from edosl0util.jpssrdr import (
StaticHeader, Apid as ApidListItem, PacketTracker, decode_rdr_blob)
def write_rdr(sat, blob, dir_path='.',
distributor=None, origin=None, domain=None, compressed=False, orbit_num=0,
creation_time=None, gran_creation_time=None, software_ver=None):
# TODO: write out the user block too??
distributor = distributor or origin or default_origin
origin = origin or default_origin
domain = domain or default_domain
creation_time = creation_time or datetime.now()
gran_creation_time = gran_creation_time or creation_time
software_ver = (software_ver
or edosl0util.__name__ + '-' + edosl0util.__version__)
blob_info = decode_rdr_blob(blob)
rdr_type = SpacecraftDiaryRdrType() # TODO: select via APID
gran_iet = blob_info.header.start_boundary
gran_end_iet = blob_info.header.end_boundary
gran_time = iet_to_datetime(gran_iet)
gran_end_time = iet_to_datetime(gran_end_iet)
gran_id = make_granule_id(sat, gran_iet)
gran_ver = 'A1'
format_date = lambda t: t.strftime('%Y%m%d')
format_time = lambda t: t.strftime('%H%M%S.%fZ')
file_name = make_rdr_filename(rdr_type.product_id, sat, gran_time, gran_end_time,
orbit_num, creation_time, origin, domain, compressed)
with h5py.File(os.path.join(dir_path, file_name), 'w') as h5_file:
set_h5_attrs(h5_file, {
'Distributor': distributor,
'Mission_Name': 'S-NPP/JPSS',
'N_Dataset_Source': origin,
'N_HDF_Creation_Date': format_date(creation_time),
'N_HDF_Creation_Time': format_time(creation_time),
'Platform_Short_Name': platform_short_names[sat]})
all_grp = h5_file.create_group('All_Data')
raw_grp = all_grp.create_group(rdr_type.short_name + '_All')
raw_ds = raw_grp.create_dataset(
'RawApplicationPackets_0', data=blob, maxshape=[None])
prod_grp = h5_file.create_group('Data_Products')
gran_grp = prod_grp.create_group(rdr_type.short_name)
set_h5_attrs(gran_grp, {
'Instrument_Short_Name': instrument_short_names[rdr_type.sensor],
'N_Collection_Short_Name': rdr_type.short_name,
'N_Dataset_Type_Tag': 'RDR',
'N_Processing_Domain': domain})
gran_ds = gran_grp.create_dataset(
rdr_type.short_name + '_Gran_0', [1], h5_regionref_dtype, maxshape=[None])
gran_ds[0] = raw_ds.regionref[:]
set_h5_attrs(gran_ds, {
'Beginning_Date': format_date(gran_time),
'Beginning_Time': format_time(gran_time),
'Ending_Date': format_date(gran_end_time),
'Ending_Time': format_time(gran_end_time),
'N_Beginning_Orbit_Number': np.uint64(orbit_num), # FIXME: detect?
'N_Beginning_Time_IET': np.uint64(gran_iet),
'N_Creation_Date': format_date(gran_creation_time),
'N_Creation_Time': format_time(gran_creation_time),
'N_Ending_Time_IET': np.uint64(gran_end_iet),
'N_Granule_ID': gran_id,
'N_Granule_Status': 'N/A',
'N_Granule_Version': gran_ver,
'N_IDPS_Mode': domain,
'N_JPSS_Document_Ref': rdr_type.document,
'N_LEOA_Flag': 'Off',
'N_Packet_Type': [a.name for a in blob_info.apids],
'N_Packet_Type_Count': [np.uint64(a.pkts_received) for a in blob_info.apids],
'N_Percent_Missing_Data': np.float32(calc_percent_missing(blob_info)),
'N_Primary_Label': 'Primary', # TODO: find out what this is
'N_Reference_ID': ':'.join([rdr_type.short_name, gran_id, gran_ver]),
'N_Software_Version': software_ver})
aggr_ds = gran_grp.create_dataset(
rdr_type.short_name + '_Aggr', [1], h5_ref_dtype, maxshape=[None])
aggr_ds[0] = raw_grp.ref
set_h5_attrs(aggr_ds, {
'AggregateBeginningDate': format_date(gran_time),
'AggregateBeginningGranuleID': gran_id,
'AggregateBeginningOrbitNumber': np.uint64(orbit_num),
'AggregateBeginningTime': format_time(gran_time),
'AggregateEndingDate': format_date(gran_end_time),
'AggregateEndingGranuleID': gran_id,
'AggregateEndingOrbitNumber': np.uint64(orbit_num),
'AggregateEndingTime': format_time(gran_end_time),
'AggregateNumberGranules': np.uint64(1)})
return file_name
def set_h5_attrs(h5_obj, attrs):
# for some reason all the H5 attributes are in 2-D arrays in IDPS files
for name, value in attrs.items():
if isinstance(value, list):
value = [[x] for x in value]
else:
value = [[value]]
h5_obj.attrs[name] = value
def build_rdr_blob(sat, pkt_stream):
first_pkt = next(pkt_stream) # FIXME: what if there are no packets?
rdr_type = SpacecraftDiaryRdrType() # TODO: select based on first packet APID
granule_iet = calc_iet_granule(satellite_base_times[sat], rdr_type.gran_len,
timecode_to_iet(first_pkt.secondary_header.timecode))
granule_iet_end = granule_iet + rdr_type.gran_len
total_pkt_size = 0
apid_info = OrderedDict()
total_trackers = 0
all_pkts = []
for apid in rdr_type.apids:
apid_info[apid.num] = {
'name': apid.name,
'pkts_reserved': apid.max_expected, 'pkts_received': 0,
'first_tracker_index': total_trackers,
'pkt_info': [{} for _ in range(apid.max_expected)]}
total_trackers += apid.max_expected
for pkt in itertools.chain([first_pkt], pkt_stream):
if pkt.apid not in apid_info:
raise ValueError(
'APID {} not expected for {}'.format(pkt.apid, rdr_type.short_name))
pkt_iet = timecode_to_iet(pkt.secondary_header.timecode)
if not granule_iet <= pkt_iet < granule_iet_end:
raise ValueError('packet stream crosses granule boundary')
info = apid_info[pkt.apid]
pkt_info = info['pkt_info'][info['pkts_received']]
pkt_info['obs_time'] = pkt_iet
pkt_info['seq_num'] = pkt.seqid
pkt_info['size'] = pkt.size
pkt_info['offset'] = total_pkt_size
info['pkts_received'] += 1
total_pkt_size += pkt.size
all_pkts.append(pkt)
apid_list_offset = ctypes.sizeof(StaticHeader)
pkt_tracker_offset = apid_list_offset + len(apid_info) * ctypes.sizeof(ApidListItem)
ap_storage_offset = pkt_tracker_offset + total_trackers * ctypes.sizeof(PacketTracker)
buf_size = ap_storage_offset + total_pkt_size
buf = np.zeros([buf_size], np.uint8) # zeros needed to null-pad strings
header = StaticHeader.from_buffer(buf)
header.satellite = platform_short_names[sat]
header.sensor = instrument_short_names[rdr_type.sensor]
header.type_id = rdr_type.type_id
header.num_apids = len(apid_info)
header.apid_list_offset = apid_list_offset
header.pkt_tracker_offset = pkt_tracker_offset
header.ap_storage_offset = ap_storage_offset
header.next_pkt_pos = total_pkt_size
header.start_boundary = granule_iet
header.end_boundary = granule_iet_end
for i, (apid, info) in enumerate(apid_info.items()):
offset = header.apid_list_offset + i * ctypes.sizeof(ApidListItem)
item = ApidListItem.from_buffer(buf, offset)
item.name = info['name']
item.value = apid
item.pkt_tracker_start_idx = info['first_tracker_index']
item.pkts_reserved = info['pkts_reserved']
item.pkts_received = info['pkts_received']
for j, pkt_info in enumerate(info['pkt_info']):
offset = (header.pkt_tracker_offset
+ (info['first_tracker_index'] + j) * ctypes.sizeof(PacketTracker))
tracker = PacketTracker.from_buffer(buf, offset)
if pkt_info:
tracker.obs_time = pkt_info['obs_time']
tracker.sequence_number = pkt_info['seq_num']
tracker.size = pkt_info['size']
tracker.offset = pkt_info['offset']
tracker.fill_percent = 0
else:
tracker.offset = -1
buf[ap_storage_offset:] = bytearray().join(pkt.bytes() for pkt in all_pkts)
return buf
def get_cris_science_apids():
return ([ApidSpec(1289, 'EIGHT_S_SCI', 5), ApidSpec(1290, 'ENG', 1)]
+ get_cris_obs_apids())
def get_cris_obs_apids():
view_types = ['N', 'S', 'C'] # "normal", "space", "calibration"
bands = ['LW', 'MW', 'SW']
num_fovs = 9
base_apid = 1315
apids = []
for i in range(len(view_types) * len(bands) * num_fovs):
apid = base_apid + i
view_type = view_types[i // (num_fovs * len(bands))]
band = bands[i // num_fovs % len(bands)]
fov = str(i % num_fovs + 1)
apid_name = view_type + band + fov
max_expected = get_max_expected_cris_packets(apid_name)
apids.append(ApidSpec(apid, apid_name, max_expected))
return apids
def get_max_expected_cris_packets(apid_name):
if apid_name == 'EIGHT_S_SCI':
return 5
elif apid_name == 'ENG':
return 1
else:
view_type = apid_name[0]
if view_type == 'N':
return 121
else:
return 9
@attr.s
class ApidSpec(object):
num = attr.ib()
name = attr.ib()
max_expected = attr.ib()
class CrisScienceRdrType(object):
product_id = 'RCRIS'
short_name = 'CRIS-SCIENCE-RDR'
gran_len = 31997000
sensor = 'cris'
type_id = 'SCIENCE'
document = '474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf'
apids = get_cris_science_apids()
class SpacecraftDiaryRdrType(object):
product_id = 'RNSCA'
short_name = 'SPACECRAFT-DIARY-RDR'
gran_len = 20000000
sensor = None
type_id = 'DIARY'
document = '474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200B.pdf'
apids = [ApidSpec(0, 'CRITICAL', max_expected=21),
ApidSpec(8, 'ADCS_HKH', max_expected=21),
ApidSpec(11, 'DIARY', max_expected=21)]
def make_rdr_filename(prod_id, sat, gran_time, gran_end_time, orbit_num, creation_time,
origin, domain, compressed):
sat_strs = {'snpp': 'npp'}
def format_gran_time(t):
return t.strftime('%H%M%S') + str(t.microsecond / 100000)
def format_origin(o):
if o.endswith('-'):
return o[:-1] + ('c' if compressed else 'u')
else:
return o
return '{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5'.format(
p=prod_id, s=sat_strs[sat], d=gran_time, b=format_gran_time(gran_time),
e=format_gran_time(gran_end_time), n=orbit_num, c=creation_time,
o=format_origin(origin), m=domain)
def timecode_to_iet(timecode):
# FIXME: move to timecode.py
ccsds_epoch = Time('1958-01-01', scale='tai')
day = Time(ccsds_epoch.jd + timecode.days, scale='utc', format='jd') - iet_epoch
return int(day.sec * 1e6 + timecode.milliseconds * 1e3 + timecode.microseconds)
def iet_to_datetime(iet):
return (iet_epoch + TimeDelta(iet * 1e-6, format='sec')).utc.datetime
def calc_iet_granule(base_time, gran_len, iet):
return (iet - base_time) // gran_len * gran_len + base_time
def calc_iet_aggregate(base_time, gran_len, grans_per_aggr, iet):
# see "DDS Aggregation Methodology" in CDFCB vol I
Y = gran_len
N = grans_per_aggr
G_s = calc_iet_granule(base_time, Y, iet)
A_n = G_s // (N * Y)
O = G_s % Y
A_s = A_n * (Y * N) + O
return A_s
def make_granule_id(sat, gran_iet):
tenths = (gran_iet - satellite_base_times[sat]) // 100000
return '{}{:012d}'.format(platform_short_names[sat], tenths)
def calc_percent_missing(common_rdr):
total_received = sum(a.pkts_received for a in common_rdr.apids)
total_reserved = sum(a.pkts_reserved for a in common_rdr.apids)
return 100. * (total_reserved - total_received) / total_reserved
iet_epoch = Time('1958-01-01', scale='tai')
satellite_base_times = {'snpp': 1698019234000000}
platform_short_names = {'snpp': 'NPP'}
instrument_short_names = {'cris': 'CrIS', None: 'SPACECRAFT'}
default_origin = 'ssec'
default_domain = 'dev'
h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference)
h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference)