diff --git a/edosl0util/pkted.py b/edosl0util/pkted.py new file mode 100644 index 0000000000000000000000000000000000000000..e6615d89805cd5d5471502b4aeaa8e33ccb068d4 --- /dev/null +++ b/edosl0util/pkted.py @@ -0,0 +1,69 @@ +"""Packet stream editing""" + +import attr +import numpy as np + +from edosl0util.stream import BasicStream + +@attr.s +class PacketEditSpec(object): + apid = attr.ib() + start_bit = attr.ib() + num_bits = attr.ib() + new_value = attr.ib() + + +def edit_packets(file_path, specs): + # TODO: optimize so not so much is repeatedly happening in the loop + stream = EditablePacketStream(file_path) + apid_specs = {} + for s in specs: + apid_specs.setdefault(s.apid, []).append(s) + for pkt in stream: + for spec in apid_specs: + if pkt.apid in apid_specs: + bits = np.unpackbits(np.fromstring(pkt.read(), np.uint8)) + for spec in apid_specs[pkt.apid]: + ini = spec.start_bit + sz = spec.num_bits + fin = ini + sz + new_arr = np.asarray(spec.new_value) + new_arr = new_arr.astype(new_arr.dtype.newbyteorder('>')) + bits[ini:fin] = np.unpackbits(new_arr[None].view(np.uint8))[-sz:] + pkt.write(np.packbits(bits).tobytes()) + + +class EditablePacketStream(object): + def __init__(self, file_path): + self._tracker_stream = BasicStream(open(file_path), with_data=False) + self._file = open(file_path, 'r+') + + def __iter__(self): + return self + + def next(self): + return EditablePacket(self, next(self._tracker_stream)) + + def read(self, tracker): + self._file.seek(tracker.offset) + return self._file.read(tracker.size) + + def write(self, tracker, new_bytes): + self._file.seek(tracker.offset) + self._file.write(new_bytes) + + +class EditablePacket(object): + def __init__(self, stream, tracker): + self._stream = stream + self._tracker = tracker + + @property + def apid(self): + return self._tracker.h1.apid + + def read(self): + return self._stream.read(self._tracker) + + def write(self, new_bytes): + return self._stream.write(self._tracker, new_bytes)