Skip to content
Snippets Groups Projects
Commit 6872712e authored by Greg Quinn's avatar Greg Quinn
Browse files

Add rdrgen.write_rdr

parent 96d6261d
No related branches found
No related tags found
No related merge requests found
from pkg_resources import get_distribution
__version__ = get_distribution(__name__).version
\ No newline at end of file
......@@ -192,7 +192,7 @@ def _rdrs_for_packet_dataset(group):
if group:
for name, buf in _generate_packet_datasets(group):
try:
rdr = _decode_rdr_blob(buf)
rdr = decode_rdr_blob(buf)
except ValueError as e:
LOG.warn('{} ({})'.format(e.message, name))
continue
......@@ -200,7 +200,7 @@ def _rdrs_for_packet_dataset(group):
return rdrs
def _decode_rdr_blob(buf):
def decode_rdr_blob(buf):
if buf.shape[0] < c.sizeof(StaticHeader):
raise ValueError('Not enough bytes for static header')
header = StaticHeader.from_buffer(buf)
......
import ctypes
import itertools
from collections import OrderedDict
from datetime import datetime
import attr
from astropy.time import Time, TimeDelta
import h5py
import numpy as np
from astropy.time import Time, TimeDelta
from edosl0util.jpssrdr import StaticHeader, Apid as ApidListItem, PacketTracker
import edosl0util
from edosl0util.jpssrdr import (
StaticHeader, Apid as ApidListItem, PacketTracker, decode_rdr_blob)
def prototype():
pds_file = 'P1571289CRISSCIENCEA6T17230135400001.PDS'
satellite = 'snpp'
rdr_type = 'CRIS-SCIENCE-RDR'
granule_iet = 1881755831079000
def write_rdr(sat, blob,
distributor=None, origin=None, domain=None, orbit_number=0,
creation_time=None, gran_creation_time=None, software_ver=None):
# TODO: write out the user block too??
distributor = distributor or origin or default_origin
origin = origin or default_origin
domain = domain or default_domain
creation_time = creation_time or datetime.now()
gran_creation_time = gran_creation_time or creation_time
software_ver = (software_ver
or edosl0util.__name__ + '-' + edosl0util.__version__)
blob_info = decode_rdr_blob(blob)
rdr_type = SpacecraftDiaryRdrType() # TODO: select via APID
gran_iet = blob_info.header.start_boundary
gran_end_iet = blob_info.header.end_boundary
gran_time = iet_to_datetime(gran_iet)
gran_end_time = iet_to_datetime(gran_end_iet)
gran_id = make_granule_id(sat, gran_iet)
gran_ver = 'A1'
format_date = lambda t: t.strftime('%Y%m%d')
format_time = lambda t: t.strftime('%H%M%S.%fZ')
with h5py.File('rdr.h5', 'w') as h5_file:
set_h5_attrs(h5_file, {
'Distributor': distributor,
'Mission_Name': 'S-NPP/JPSS',
'N_Dataset_Source': origin,
'N_HDF_Creation_Date': format_date(creation_time),
'N_HDF_Creation_Time': format_time(creation_time),
'Platform_Short_Name': platform_short_names[sat]})
all_grp = h5_file.create_group('All_Data')
raw_grp = all_grp.create_group(rdr_type.short_name + '_All')
raw_ds = raw_grp.create_dataset(
'RawApplicationPackets_0', data=blob, maxshape=[None])
prod_grp = h5_file.create_group('Data_Products')
gran_grp = prod_grp.create_group(rdr_type.short_name)
set_h5_attrs(gran_grp, {
'Instrument_Short_Name': instrument_short_names[rdr_type.sensor],
'N_Collection_Short_Name': rdr_type.short_name,
'N_Dataset_Type_Tag': 'RDR',
'N_Processing_Domain': domain})
gran_ds = gran_grp.create_dataset(
rdr_type.short_name + '_Gran_0', [1], h5_regionref_dtype, maxshape=[None])
gran_ds[0] = raw_ds.regionref[:]
set_h5_attrs(gran_ds, {
'Beginning_Date': format_date(gran_time),
'Beginning_Time': format_time(gran_time),
'Ending_Date': format_date(gran_end_time),
'Ending_Time': format_time(gran_end_time),
'N_Beginning_Orbit_Number': np.uint64(orbit_number), # FIXME: detect?
'N_Beginning_Time_IET': np.uint64(gran_iet),
'N_Creation_Date': format_date(gran_creation_time),
'N_Creation_Time': format_time(gran_creation_time),
'N_Ending_Time_IET': np.uint64(gran_end_iet),
'N_Granule_ID': gran_id,
'N_Granule_Status': 'N/A',
'N_Granule_Version': gran_ver,
'N_IDPS_Mode': domain,
'N_JPSS_Document_Ref': rdr_type.document,
'N_LEOA_Flag': 'Off',
'N_Packet_Type': [a.name for a in blob_info.apids],
'N_Packet_Type_Count': [np.uint64(a.pkts_received) for a in blob_info.apids],
'N_Percent_Missing_Data': np.float32(calc_percent_missing(blob_info)),
'N_Primary_Label': 'Primary', # TODO: find out what this is
'N_Reference_ID': ':'.join([rdr_type.short_name, gran_id, gran_ver]),
'N_Software_Version': software_ver})
aggr_ds = gran_grp.create_dataset(
rdr_type.short_name + '_Aggr', [1], h5_ref_dtype, maxshape=[None])
aggr_ds[0] = raw_grp.ref
set_h5_attrs(aggr_ds, {
'AggregateBeginningDate': format_date(gran_time),
'AggregateBeginningGranuleID': gran_id,
'AggregateBeginningOrbitNumber': np.uint64(orbit_number),
'AggregateBeginningTime': format_time(gran_time),
'AggregateEndingDate': format_date(gran_end_time),
'AggregateEndingGranuleID': gran_id,
'AggregateEndingOrbitNumber': np.uint64(orbit_number),
'AggregateEndingTime': format_time(gran_end_time),
'AggregateNumberGranules': np.uint64(1)})
class RdrBlobBuilder(object):
def __init__(self, rdr_type, sat, granule_iet):
self.sat = sat
def set_h5_attrs(h5_obj, attrs):
# for some reason all the H5 attributes are in 2-D arrays in IDPS files
for name, value in attrs.items():
if isinstance(value, list):
value = [[x] for x in value]
else:
value = [[value]]
h5_obj.attrs[name] = value
def build_rdr_blob(pkt_stream, rdr_type, sat, granule_iet):
granule_iet_end = granule_iet + rdr_type.granularity
def build_rdr_blob(sat, pkt_stream):
first_pkt = next(pkt_stream) # FIXME: what if there are no packets?
rdr_type = SpacecraftDiaryRdrType() # TODO: select based on first packet APID
granule_iet = calc_iet_granule(satellite_base_times[sat], rdr_type.gran_len,
timecode_to_iet(first_pkt.secondary_header.timecode))
granule_iet_end = granule_iet + rdr_type.gran_len
total_pkt_size = 0
apid_info = OrderedDict()
......@@ -35,12 +122,13 @@ def build_rdr_blob(pkt_stream, rdr_type, sat, granule_iet):
'pkt_info': [{} for _ in range(apid.max_expected)]}
total_trackers += apid.max_expected
for pkt in pkt_stream:
for pkt in itertools.chain([first_pkt], pkt_stream):
if pkt.apid not in apid_info:
continue
raise ValueError(
'APID {} not expected for {}'.format(pkt.apid, rdr_type.short_name))
pkt_iet = timecode_to_iet(pkt.secondary_header.timecode)
if not granule_iet <= pkt_iet < granule_iet_end:
continue
raise ValueError('packet stream crosses granule boundary')
info = apid_info[pkt.apid]
pkt_info = info['pkt_info'][info['pkts_received']]
pkt_info['obs_time'] = pkt_iet
......@@ -58,8 +146,8 @@ def build_rdr_blob(pkt_stream, rdr_type, sat, granule_iet):
buf = np.zeros([buf_size], np.uint8) # zeros needed to null-pad strings
header = StaticHeader.from_buffer(buf)
header.satellite = sat # XXX: mapping?
header.sensor = rdr_type.sensor
header.satellite = platform_short_names[sat]
header.sensor = instrument_short_names[rdr_type.sensor]
header.type_id = rdr_type.type_id
header.num_apids = len(apid_info)
header.apid_list_offset = apid_list_offset
......@@ -140,14 +228,67 @@ class ApidSpec(object):
class CrisScienceRdrType(object):
short_name = 'CRIS-SCIENCE-RDR'
granularity = 31997000
apids = get_cris_science_apids()
sensor = 'CrIS'
gran_len = 31997000
sensor = 'cris'
type_id = 'SCIENCE'
document = '474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf'
apids = get_cris_science_apids()
class SpacecraftDiaryRdrType(object):
short_name = 'SPACECRAFT-DIARY-RDR'
gran_len = 20000000
sensor = None
type_id = 'DIARY'
document = '474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200B.pdf'
apids = [ApidSpec(0, 'CRITICAL', max_expected=21),
ApidSpec(8, 'ADCS_HKH', max_expected=21),
ApidSpec(11, 'DIARY', max_expected=21)]
def timecode_to_iet(timecode):
# FIXME: move to timecode.py
ccsds_epoch = iet_epoch = Time('1958-01-01', scale='tai')
ccsds_epoch = Time('1958-01-01', scale='tai')
day = Time(ccsds_epoch.jd + timecode.days, scale='utc', format='jd') - iet_epoch
return int(day.sec * 1e6 + timecode.milliseconds * 1e3 + timecode.microseconds)
def iet_to_datetime(iet):
return (iet_epoch + TimeDelta(iet * 1e-6, format='sec')).utc.datetime
def calc_iet_granule(base_time, gran_len, iet):
return (iet - base_time) // gran_len * gran_len + base_time
def calc_iet_aggregate(base_time, gran_len, grans_per_aggr, iet):
# see "DDS Aggregation Methodology" in CDFCB vol I
Y = gran_len
N = grans_per_aggr
G_s = calc_iet_granule(base_time, Y, iet)
A_n = G_s // (N * Y)
O = G_s % Y
A_s = A_n * (Y * N) + O
return A_s
def make_granule_id(sat, gran_iet):
tenths = (gran_iet - satellite_base_times[sat]) // 100000
return '{}{:012d}'.format(platform_short_names[sat], tenths)
def calc_percent_missing(common_rdr):
total_received = sum(a.pkts_received for a in common_rdr.apids)
total_reserved = sum(a.pkts_reserved for a in common_rdr.apids)
return 100. * (total_reserved - total_received) / total_reserved
iet_epoch = Time('1958-01-01', scale='tai')
satellite_base_times = {'snpp': 1698019234000000}
platform_short_names = {'snpp': 'NPP'}
instrument_short_names = {'cris': 'CrIS', None: 'SPACECRAFT'}
default_origin = 'ssec'
default_domain = 'dev'
h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference)
h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment