# Newer
# Older
"""Packet stream editing"""
import attr
import numpy as np
from edosl0util.stream import BasicStream
@attr.s
class PacketEditSpec(object):
    """Description of one bit-field edit to apply to packets of one APID.

    The fields are positional in the order declared below.
    """

    # APID used to select which packets the edit applies to.
    apid = attr.ib()
    # Index of the first bit to overwrite, counted within the packet's bits.
    start_bit = attr.ib()
    # Number of consecutive bits to overwrite.
    num_bits = attr.ib()
    # Replacement: either a plain value, or a callable that is handed the
    # current bits and returns the new ones.
    new_value = attr.ib()
def edit_packets(file_path, specs):
    """Apply bit-level edits, in place, to the packets stored in a file.

    Packets are visited through ``EditablePacketStream``. Every packet whose
    APID has at least one spec is read, all of that APID's specs are applied
    (in the order they were given) and the packet is written back once.
    Packets with no matching spec are neither read nor written.

    Args:
        file_path: path passed to ``EditablePacketStream``.
        specs: iterable of objects exposing ``apid``, ``start_bit``,
            ``num_bits`` and ``new_value`` (e.g. ``PacketEditSpec``).
            ``start_bit`` indexes into the bits of the bytes returned by the
            packet's ``read()``, most significant bit of each byte first.
            ``new_value`` is either:

            * a callable, which receives a copy of the current bits (an
              array of 0/1 ``uint8`` values) and returns the replacement
              bits, or
            * a value accepted by ``np.asarray``; it is converted to
              big-endian and its ``num_bits`` lowest bits are written.
    """
    # TODO: bit patterns for constant (non-callable) values are still rebuilt
    # for every packet; they could be computed once per spec.
    stream = EditablePacketStream(file_path)

    # Group the specs by APID so each packet needs a single lookup.
    apid_specs = {}
    for s in specs:
        apid_specs.setdefault(s.apid, []).append(s)

    for pkt in stream:
        pkt_specs = apid_specs.get(pkt.apid)
        if not pkt_specs:
            continue

        # frombuffer yields a read-only view; unpackbits returns a fresh,
        # writable array of 0/1 values, one per bit.
        bits = np.unpackbits(np.frombuffer(pkt.read(), np.uint8))

        # Each spec is applied exactly once per packet, so callables that
        # derive the new bits from the old ones are not re-applied.
        for spec in pkt_specs:
            ini = spec.start_bit
            sz = spec.num_bits
            fin = ini + sz
            if callable(spec.new_value):
                old_value = bits[ini:fin].copy()
                new_value = spec.new_value(old_value)
            else:
                new_value = np.asarray(spec.new_value)
                # Force big-endian so the least significant bits end up last.
                new_value = new_value.astype(new_value.dtype.newbyteorder(">"))
                # [None] lifts a 0-d array to 1-d so it can be viewed as bytes.
                new_value = np.unpackbits(new_value[None].view(np.uint8))[-sz:]
            bits[ini:fin] = new_value

        pkt.write(np.packbits(bits).tobytes())
class EditablePacketStream(object):
    """Iterator over the packets of a file that supports in-place edits.

    Two handles are opened on ``file_path``. One is consumed by a
    ``BasicStream`` created with ``with_data=False`` to obtain a tracker for
    each packet; the other is used for reads and writes at the ``offset`` and
    ``size`` reported by those trackers. Iterating yields ``EditablePacket``
    objects bound to this stream.

    The stream can be used as a context manager; ``close()`` releases both
    file handles.
    """

    def __init__(self, file_path):
        # Packet data is binary, so both handles are opened in binary mode.
        self._tracker_file = open(file_path, "rb")
        try:
            self._tracker_stream = BasicStream(self._tracker_file, with_data=False)
            # Separate read/write handle: seeking here must not disturb the
            # position of the handle the tracker stream is reading from.
            self._file = open(file_path, "r+b")
        except Exception:
            self._tracker_file.close()
            raise

    def __iter__(self):
        return self

    def next(self):
        """Return the next packet; StopIteration propagates from the tracker stream."""
        return EditablePacket(self, next(self._tracker_stream))

    # Python 3 spelling of the iterator protocol; ``next`` is kept for
    # existing callers.
    __next__ = next

    def read(self, tracker):
        """Return ``tracker.size`` bytes read from ``tracker.offset``."""
        self._file.seek(tracker.offset)
        return self._file.read(tracker.size)

    def write(self, tracker, new_bytes):
        """Write ``new_bytes`` into the file starting at ``tracker.offset``."""
        self._file.seek(tracker.offset)
        self._file.write(new_bytes)

    def close(self):
        """Close both underlying file handles (flushing pending writes)."""
        self._file.close()
        self._tracker_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
class EditablePacket(object):
    """Handle on a single packet of an editable stream.

    Holds the owning stream and the packet's tracker; reads and writes are
    delegated to the stream, which is passed the tracker.
    """

    def __init__(self, stream, tracker):
        self._stream, self._tracker = stream, tracker

    @property
    def apid(self):
        """APID taken from the tracker's ``h1`` header."""
        header = self._tracker.h1
        return header.apid

    def read(self):
        """Return whatever the owning stream reads for this packet's tracker."""
        owner = self._stream
        return owner.read(self._tracker)

    def write(self, new_bytes):
        """Pass ``new_bytes`` to the owning stream's write for this tracker."""
        owner = self._stream
        return owner.write(self._tracker, new_bytes)