Skip to content
Snippets Groups Projects
Commit 96d6261d authored by Greg Quinn's avatar Greg Quinn
Browse files

Beginnings of rdrgen module

parent 68e06da3
No related branches found
No related tags found
No related merge requests found
import ctypes
from collections import OrderedDict
import attr
from astropy.time import Time, TimeDelta
import numpy as np
from edosl0util.jpssrdr import StaticHeader, Apid as ApidListItem, PacketTracker
def prototype():
pds_file = 'P1571289CRISSCIENCEA6T17230135400001.PDS'
satellite = 'snpp'
rdr_type = 'CRIS-SCIENCE-RDR'
granule_iet = 1881755831079000
class RdrBlobBuilder(object):
def __init__(self, rdr_type, sat, granule_iet):
self.sat = sat
def build_rdr_blob(pkt_stream, rdr_type, sat, granule_iet):
granule_iet_end = granule_iet + rdr_type.granularity
total_pkt_size = 0
apid_info = OrderedDict()
total_trackers = 0
all_pkts = []
for apid in rdr_type.apids:
apid_info[apid.num] = {
'name': apid.name,
'pkts_reserved': apid.max_expected, 'pkts_received': 0,
'first_tracker_index': total_trackers,
'pkt_info': [{} for _ in range(apid.max_expected)]}
total_trackers += apid.max_expected
for pkt in pkt_stream:
if pkt.apid not in apid_info:
continue
pkt_iet = timecode_to_iet(pkt.secondary_header.timecode)
if not granule_iet <= pkt_iet < granule_iet_end:
continue
info = apid_info[pkt.apid]
pkt_info = info['pkt_info'][info['pkts_received']]
pkt_info['obs_time'] = pkt_iet
pkt_info['seq_num'] = pkt.seqid
pkt_info['size'] = pkt.size
pkt_info['offset'] = total_pkt_size
info['pkts_received'] += 1
total_pkt_size += pkt.size
all_pkts.append(pkt)
apid_list_offset = ctypes.sizeof(StaticHeader)
pkt_tracker_offset = apid_list_offset + len(apid_info) * ctypes.sizeof(ApidListItem)
ap_storage_offset = pkt_tracker_offset + total_trackers * ctypes.sizeof(PacketTracker)
buf_size = ap_storage_offset + total_pkt_size
buf = np.zeros([buf_size], np.uint8) # zeros needed to null-pad strings
header = StaticHeader.from_buffer(buf)
header.satellite = sat # XXX: mapping?
header.sensor = rdr_type.sensor
header.type_id = rdr_type.type_id
header.num_apids = len(apid_info)
header.apid_list_offset = apid_list_offset
header.pkt_tracker_offset = pkt_tracker_offset
header.ap_storage_offset = ap_storage_offset
header.next_pkt_pos = total_pkt_size
header.start_boundary = granule_iet
header.end_boundary = granule_iet_end
for i, (apid, info) in enumerate(apid_info.items()):
offset = header.apid_list_offset + i * ctypes.sizeof(ApidListItem)
item = ApidListItem.from_buffer(buf, offset)
item.name = info['name']
item.value = apid
item.pkt_tracker_start_idx = info['first_tracker_index']
item.pkts_reserved = info['pkts_reserved']
item.pkts_received = info['pkts_received']
for j, pkt_info in enumerate(info['pkt_info']):
offset = (header.pkt_tracker_offset
+ (info['first_tracker_index'] + j) * ctypes.sizeof(PacketTracker))
tracker = PacketTracker.from_buffer(buf, offset)
if pkt_info:
tracker.obs_time = pkt_info['obs_time']
tracker.sequence_number = pkt_info['seq_num']
tracker.size = pkt_info['size']
tracker.offset = pkt_info['offset']
tracker.fill_percent = 0
else:
tracker.offset = -1
buf[ap_storage_offset:] = bytearray().join(pkt.bytes() for pkt in all_pkts)
return buf
def get_cris_science_apids():
return ([ApidSpec(1289, 'EIGHT_S_SCI', 5), ApidSpec(1290, 'ENG', 1)]
+ get_cris_obs_apids())
def get_cris_obs_apids():
view_types = ['N', 'S', 'C'] # "normal", "space", "calibration"
bands = ['LW', 'MW', 'SW']
num_fovs = 9
base_apid = 1315
apids = []
for i in range(len(view_types) * len(bands) * num_fovs):
apid = base_apid + i
view_type = view_types[i // (num_fovs * len(bands))]
band = bands[i // num_fovs % len(bands)]
fov = str(i % num_fovs + 1)
apid_name = view_type + band + fov
max_expected = get_max_expected_cris_packets(apid_name)
apids.append(ApidSpec(apid, apid_name, max_expected))
return apids
def get_max_expected_cris_packets(apid_name):
if apid_name == 'EIGHT_S_SCI':
return 5
elif apid_name == 'ENG':
return 1
else:
view_type = apid_name[0]
if view_type == 'N':
return 121
else:
return 9
@attr.s
class ApidSpec(object):
num = attr.ib()
name = attr.ib()
max_expected = attr.ib()
class CrisScienceRdrType(object):
short_name = 'CRIS-SCIENCE-RDR'
granularity = 31997000
apids = get_cris_science_apids()
sensor = 'CrIS'
type_id = 'SCIENCE'
def timecode_to_iet(timecode):
# FIXME: move to timecode.py
ccsds_epoch = iet_epoch = Time('1958-01-01', scale='tai')
day = Time(ccsds_epoch.jd + timecode.days, scale='utc', format='jd') - iet_epoch
return int(day.sec * 1e6 + timecode.milliseconds * 1e3 + timecode.microseconds)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment