Something went wrong on our end
-
Bruce Flynn authoredBruce Flynn authored
crgen.py 10.64 KiB
"""EDOS PDS construction record generation for SUOMI NPP"""
from datetime import datetime
import copy
import difflib
import itertools
import logging
import os
import pprint
import edosl0util.crio as crio
from edosl0util.headers import DaySegmentedTimecode
from edosl0util.stream import jpss_packet_stream
from edosl0util.timecode import dt_to_cds
def diff_crs(real_file, generated_file):
"""Print full diff output from a real EDOS CR to one generated by this module
Only fields which can be generated by this module via a packet scan are compared.
Nothing is output if the CRs are the same.
"""
def main():
real = crio.read(real_file)
generated = crio.read(generated_file)
make_comparable(real, generated)
if real != generated:
diff = difflib.ndiff(
pprint.pformat(real).splitlines(),
pprint.pformat(generated).splitlines(),
)
for line in diff:
print(line)
def make_comparable(real, generated):
insert_fake_cr_info(real)
del generated[
"completion_time"
] # it seems CR completion time does not match PDS
del real["completion_time"] # creation time from the file name
main()
def build_cr(pds_file, prev_pds_file=None):
"""Best-effort CR generation by scanning a PDS data file
Previous PDS data file may also be given to make gap detection more complete.
"""
def main():
scan = scan_packets(pds_file, prev_pds_file)
rv = {}
rv["pds_id"] = pds_id_from_path(pds_file)
rv["completion_time"] = get_pds_creation_time(pds_file)
rv["first_packet_time"] = scan["first_packet_time"]
rv["last_packet_time"] = scan["last_packet_time"]
rv["apid_info"] = build_apid_info(scan["apid_info"])
rv["apid_count"] = len(rv["apid_info"])
rv["file_info"] = file_info(rv["pds_id"], rv["apid_info"])
rv["file_count"] = len(rv["file_info"])
rv.update(aggregated_values(rv["apid_info"]))
insert_fake_cr_info(rv)
return rv
def file_info(pds_id, apid_info):
cr_entry = {"file_name": pds_id + ".PDS", "apid_count": 0, "apid_info": []}
data_entry = {
"file_name": os.path.basename(pds_file),
"apid_count": len(apid_info),
"apid_info": [
{
"scid": npp_scid,
"apid": entry["apid"],
"first_packet_time": entry["first_packet_time"],
"last_packet_time": entry["last_packet_time"],
}
for entry in apid_info
],
}
return [cr_entry, data_entry]
def aggregated_values(apid_info):
keys = [
"total_packets",
"total_bytes",
"gap_count",
"fill_bytes",
"mismatched_length_packets",
"rs_corrected_packets",
]
return {key: sum(entry[key] for entry in apid_info) for key in keys}
return main()
def insert_fake_cr_info(cr):
"""Populate a CR with phony values for fields that can't be discovered via a packet scan"""
cr.update(
{
"edos_sw_ver_major": 0,
"edos_sw_ver_minor": 0,
"cr_type": 1,
"test_flag": 0,
"scs_count": 1,
"scs_info": [{"start": missing_time_value, "stop": missing_time_value}],
"first_packet_esh_time": missing_time_value,
"last_packet_esh_time": missing_time_value,
"fill_bytes": 0,
"mismatched_length_packets": 0,
"rs_corrected_packets": 0,
}
)
insert_fake_apid_info(cr["apid_info"])
def insert_fake_apid_info(apid_info):
"""Fill CR apid_info with phony values for fields that can't be found via a packet scan"""
for entry in apid_info:
entry.update(
{
"fill_packets": 0,
"fill_packet_info": [],
"fill_bytes": 0,
"mismatched_length_packets": 0,
"mismatched_length_packet_ssc_list": [],
"first_packet_esh_time": missing_time_value,
"last_packet_esh_time": missing_time_value,
"rs_corrected_packets": 0,
}
)
for gap in entry["gap_info"]:
gap.update(
{
"pre_gap_packet_esh_time": missing_time_value,
"post_gap_packet_esh_time": missing_time_value,
}
)
def pds_id_from_path(pds_file):
"""Pull 36-char PDS ID from a file name; that's the CR file name minus the .PDS"""
file_name = os.path.basename(pds_file)
pds_file_name_length = 40
if len(file_name) != pds_file_name_length:
raise ValueError(
"PDS file name {} not of expected length {}".format(
file_name, pds_file_name_length
)
)
return file_name[:34] + "00"
def get_pds_creation_time(pds_file_or_id):
"""Parse 11-char creation time out of a PDS ID or file name; return a DaySegmentedTimecode"""
pds_file_or_id = os.path.basename(pds_file_or_id)
return create_timecode(
dt_to_cds(datetime.strptime(pds_file_or_id[22:33], "%y%j%H%M%S"))
)
def build_apid_info(scan_apid_info):
"""Build up apid_info resulting from scan_packets into a full apid_info for a CR"""
apid_info = copy.deepcopy(scan_apid_info)
for entry in apid_info:
entry["scid"] = npp_scid
entry["vcid_count"] = 1
entry["vcid_info"] = [
{"scid": npp_scid, "vcid": npp_apid_to_vcid_map[entry["apid"]]}
]
entry["gap_count"] = len(entry["gap_info"])
del entry[
"last_packet_ssc"
] # field was needed for bookkeeping but isn't in the CR
insert_fake_apid_info(apid_info)
return apid_info
missing_time_value = DaySegmentedTimecode(0, 0, 0)
npp_scid = 157
# ripped from MDFCB, 2014-06-05 revision
# modified to place CrIS FOV 6 in VCID 7 after testing against some real data
npp_vcid_to_apids_map = {
0: (
list(range(0, 15))
+ list(range(16, 50))
+ [65, 70, 100, 146, 155, 512, 513, 518, 543, 544, 545, 550, 768, 769, 773]
+ list(range(1280, 1289))
),
1: [50, 101, 515, 528, 530, 531],
2: [102],
3: [103, 514, 536],
4: [104],
5: [105],
6: [106, 1289, 1290]
+ list(
set(range(1315, 1396)) - set(range(1318, 1391, 9)) - set(range(1320, 1393, 9))
),
7: [107] + list(range(1318, 1391, 9)) + list(range(1320, 1393, 9)),
8: [108, 1294, 1295, 1296, 1398],
9: [109],
10: [110],
11: [111, 560, 561, 564, 565, 576],
12: [112, 562, 563, 566],
13: [113, 546, 577, 578, 579, 580, 581, 582],
14: [114],
15: [115],
16: [116] + list(range(800, 806)) + list(range(807, 824)) + [825, 826],
17: [117, 806],
18: [118, 770] + list(range(830, 854)) + [855, 856],
19: [119],
20: [120],
21: [121, 517, 524, 549, 556, 780, 1291, 1292, 1293, 1397],
22: [122],
24: [147, 148, 149, 150],
}
npp_apid_to_vcid_map = {
apid: vcid for vcid, apids in npp_vcid_to_apids_map.items() for apid in apids
}
def scan_packets(pds_file, prev_pds_file=None):
"""Scan a PDS data file for information needed to produce a construction record"""
def main():
prev_apid_map = build_prev_apid_map(prev_pds_file)
apid_map = {}
logger.info("scanning {}".format(pds_file))
stream = jpss_packet_stream(open(pds_file, "rb"))
first_pkt_time = None
last_pkt_time = None
for pkt in stream:
entry = apid_map.get(pkt.apid)
if not entry:
entry_from_prev_pds = prev_apid_map.get(pkt.apid)
apid_map[pkt.apid] = init_entry(pkt, entry_from_prev_pds)
else:
update_entry(entry, pkt)
if pkt.cds_timecode:
if not first_pkt_time:
first_pkt_time = pkt.cds_timecode
last_pkt_time = pkt.cds_timecode
return {
"first_packet_time": create_timecode(first_pkt_time),
"last_packet_time": create_timecode(last_pkt_time),
"apid_info": [apid_map[k] for k in sorted(apid_map)],
}
def build_prev_apid_map(prev_pds_file):
if prev_pds_file:
return {
entry["apid"]: entry
for entry in scan_packets(prev_pds_file)["apid_info"]
}
else:
return {}
def init_entry(pkt, entry_from_prev_pds):
rv = {
"apid": pkt.apid,
"first_packet_time": create_timecode(pkt.cds_timecode),
"first_packet_offset": pkt.offset,
"last_packet_time": create_timecode(pkt.cds_timecode),
"last_packet_ssc": pkt.seqid,
"total_packets": 1,
"total_bytes": pkt.size,
"gap_info": [],
}
if entry_from_prev_pds:
update_gap_info(
rv["gap_info"],
entry_from_prev_pds["last_packet_ssc"],
entry_from_prev_pds["last_packet_time"],
pkt,
)
return rv
def update_entry(entry, new_pkt):
prev_last_ssc = entry["last_packet_ssc"]
prev_last_time = entry["last_packet_time"]
if new_pkt.cds_timecode:
if entry["first_packet_time"] == DaySegmentedTimecode():
entry["first_packet_time"] = create_timecode(new_pkt.cds_timecode)
entry["last_packet_time"] = create_timecode(new_pkt.cds_timecode)
entry["last_packet_ssc"] = new_pkt.seqid
entry["total_packets"] += 1
entry["total_bytes"] += new_pkt.size
update_gap_info(entry["gap_info"], prev_last_ssc, prev_last_time, new_pkt)
def update_gap_info(gap_info, last_ssc, last_pkt_time, new_pkt):
ssc_limit = 16384 # one more than highest SSC
expected_new_ssc = (last_ssc + 1) % ssc_limit
if new_pkt.seqid != expected_new_ssc:
gap_entry = {
"first_missing_ssc": expected_new_ssc,
"missing_packet_count": (new_pkt.seqid - expected_new_ssc) % ssc_limit,
"pre_gap_packet_time": last_pkt_time,
"post_gap_packet_time": create_timecode(new_pkt.cds_timecode),
"post_gap_packet_offset": new_pkt.offset,
}
gap_info.append(gap_entry)
return main()
def create_timecode(tc):
"""Convert a packet cds_timecode to DaySegmentedTimecode
Handles input of None by returning epoch value of 1958-01-01.
"""
return DaySegmentedTimecode(*tc) if tc else DaySegmentedTimecode()
idps_epoch = datetime(1958, 1, 1)
logger = logging.getLogger(__name__)