Skip to content
Snippets Groups Projects
Commit b03b94a2 authored by Greg Quinn's avatar Greg Quinn
Browse files

Add crgen CLI

parent 6ed2ab75
No related branches found
No related tags found
No related merge requests found
"""Generate a PDS construction record from a PDS data file"""
import logging
from edosl0util import crio
from edosl0util.cli import util
from edosl0util.crgen import build_cr
def main():
parser = util.default_parser(description=__doc__)
parser.add_argument('input_file')
parser.add_argument('-o', '--output-file', help='generated from input file name by default')
parser.add_argument('-p', '--prev_pds_file',
help='previous PDS data file, used for detecting cross-file packet gaps')
args = parser.parse_args()
util.configure_logging(args)
crgen(args.input_file, args.output_file, args.prev_pds_file)
def crgen(input_file, output_file=None, prev_pds_file=None):
cr = build_cr(input_file, prev_pds_file)
if output_file is None:
output_file = cr['pds_id'] + '.PDS'
logger.info('writing {}'.format(output_file))
crio.write(cr, output_file)
logger = logging.getLogger(__name__)
if __name__ == '__main__':
main()
......@@ -7,8 +7,8 @@ def timestamp(v):
return datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
def default_parser():
parser = argparse.ArgumentParser()
def default_parser(**kwargs):
parser = argparse.ArgumentParser(**kwargs)
parser.add_argument('-v', '--verbose', action='store_true')
return parser
......
......@@ -3,7 +3,9 @@
from datetime import datetime
import itertools
import logging
import os
import edosl0util.crio as crio
from edosl0util.headers import DaySegmentedTimecode
from edosl0util.stream import jpss_packet_stream
......@@ -11,7 +13,7 @@ from edosl0util.stream import jpss_packet_stream
def test_build_apid_info():
# FIXME: build CR comparison into the CLI
calculated = build_cr('P1571289CRISSCIENCEAAT15320210920101.PDS')
ingested = read_pds_cr('P1571289CRISSCIENCEAAT15320210920100.PDS')
ingested = crio.read('P1571289CRISSCIENCEAAT15320210920100.PDS')
insert_fake_cr_info(ingested)
del calculated['completion_time'] # it seems CR completion time does not match PDS
del ingested['completion_time'] # creation time from the file name
......@@ -60,14 +62,28 @@ def build_cr(pds_file, prev_pds_file=None):
def insert_fake_cr_info(cr):
"""Populate a CR with phony values for fields that can't be discovered via a packet scan"""
cr.update({'edos_sw_ver_major': 0, 'edos_sw_ver_minor': 0, 'cr_type': 1, 'test_flag': 0,
'scs_count': 1, 'scs_info': {'start': missing_time_value,
'stop': missing_time_value},
'scs_count': 1, 'scs_info': [{'start': missing_time_value,
'stop': missing_time_value}],
'first_packet_esh_time': missing_time_value,
'last_packet_esh_time': missing_time_value,
'fill_bytes': 0, 'mismatched_length_packets': 0, 'rs_corrected_packets': 0})
insert_fake_apid_info(cr['apid_info'])
def insert_fake_apid_info(apid_info):
"""Fill CR apid_info with phony values for fields that can't be found via a packet scan"""
for entry in apid_info:
entry.update({
'fill_packets': 0, 'fill_packet_info': [], 'fill_bytes': 0,
'mismatched_length_packets': 0, 'mismatched_length_packet_ssc_list': [],
'first_packet_esh_time': missing_time_value,
'last_packet_esh_time': missing_time_value,
'rs_corrected_packets': 0})
for gap in entry['gap_info']:
gap.update({'pre_gap_packet_esh_time': missing_time_value,
'post_gap_packet_esh_time': missing_time_value})
def pds_id_from_path(pds_file):
"""Pull 36-char PDS ID from a file name; that's the CR file name minus the .PDS"""
file_name = os.path.basename(pds_file)
......@@ -80,12 +96,14 @@ def pds_id_from_path(pds_file):
def get_pds_creation_time(pds_file_or_id):
"""Parse 11-char creation time out of a PDS ID or file name; return a DaySegmentedTimecode"""
pds_file_or_id = os.path.basename(pds_file_or_id)
return datetime_to_ccsds(datetime.strptime(pds_file_or_id[22:33], '%y%j%H%M%S'))
def build_apid_info(scan_apid_info):
"""Build up apid_info resulting from scan_packets into a full apid_info for a CR"""
for entry in scan_apid_info:
apid_info = deepcopy(scan_apid_info)
for entry in apid_info:
entry['scid'] = npp_scid
entry['vcid_count'] = 1
entry['vcid_info'] = [{'scid': npp_scid, 'vcid': npp_apid_to_vcid_map[entry['apid']]}]
......@@ -94,24 +112,9 @@ def build_apid_info(scan_apid_info):
insert_fake_apid_info(apid_info)
return apid_info
def insert_fake_apid_info(apid_info):
"""Fill CR apid_info with phony values for fields that can't be found via a packet scan"""
for entry in apid_info:
entry.update({
'fill_packets': 0, 'fill_packet_info': [], 'fill_bytes': 0,
'mismatched_length_packets': 0, 'mismatched_length_packet_ssc_list': [],
'first_packet_esh_time': missing_time_value,
'last_packet_esh_time': missing_time_value,
'rs_corrected_packets': 0})
for gap in entry['gap_info']:
gap.update({'pre_gap_packet_esh_time': missing_time_value,
'post_gap_packet_esh_time': missing_time_value})
missing_time_value = DaySegmentedTimecode(0, 0, 0)
npp_scid = 157 # FIXME: i guess this should come from the file name
npp_scid = 157
# ripped from MDFCB, 2014-06-05 revision
# modified to place CrIS FOV 6 in VCID 7 after testing against some real data
......@@ -154,6 +157,7 @@ def scan_packets(pds_file, prev_pds_file=None):
def main():
prev_apid_map = build_prev_apid_map(prev_pds_file)
apid_map = {}
logger.info('scanning {}'.format(pds_file))
stream = jpss_packet_stream(open(pds_file, 'rb'))
first_pkt = stream.next()
for pkt in itertools.chain([first_pkt], stream):
......@@ -219,3 +223,6 @@ def datetime_to_ccsds(dt):
return DaySegmentedTimecode(days, micros // 1000, micros % 1000)
else:
return DaySegmentedTimecode()
logger = logging.getLogger(__name__)
......@@ -2,7 +2,6 @@
"""PDS construction record input and output"""
import ctypes as c
import warnings
from edosl0util.headers import BaseStruct, DaySegmentedTimecode
......@@ -13,18 +12,18 @@ def read(cr_file):
rv = {}
with open(cr_file, 'rb') as f:
read_into_dict(f, Main1Struct, rv)
rv['scs_info'] = [read_struct(f, ScsStruct) for i in range(rv['scs_count'])]
rv['scs_info'] = [read_struct(f, ScsStruct) for _ in range(rv['scs_count'])]
read_into_dict(f, Main2Struct, rv)
rv['apid_info'] = []
for i in range(rv['apid_count']):
for _ in range(rv['apid_count']):
d = {}
read_into_dict(f, Apid1Struct, d)
d['vcid_info'] = [read_struct(f, ApidVcidStruct) for j in range(d['vcid_count'])]
d['vcid_info'] = [read_struct(f, ApidVcidStruct) for _ in range(d['vcid_count'])]
read_into_dict(f, Apid2Struct, d)
d['gap_info'] = [read_struct(f, ApidGapStruct) for j in range(d['gap_count'])]
d['gap_info'] = [read_struct(f, ApidGapStruct) for _ in range(d['gap_count'])]
read_into_dict(f, Apid3Struct, d)
d['fill_packet_info'] = [
read_struct(f, ApidFillStruct) for j in range(d['fill_packets'])]
read_struct(f, ApidFillStruct) for _ in range(d['fill_packets'])]
read_into_dict(f, Apid4Struct, d)
d['mismatched_length_packet_ssc_list'] = [
read_struct(f, ApidMismatchedLengthStruct)['packet_ssc']
......@@ -33,15 +32,16 @@ def read(cr_file):
rv['apid_info'].append(d)
read_into_dict(f, Main3Struct, rv)
rv['file_info'] = []
for i in range(rv['file_count']):
for _ in range(rv['file_count']):
d = {}
read_into_dict(f, FileStruct, d)
d['apid_info'] = [read_struct(f, FileApidStruct) for j in range(d['apid_count'])]
d['apid_info'] = [read_struct(f, FileApidStruct) for _ in range(d['apid_count'])]
if d['apid_count'] == 0: # bogus all-zero apid struct is present for the CR file
read_struct(f, FileApidStruct)
rv['file_info'].append(d)
if f.read():
warnings.warn('{} bytes remain after reading CR'.format(len(extra)))
extra = f.read()
if extra:
raise ValueError('{} bytes remain after reading CR'.format(len(extra)))
return rv
def read_into_dict(f, struct, data):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment