Something went wrong on our end
-
Bruce Flynn authored
crio.py 8.16 KiB
"""PDS construction record input and output"""
import ctypes as c
import numbers
from edosl0util.headers import BaseStruct, DaySegmentedTimecode
class ReadError(ValueError):
    """Unable to parse construction record due to improper format.

    Raised by ``read`` when the file ends before a structure has been fully
    read, or when bytes remain after the last expected structure.
    """
def read(cr_file):
    """Parse a PDS construction record from a file (*00.PDS).

    The file at ``cr_file`` is opened in binary mode and decoded as a sequence
    of fixed-size structures; the number of repetitions of each repeated
    structure comes from a count field decoded just before it.

    Returns a dict holding the top-level fields plus the lists ``scs_info``,
    ``apid_info`` and ``file_info``.  Fields whose name starts with
    ``spare_`` are omitted and integral values are converted to ``int``.

    Raises ``ReadError`` if the file ends before a structure is complete or
    if any bytes remain after the last structure.
    """

    def decode(stream, struct):
        """Read a single ``struct`` from ``stream`` as a plain dict."""
        size = c.sizeof(struct)
        raw = stream.read(size)
        if len(raw) < size:
            raise ReadError("Unexpected EOF reading CR")
        values = struct_to_dict(struct.from_buffer_copy(raw))
        # One pass: skip the spare fields and turn integral values into int.
        return {
            name: (int(value) if isinstance(value, numbers.Integral) else value)
            for name, value in values.items()
            if not name.startswith("spare_")
        }

    def decode_many(stream, struct, count):
        """Read ``count`` consecutive ``struct`` entries from ``stream``."""
        return [decode(stream, struct) for _ in range(count)]

    def decode_apid(stream):
        """Read one per-APID section, including its variable-length lists."""
        info = decode(stream, Apid1Struct)
        info["vcid_info"] = decode_many(stream, ApidVcidStruct, info["vcid_count"])
        info.update(decode(stream, Apid2Struct))
        info["gap_info"] = decode_many(stream, ApidGapStruct, info["gap_count"])
        info.update(decode(stream, Apid3Struct))
        info["fill_packet_info"] = decode_many(
            stream, ApidFillStruct, info["fill_packets"]
        )
        info.update(decode(stream, Apid4Struct))
        # Only the sequence count of each mismatched-length entry is kept.
        info["mismatched_length_packet_ssc_list"] = [
            entry["packet_ssc"]
            for entry in decode_many(
                stream, ApidMismatchedLengthStruct, info["mismatched_length_packets"]
            )
        ]
        info.update(decode(stream, Apid5Struct))
        return info

    def decode_file(stream):
        """Read one per-file section and its APID entries."""
        entry = decode(stream, FileStruct)
        entry["apid_info"] = decode_many(stream, FileApidStruct, entry["apid_count"])
        if entry["apid_count"] == 0:
            # bogus all-zero apid struct is present for the CR file
            decode(stream, FileApidStruct)
        return entry

    with open(cr_file, "rb") as stream:
        record = decode(stream, Main1Struct)
        record["scs_info"] = decode_many(stream, ScsStruct, record["scs_count"])
        record.update(decode(stream, Main2Struct))
        record["apid_info"] = [
            decode_apid(stream) for _ in range(record["apid_count"])
        ]
        record.update(decode(stream, Main3Struct))
        record["file_info"] = [
            decode_file(stream) for _ in range(record["file_count"])
        ]
        leftover = stream.read()
        if leftover:
            raise ReadError("{} bytes remain after reading CR".format(len(leftover)))
    return record
def write(cr, out_file):
    """Write out a PDS construction record file.

    ``cr`` is a dict laid out like the one returned by ``read``: top-level
    fields plus the lists ``scs_info``, ``apid_info`` and ``file_info``.  The
    structures are written to ``out_file`` (opened in binary mode) in the same
    order that ``read`` consumes them.

    Keys that are not fields of the structure being written are ignored, and
    fields with no matching key are left at the structure's default value.
    ``str`` values are encoded as UTF-8 before being stored.
    """

    def main():
        with open(out_file, "wb") as f:
            write_struct(cr, Main1Struct, f)
            for d in cr["scs_info"]:
                write_struct(d, ScsStruct, f)
            write_struct(cr, Main2Struct, f)
            for d in cr["apid_info"]:
                write_struct(d, Apid1Struct, f)
                for dd in d["vcid_info"]:
                    write_struct(dd, ApidVcidStruct, f)
                write_struct(d, Apid2Struct, f)
                for dd in d["gap_info"]:
                    write_struct(dd, ApidGapStruct, f)
                write_struct(d, Apid3Struct, f)
                for dd in d["fill_packet_info"]:
                    write_struct(dd, ApidFillStruct, f)
                write_struct(d, Apid4Struct, f)
                for ssc in d["mismatched_length_packet_ssc_list"]:
                    # The struct's only field is ``packet_ssc``; a key with
                    # any other name is dropped by write_struct and the entry
                    # would be serialized as zero.
                    write_struct({"packet_ssc": ssc}, ApidMismatchedLengthStruct, f)
                write_struct(d, Apid5Struct, f)
            write_struct(cr, Main3Struct, f)
            for d in cr["file_info"]:
                write_struct(d, FileStruct, f)
                for dd in d["apid_info"]:
                    write_struct(dd, FileApidStruct, f)
                if d["apid_count"] == 0:
                    # one all-zero apid struct if no others
                    write_struct({}, FileApidStruct, f)

    def write_struct(data, struct, out):
        """Serialize the entries of ``data`` that are fields of ``struct``."""
        fields = {f[0] for f in struct._fields_}
        struct_data = {}
        for k, v in data.items():
            if k not in fields:
                continue
            # Text is stored as UTF-8 bytes; everything else is passed as is.
            struct_data[k] = v.encode("utf-8") if isinstance(v, str) else v
        out.write(memoryview(struct(**struct_data)))

    main()
def struct_to_dict(s):
    """Return a dict mapping each name declared in ``s._fields_`` to its value on ``s``.

    Only the first element of each ``_fields_`` entry (the field name) is
    used, so entries carrying extra elements are handled the same way.
    """
    names = (field[0] for field in s._fields_)
    return {name: getattr(s, name) for name in names}
class Main1Struct(BaseStruct):
    """First fixed-size portion of the top-level construction record.

    It is the first structure ``read`` consumes and ``write`` emits, and is
    followed by ``scs_count`` ``ScsStruct`` entries.  Fields named ``spare_*``
    are dropped from the dict returned by ``read``.
    """

    _fields_ = [
        ("edos_sw_ver_major", c.c_uint8),
        ("edos_sw_ver_minor", c.c_uint8),
        ("cr_type", c.c_uint8),
        ("spare_1", c.c_uint8 * 1),
        ("pds_id", c.c_char * 36),
        ("test_flag", c.c_uint8),
        ("spare_2", c.c_uint8 * 9),
        ("scs_count", c.c_uint16),  # number of ScsStruct entries that follow
    ]
class ScsStruct(BaseStruct):
    """One entry of the ``scs_info`` list: a start/stop timecode pair.

    Repeated ``scs_count`` times directly after ``Main1Struct``.
    """

    _fields_ = [("start", DaySegmentedTimecode), ("stop", DaySegmentedTimecode)]
class Main2Struct(BaseStruct):
    """Second fixed-size portion of the top-level construction record.

    Read after the ``ScsStruct`` entries and merged into the same top-level
    dict as ``Main1Struct``.  It is followed by ``apid_count`` per-APID
    sections, each starting with ``Apid1Struct``.
    """

    _fields_ = [
        ("fill_bytes", c.c_uint64),
        ("mismatched_length_packets", c.c_uint32),
        ("first_packet_time", DaySegmentedTimecode),
        ("last_packet_time", DaySegmentedTimecode),
        ("first_packet_esh_time", DaySegmentedTimecode),
        ("last_packet_esh_time", DaySegmentedTimecode),
        ("rs_corrected_packets", c.c_uint32),
        ("total_packets", c.c_uint32),
        ("total_bytes", c.c_uint64),
        ("gap_count", c.c_uint32),
        ("completion_time", DaySegmentedTimecode),
        ("spare_3", c.c_uint8 * 7),
        ("apid_count", c.c_uint8),  # number of per-APID sections that follow
    ]
class Apid1Struct(BaseStruct):
    """First fixed-size portion of a per-APID section.

    Followed by ``vcid_count`` ``ApidVcidStruct`` entries.
    """

    _fields_ = [
        ("spare_1", c.c_uint8 * 1),
        ("scid", c.c_uint8),
        ("apid", c.c_uint16),
        ("first_packet_offset", c.c_uint64),
        ("spare_2", c.c_uint8 * 3),
        ("vcid_count", c.c_uint8),  # number of ApidVcidStruct entries that follow
    ]
class ApidVcidStruct(BaseStruct):
    """One entry of a per-APID ``vcid_info`` list.

    After a 16-bit spare, the remaining three fields are declared as bit
    fields on ``c_uint16`` with widths 2, 8 and 6 (16 bits in total).
    """

    _fields_ = [
        ("spare_1", c.c_uint16),
        ("spare_2", c.c_uint16, 2),  # 2-bit field
        ("scid", c.c_uint16, 8),  # 8-bit field
        ("vcid", c.c_uint16, 6),  # 6-bit field
    ]
class Apid2Struct(BaseStruct):
    """Per-APID gap count; followed by ``gap_count`` ``ApidGapStruct`` entries."""

    _fields_ = [("gap_count", c.c_uint32)]
class ApidGapStruct(BaseStruct):
    """One entry of a per-APID ``gap_info`` list.

    Repeated ``gap_count`` times directly after ``Apid2Struct``.
    """

    _fields_ = [
        ("first_missing_ssc", c.c_uint32),
        ("post_gap_packet_offset", c.c_uint64),
        ("missing_packet_count", c.c_uint32),
        ("pre_gap_packet_time", DaySegmentedTimecode),
        ("post_gap_packet_time", DaySegmentedTimecode),
        ("pre_gap_packet_esh_time", DaySegmentedTimecode),
        ("post_gap_packet_esh_time", DaySegmentedTimecode),
    ]
class Apid3Struct(BaseStruct):
    """Per-APID fill packet count; followed by ``fill_packets`` ``ApidFillStruct`` entries."""

    _fields_ = [("fill_packets", c.c_uint32)]
class ApidFillStruct(BaseStruct):
    """One entry of a per-APID ``fill_packet_info`` list.

    Repeated ``fill_packets`` times directly after ``Apid3Struct``.
    """

    _fields_ = [
        ("packet_ssc", c.c_uint32),
        ("packet_offset", c.c_uint64),
        ("first_fill_byte", c.c_uint32),
    ]
class Apid4Struct(BaseStruct):
    """Per-APID fill byte total and mismatched-length packet count.

    Followed by ``mismatched_length_packets`` ``ApidMismatchedLengthStruct``
    entries.
    """

    _fields_ = [("fill_bytes", c.c_uint64), ("mismatched_length_packets", c.c_uint32)]
class ApidMismatchedLengthStruct(BaseStruct):
    """One mismatched-length packet entry of a per-APID section.

    ``read`` keeps only the ``packet_ssc`` value of each entry, collecting
    them under the ``mismatched_length_packet_ssc_list`` key.
    """

    _fields_ = [("packet_ssc", c.c_uint32)]
class Apid5Struct(BaseStruct):
    """Final fixed-size portion of a per-APID section.

    Read after the mismatched-length entries; it is the last structure of
    each per-APID section.
    """

    _fields_ = [
        ("first_packet_time", DaySegmentedTimecode),
        ("last_packet_time", DaySegmentedTimecode),
        ("first_packet_esh_time", DaySegmentedTimecode),
        ("last_packet_esh_time", DaySegmentedTimecode),
        ("rs_corrected_packets", c.c_uint32),
        ("total_packets", c.c_uint32),
        ("total_bytes", c.c_uint64),
        ("spare_3", c.c_uint64),  # spare; dropped from the dict returned by read
    ]
class Main3Struct(BaseStruct):
    """Third fixed-size portion of the top-level construction record.

    Read after all per-APID sections; followed by ``file_count`` per-file
    sections, each starting with ``FileStruct``.
    """

    _fields_ = [("spare_4", c.c_uint8 * 3), ("file_count", c.c_uint8)]
class FileStruct(BaseStruct):
    """Header of a per-file section (one entry of ``file_info``).

    Followed by ``apid_count`` ``FileApidStruct`` entries.  When
    ``apid_count`` is zero, ``read`` still consumes (and ``write`` still
    emits) a single all-zero ``FileApidStruct``.
    """

    _fields_ = [
        ("file_name", c.c_char * 40),
        ("spare_1", c.c_uint8 * 3),
        ("apid_count", c.c_uint8),  # number of FileApidStruct entries that follow
    ]
class FileApidStruct(BaseStruct):
    """One entry of a per-file ``apid_info`` list.

    Repeated ``apid_count`` times after ``FileStruct``; a single all-zero
    instance is present instead when that count is zero.
    """

    _fields_ = [
        ("spare_1", c.c_uint8 * 1),
        ("scid", c.c_uint8),
        ("apid", c.c_uint16),
        ("first_packet_time", DaySegmentedTimecode),
        ("last_packet_time", DaySegmentedTimecode),
        ("spare_2", c.c_uint8 * 4),
    ]