"""Packet stream editing""" import attr import numpy as np from edosl0util.stream import BasicStream @attr.s class PacketEditSpec(object): apid = attr.ib() start_bit = attr.ib() num_bits = attr.ib() new_value = attr.ib() def edit_packets(file_path, specs): # TODO: optimize so not so much is repeatedly happening in the loop stream = EditablePacketStream(file_path) apid_specs = {} for s in specs: apid_specs.setdefault(s.apid, []).append(s) for pkt in stream: for spec in apid_specs: if pkt.apid in apid_specs: bits = np.unpackbits(np.fromstring(pkt.read(), np.uint8)) for spec in apid_specs[pkt.apid]: ini = spec.start_bit sz = spec.num_bits fin = ini + sz if callable(spec.new_value): old_value = bits[ini:fin].copy() new_value = spec.new_value(old_value) else: new_value = np.asarray(spec.new_value) new_value = new_value.astype(new_value.dtype.newbyteorder(">")) new_value = np.unpackbits(new_value[None].view(np.uint8))[-sz:] bits[ini:fin] = new_value pkt.write(np.packbits(bits).tobytes()) class EditablePacketStream(object): def __init__(self, file_path): self._tracker_stream = BasicStream(open(file_path), with_data=False) self._file = open(file_path, "r+") def __iter__(self): return self def next(self): return EditablePacket(self, next(self._tracker_stream)) def read(self, tracker): self._file.seek(tracker.offset) return self._file.read(tracker.size) def write(self, tracker, new_bytes): self._file.seek(tracker.offset) self._file.write(new_bytes) class EditablePacket(object): def __init__(self, stream, tracker): self._stream = stream self._tracker = tracker @property def apid(self): return self._tracker.h1.apid def read(self): return self._stream.read(self._tracker) def write(self, new_bytes): return self._stream.write(self._tracker, new_bytes)