"""PDS construction record input and output""" import ctypes as c import numbers from edosl0util.headers import BaseStruct, DaySegmentedTimecode def read(cr_file): """Parse a PDS construction record from a file (*00.PDS)""" def main(): rv = {} with open(cr_file, 'rb') as f: read_into_dict(f, Main1Struct, rv) rv['scs_info'] = [read_struct(f, ScsStruct) for _ in range(rv['scs_count'])] read_into_dict(f, Main2Struct, rv) rv['apid_info'] = [] for _ in range(rv['apid_count']): d = {} read_into_dict(f, Apid1Struct, d) d['vcid_info'] = [read_struct(f, ApidVcidStruct) for _ in range(d['vcid_count'])] read_into_dict(f, Apid2Struct, d) d['gap_info'] = [read_struct(f, ApidGapStruct) for _ in range(d['gap_count'])] read_into_dict(f, Apid3Struct, d) d['fill_packet_info'] = [ read_struct(f, ApidFillStruct) for _ in range(d['fill_packets'])] read_into_dict(f, Apid4Struct, d) d['mismatched_length_packet_ssc_list'] = [ read_struct(f, ApidMismatchedLengthStruct)['packet_ssc'] for i in range(d['mismatched_length_packets'])] read_into_dict(f, Apid5Struct, d) rv['apid_info'].append(d) read_into_dict(f, Main3Struct, rv) rv['file_info'] = [] for _ in range(rv['file_count']): d = {} read_into_dict(f, FileStruct, d) d['apid_info'] = [read_struct(f, FileApidStruct) for _ in range(d['apid_count'])] if d['apid_count'] == 0: # bogus all-zero apid struct is present for the CR file read_struct(f, FileApidStruct) rv['file_info'].append(d) extra = f.read() if extra: raise ValueError('{} bytes remain after reading CR'.format(len(extra))) return rv def read_into_dict(f, struct, data): data.update(read_struct(f, struct)) def read_struct(f, struct): rv = struct_to_dict(struct.from_buffer_copy(f.read(c.sizeof(struct)))) rv = {k: v for k, v in rv.items() if not k.startswith('spare_')} # no spare fields return {k: int(v) if isinstance(v, numbers.Integral) else v for k, v in rv.items()} # no longs return main() def write(cr, out_file): """Write out a PDS construction record file""" def main(): with open(out_file, 'wb') as f: write_struct(cr, Main1Struct, f) for d in cr['scs_info']: write_struct(d, ScsStruct, f) write_struct(cr, Main2Struct, f) for d in cr['apid_info']: write_struct(d, Apid1Struct, f) for dd in d['vcid_info']: write_struct(dd, ApidVcidStruct, f) write_struct(d, Apid2Struct, f) for dd in d['gap_info']: write_struct(dd, ApidGapStruct, f) write_struct(d, Apid3Struct, f) for dd in d['fill_packet_info']: write_struct(dd, ApidFillStruct, f) write_struct(d, Apid4Struct, f) for ssc in d['mismatched_length_packet_ssc_list']: write_struct({'ssc': ssc}, ApidMismatchedLengthStruct, f) write_struct(d, Apid5Struct, f) write_struct(cr, Main3Struct, f) for d in cr['file_info']: write_struct(d, FileStruct, f) for dd in d['apid_info']: write_struct(dd, FileApidStruct, f) if d['apid_count'] == 0: write_struct({}, FileApidStruct, f) # one all-zero apid struct if no others def write_struct(data, struct, out): fields = [f[0] for f in struct._fields_] struct_data = {} for k, v in data.items(): if k in fields: if isinstance(v, str): struct_data[k] = v.encode('utf-8') else: struct_data[k] = v out.write(memoryview(struct(**struct_data))) main() def struct_to_dict(s): return {f[0]: getattr(s, f[0]) for f in s._fields_} class Main1Struct(BaseStruct): _fields_ = [('edos_sw_ver_major', c.c_uint8), ('edos_sw_ver_minor', c.c_uint8), ('cr_type', c.c_uint8), ('spare_1', c.c_uint8 * 1), ('pds_id', c.c_char * 36), ('test_flag', c.c_uint8), ('spare_2', c.c_uint8 * 9), ('scs_count', c.c_uint16)] class ScsStruct(BaseStruct): _fields_ = [('start', DaySegmentedTimecode), ('stop', DaySegmentedTimecode)] class Main2Struct(BaseStruct): _fields_ = [('fill_bytes', c.c_uint64), ('mismatched_length_packets', c.c_uint32), ('first_packet_time', DaySegmentedTimecode), ('last_packet_time', DaySegmentedTimecode), ('first_packet_esh_time', DaySegmentedTimecode), ('last_packet_esh_time', DaySegmentedTimecode), ('rs_corrected_packets', c.c_uint32), ('total_packets', c.c_uint32), ('total_bytes', c.c_uint64), ('gap_count', c.c_uint32), ('completion_time', DaySegmentedTimecode), ('spare_3', c.c_uint8 * 7), ('apid_count', c.c_uint8)] class Apid1Struct(BaseStruct): _fields_ = [('spare_1', c.c_uint8 * 1), ('scid', c.c_uint8), ('apid', c.c_uint16), ('first_packet_offset', c.c_uint64), ('spare_2', c.c_uint8 * 3), ('vcid_count', c.c_uint8)] class ApidVcidStruct(BaseStruct): _fields_ = [('spare_1', c.c_uint16), ('spare_2', c.c_uint16, 2), ('scid', c.c_uint16, 8), ('vcid', c.c_uint16, 6)] class Apid2Struct(BaseStruct): _fields_ = [('gap_count', c.c_uint32)] class ApidGapStruct(BaseStruct): _fields_ = [('first_missing_ssc', c.c_uint32), ('post_gap_packet_offset', c.c_uint64), ('missing_packet_count', c.c_uint32), ('pre_gap_packet_time', DaySegmentedTimecode), ('post_gap_packet_time', DaySegmentedTimecode), ('pre_gap_packet_esh_time', DaySegmentedTimecode), ('post_gap_packet_esh_time', DaySegmentedTimecode)] class Apid3Struct(BaseStruct): _fields_ = [('fill_packets', c.c_uint32)] class ApidFillStruct(BaseStruct): _fields_ = [('packet_ssc', c.c_uint32), ('packet_offset', c.c_uint64), ('first_fill_byte', c.c_uint32)] class Apid4Struct(BaseStruct): _fields_ = [('fill_bytes', c.c_uint64), ('mismatched_length_packets', c.c_uint32)] class ApidMismatchedLengthStruct(BaseStruct): _fields_ = [('packet_ssc', c.c_uint32)] class Apid5Struct(BaseStruct): _fields_ = [('first_packet_time', DaySegmentedTimecode), ('last_packet_time', DaySegmentedTimecode), ('first_packet_esh_time', DaySegmentedTimecode), ('last_packet_esh_time', DaySegmentedTimecode), ('rs_corrected_packets', c.c_uint32), ('total_packets', c.c_uint32), ('total_bytes', c.c_uint64), ('spare_3', c.c_uint64)] class Main3Struct(BaseStruct): _fields_ = [('spare_4', c.c_uint8 * 3), ('file_count', c.c_uint8)] class FileStruct(BaseStruct): _fields_ = [('file_name', c.c_char * 40), ('spare_1', c.c_uint8 * 3), ('apid_count', c.c_uint8)] class FileApidStruct(BaseStruct): _fields_ = [('spare_1', c.c_uint8 * 1), ('scid', c.c_uint8), ('apid', c.c_uint16), ('first_packet_time', DaySegmentedTimecode), ('last_packet_time', DaySegmentedTimecode), ('spare_2', c.c_uint8 * 4)]