Skip to content
Snippets Groups Projects
Commit 0e2f1daf authored by Greg Quinn's avatar Greg Quinn
Browse files

Add pkted (packet editor) module

parent 1de5b7a8
No related branches found
No related tags found
No related merge requests found
"""Packet stream editing"""
import attr
import numpy as np
from edosl0util.stream import BasicStream
@attr.s
class PacketEditSpec(object):
apid = attr.ib()
start_bit = attr.ib()
num_bits = attr.ib()
new_value = attr.ib()
def edit_packets(file_path, specs):
# TODO: optimize so not so much is repeatedly happening in the loop
stream = EditablePacketStream(file_path)
apid_specs = {}
for s in specs:
apid_specs.setdefault(s.apid, []).append(s)
for pkt in stream:
for spec in apid_specs:
if pkt.apid in apid_specs:
bits = np.unpackbits(np.fromstring(pkt.read(), np.uint8))
for spec in apid_specs[pkt.apid]:
ini = spec.start_bit
sz = spec.num_bits
fin = ini + sz
new_arr = np.asarray(spec.new_value)
new_arr = new_arr.astype(new_arr.dtype.newbyteorder('>'))
bits[ini:fin] = np.unpackbits(new_arr[None].view(np.uint8))[-sz:]
pkt.write(np.packbits(bits).tobytes())
class EditablePacketStream(object):
def __init__(self, file_path):
self._tracker_stream = BasicStream(open(file_path), with_data=False)
self._file = open(file_path, 'r+')
def __iter__(self):
return self
def next(self):
return EditablePacket(self, next(self._tracker_stream))
def read(self, tracker):
self._file.seek(tracker.offset)
return self._file.read(tracker.size)
def write(self, tracker, new_bytes):
self._file.seek(tracker.offset)
self._file.write(new_bytes)
class EditablePacket(object):
def __init__(self, stream, tracker):
self._stream = stream
self._tracker = tracker
@property
def apid(self):
return self._tracker.h1.apid
def read(self):
return self._stream.read(self._tracker)
def write(self, new_bytes):
return self._stream.write(self._tracker, new_bytes)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment