Skip to content
Snippets Groups Projects
crio.py 8.16 KiB
"""PDS construction record input and output"""

import ctypes as c
import numbers

from edosl0util.headers import BaseStruct, DaySegmentedTimecode


class ReadError(ValueError):
    """Unable to parse construction record due to improper format"""


def read(cr_file):
    """Parse a PDS construction record from a file (*00.PDS)"""

    def main():
        rv = {}
        with open(cr_file, "rb") as f:
            read_into_dict(f, Main1Struct, rv)
            rv["scs_info"] = [read_struct(f, ScsStruct) for _ in range(rv["scs_count"])]
            read_into_dict(f, Main2Struct, rv)
            rv["apid_info"] = []
            for _ in range(rv["apid_count"]):
                d = {}
                read_into_dict(f, Apid1Struct, d)
                d["vcid_info"] = [
                    read_struct(f, ApidVcidStruct) for _ in range(d["vcid_count"])
                ]
                read_into_dict(f, Apid2Struct, d)
                d["gap_info"] = [
                    read_struct(f, ApidGapStruct) for _ in range(d["gap_count"])
                ]
                read_into_dict(f, Apid3Struct, d)
                d["fill_packet_info"] = [
                    read_struct(f, ApidFillStruct) for _ in range(d["fill_packets"])
                ]
                read_into_dict(f, Apid4Struct, d)
                d["mismatched_length_packet_ssc_list"] = [
                    read_struct(f, ApidMismatchedLengthStruct)["packet_ssc"]
                    for i in range(d["mismatched_length_packets"])
                ]
                read_into_dict(f, Apid5Struct, d)
                rv["apid_info"].append(d)
            read_into_dict(f, Main3Struct, rv)
            rv["file_info"] = []
            for _ in range(rv["file_count"]):
                d = {}
                read_into_dict(f, FileStruct, d)
                d["apid_info"] = [
                    read_struct(f, FileApidStruct) for _ in range(d["apid_count"])
                ]
                if (
                    d["apid_count"] == 0
                ):  # bogus all-zero apid struct is present for the CR file
                    read_struct(f, FileApidStruct)
                rv["file_info"].append(d)
            extra = f.read()
            if extra:
                raise ReadError("{} bytes remain after reading CR".format(len(extra)))
            return rv

    def read_into_dict(f, struct, data):
        data.update(read_struct(f, struct))

    def read_struct(f, struct):
        buf = f.read(c.sizeof(struct))
        if len(buf) < c.sizeof(struct):
            raise ReadError("Unexpected EOF reading CR")
        rv = struct_to_dict(struct.from_buffer_copy(buf))
        rv = {
            k: v for k, v in rv.items() if not k.startswith("spare_")
        }  # no spare fields
        return {
            k: int(v) if isinstance(v, numbers.Integral) else v for k, v in rv.items()
        }  # no longs

    return main()


def write(cr, out_file):
    """Write out a PDS construction record file"""

    def main():
        with open(out_file, "wb") as f:
            write_struct(cr, Main1Struct, f)
            for d in cr["scs_info"]:
                write_struct(d, ScsStruct, f)
            write_struct(cr, Main2Struct, f)
            for d in cr["apid_info"]:
                write_struct(d, Apid1Struct, f)
                for dd in d["vcid_info"]:
                    write_struct(dd, ApidVcidStruct, f)
                write_struct(d, Apid2Struct, f)
                for dd in d["gap_info"]:
                    write_struct(dd, ApidGapStruct, f)
                write_struct(d, Apid3Struct, f)
                for dd in d["fill_packet_info"]:
                    write_struct(dd, ApidFillStruct, f)
                write_struct(d, Apid4Struct, f)
                for ssc in d["mismatched_length_packet_ssc_list"]:
                    write_struct({"ssc": ssc}, ApidMismatchedLengthStruct, f)
                write_struct(d, Apid5Struct, f)
            write_struct(cr, Main3Struct, f)
            for d in cr["file_info"]:
                write_struct(d, FileStruct, f)
                for dd in d["apid_info"]:
                    write_struct(dd, FileApidStruct, f)
                if d["apid_count"] == 0:
                    write_struct(
                        {}, FileApidStruct, f
                    )  # one all-zero apid struct if no others

    def write_struct(data, struct, out):
        fields = [f[0] for f in struct._fields_]
        struct_data = {}
        for k, v in data.items():
            if k in fields:
                if isinstance(v, str):
                    struct_data[k] = v.encode("utf-8")
                else:
                    struct_data[k] = v
        out.write(memoryview(struct(**struct_data)))

    main()


def struct_to_dict(s):
    return {f[0]: getattr(s, f[0]) for f in s._fields_}


class Main1Struct(BaseStruct):
    _fields_ = [
        ("edos_sw_ver_major", c.c_uint8),
        ("edos_sw_ver_minor", c.c_uint8),
        ("cr_type", c.c_uint8),
        ("spare_1", c.c_uint8 * 1),
        ("pds_id", c.c_char * 36),
        ("test_flag", c.c_uint8),
        ("spare_2", c.c_uint8 * 9),
        ("scs_count", c.c_uint16),
    ]


class ScsStruct(BaseStruct):
    _fields_ = [("start", DaySegmentedTimecode), ("stop", DaySegmentedTimecode)]


class Main2Struct(BaseStruct):
    _fields_ = [
        ("fill_bytes", c.c_uint64),
        ("mismatched_length_packets", c.c_uint32),
        ("first_packet_time", DaySegmentedTimecode),
        ("last_packet_time", DaySegmentedTimecode),
        ("first_packet_esh_time", DaySegmentedTimecode),
        ("last_packet_esh_time", DaySegmentedTimecode),
        ("rs_corrected_packets", c.c_uint32),
        ("total_packets", c.c_uint32),
        ("total_bytes", c.c_uint64),
        ("gap_count", c.c_uint32),
        ("completion_time", DaySegmentedTimecode),
        ("spare_3", c.c_uint8 * 7),
        ("apid_count", c.c_uint8),
    ]


class Apid1Struct(BaseStruct):
    _fields_ = [
        ("spare_1", c.c_uint8 * 1),
        ("scid", c.c_uint8),
        ("apid", c.c_uint16),
        ("first_packet_offset", c.c_uint64),
        ("spare_2", c.c_uint8 * 3),
        ("vcid_count", c.c_uint8),
    ]


class ApidVcidStruct(BaseStruct):
    _fields_ = [
        ("spare_1", c.c_uint16),
        ("spare_2", c.c_uint16, 2),
        ("scid", c.c_uint16, 8),
        ("vcid", c.c_uint16, 6),
    ]


class Apid2Struct(BaseStruct):
    _fields_ = [("gap_count", c.c_uint32)]


class ApidGapStruct(BaseStruct):
    _fields_ = [
        ("first_missing_ssc", c.c_uint32),
        ("post_gap_packet_offset", c.c_uint64),
        ("missing_packet_count", c.c_uint32),
        ("pre_gap_packet_time", DaySegmentedTimecode),
        ("post_gap_packet_time", DaySegmentedTimecode),
        ("pre_gap_packet_esh_time", DaySegmentedTimecode),
        ("post_gap_packet_esh_time", DaySegmentedTimecode),
    ]


class Apid3Struct(BaseStruct):
    _fields_ = [("fill_packets", c.c_uint32)]


class ApidFillStruct(BaseStruct):
    _fields_ = [
        ("packet_ssc", c.c_uint32),
        ("packet_offset", c.c_uint64),
        ("first_fill_byte", c.c_uint32),
    ]


class Apid4Struct(BaseStruct):
    _fields_ = [("fill_bytes", c.c_uint64), ("mismatched_length_packets", c.c_uint32)]


class ApidMismatchedLengthStruct(BaseStruct):
    _fields_ = [("packet_ssc", c.c_uint32)]


class Apid5Struct(BaseStruct):
    _fields_ = [
        ("first_packet_time", DaySegmentedTimecode),
        ("last_packet_time", DaySegmentedTimecode),
        ("first_packet_esh_time", DaySegmentedTimecode),
        ("last_packet_esh_time", DaySegmentedTimecode),
        ("rs_corrected_packets", c.c_uint32),
        ("total_packets", c.c_uint32),
        ("total_bytes", c.c_uint64),
        ("spare_3", c.c_uint64),
    ]


class Main3Struct(BaseStruct):
    _fields_ = [("spare_4", c.c_uint8 * 3), ("file_count", c.c_uint8)]


class FileStruct(BaseStruct):
    _fields_ = [
        ("file_name", c.c_char * 40),
        ("spare_1", c.c_uint8 * 3),
        ("apid_count", c.c_uint8),
    ]


class FileApidStruct(BaseStruct):
    _fields_ = [
        ("spare_1", c.c_uint8 * 1),
        ("scid", c.c_uint8),
        ("apid", c.c_uint16),
        ("first_packet_time", DaySegmentedTimecode),
        ("last_packet_time", DaySegmentedTimecode),
        ("spare_2", c.c_uint8 * 4),
    ]