Skip to content
Snippets Groups Projects
pkted.py 2.23 KiB
Newer Older
"""Packet stream editing"""

import attr
import numpy as np

from edosl0util.stream import BasicStream

Bruce Flynn's avatar
Bruce Flynn committed

@attr.s
class PacketEditSpec(object):
    apid = attr.ib()
    start_bit = attr.ib()
    num_bits = attr.ib()
    new_value = attr.ib()


def edit_packets(file_path, specs):
    # TODO: optimize so not so much is repeatedly happening in the loop
    stream = EditablePacketStream(file_path)
    apid_specs = {}
    for s in specs:
        apid_specs.setdefault(s.apid, []).append(s)
    for pkt in stream:
        for spec in apid_specs:
            if pkt.apid in apid_specs:
                bits = np.unpackbits(np.fromstring(pkt.read(), np.uint8))
                for spec in apid_specs[pkt.apid]:
                    ini = spec.start_bit
                    sz = spec.num_bits
                    fin = ini + sz
Greg Quinn's avatar
Greg Quinn committed
                    if callable(spec.new_value):
                        old_value = bits[ini:fin].copy()
                        new_value = spec.new_value(old_value)
                    else:
                        new_value = np.asarray(spec.new_value)
Bruce Flynn's avatar
Bruce Flynn committed
                        new_value = new_value.astype(new_value.dtype.newbyteorder(">"))
Greg Quinn's avatar
Greg Quinn committed
                        new_value = np.unpackbits(new_value[None].view(np.uint8))[-sz:]
                    bits[ini:fin] = new_value
                pkt.write(np.packbits(bits).tobytes())


class EditablePacketStream(object):
    def __init__(self, file_path):
        self._tracker_stream = BasicStream(open(file_path), with_data=False)
Bruce Flynn's avatar
Bruce Flynn committed
        self._file = open(file_path, "r+")

    def __iter__(self):
        return self

    def next(self):
        return EditablePacket(self, next(self._tracker_stream))

    def read(self, tracker):
        self._file.seek(tracker.offset)
        return self._file.read(tracker.size)

    def write(self, tracker, new_bytes):
        self._file.seek(tracker.offset)
        self._file.write(new_bytes)


class EditablePacket(object):
    def __init__(self, stream, tracker):
        self._stream = stream
        self._tracker = tracker

    @property
    def apid(self):
        return self._tracker.h1.apid

    def read(self):
        return self._stream.read(self._tracker)

    def write(self, new_bytes):
        return self._stream.write(self._tracker, new_bytes)