From ca5fe382161e5de32ee838a88cfb145bfeeb0c83 Mon Sep 17 00:00:00 2001
From: Bruce Flynn <bmflynn@gmail.com>
Date: Tue, 7 Jul 2015 14:21:28 +0000
Subject: [PATCH] Rework stream module into PacketStream/PacketCollector/PacketWalker

---
 edosl0util/cli.py    |   6 +-
 edosl0util/dump.py   |  17 ++--
 edosl0util/split.py  |  28 +++---
 edosl0util/stream.py | 229 +++++++++++++++++++++++++++++++++++--------
 setup.py             |   2 +-
 5 files changed, 212 insertions(+), 70 deletions(-)

diff --git a/edosl0util/cli.py b/edosl0util/cli.py
index a445613..2708516 100644
--- a/edosl0util/cli.py
+++ b/edosl0util/cli.py
@@ -29,9 +29,9 @@ def cmd_dump():
     args = parser.parse_args()
 
     def handler(pkt):
-        print(pkt.h1)
-        if pkt.h2:
-            print("\t%s : %s" % (pkt.stamp, pkt.h2))
+        print(pkt.primary_header)
+        if pkt.secondary_header:
+            print("\t%s : %s" % (pkt.stamp, pkt.secondary_header))
 
     stats = dump.dump(args.pds, handler=(handler if args.verbose else None))
     print("First Packet: {}".format(stats['first_pkt']))
diff --git a/edosl0util/dump.py b/edosl0util/dump.py
index b1dbc97..c2e6332 100644
--- a/edosl0util/dump.py
+++ b/edosl0util/dump.py
@@ -3,7 +3,7 @@ from datetime import datetime
 
 from edos.ccsds import GROUP_FIRST
 
-from edosl0util.stream import packetstream
+from edosl0util.stream import PacketStream
 
 
 MAX_SEQ_ID = 2**14
@@ -21,18 +21,19 @@ def dump(filepath, handler=None):
     missing = 0
     # sequence nums are per APID
     apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
-    for pkt in packetstream(filepath):
+    for pkt in PacketStream(filepath):
         packets += 1
-        if pkt.h1.sequence_grouping == GROUP_FIRST:
+        h1 = pkt.primary_header
+        if h1.sequence_grouping == GROUP_FIRST:
             groups += 1
-        apids[pkt.h1.apid]['count'] += 1
+        apids[h1.apid]['count'] += 1
 
-        seq = pkt.h1.source_sequence_count
-        expected_seq = (apids[pkt.h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID
+        seq = h1.source_sequence_count
+        expected_seq = (apids[h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID
         if seq != expected_seq:
             missing += 1
-            apids[pkt.h1.apid]['missing'] += 1
-        apids[pkt.h1.apid]['prev_seq'] = seq
+            apids[h1.apid]['missing'] += 1
+        apids[h1.apid]['prev_seq'] = seq
 
         if pkt.stamp:
             first_pkt = min(pkt.stamp, first_pkt)
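[Note: the gap check above relies on 14-bit wrap-around arithmetic. With
MAX_SEQ_ID = 2**14 = 16384, the counter after 16383 is 0, so a normal rollover
is not reported as missing. A quick sanity check of just the arithmetic:

    MAX_SEQ_ID = 2**14  # CCSDS source sequence count is a 14-bit field

    assert (100 + 1) % MAX_SEQ_ID == 101   # normal increment
    assert (16383 + 1) % MAX_SEQ_ID == 0   # wrap: 0 follows 16383
]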
diff --git a/edosl0util/split.py b/edosl0util/split.py
index e99aeb3..6629fb6 100644
--- a/edosl0util/split.py
+++ b/edosl0util/split.py
@@ -1,12 +1,7 @@
-import io
-from cStringIO import StringIO
+import array
 
-from edosl0util.timecode import cds_stamp, unixtime
-from edosl0util.stream import packetstream
-
-
-def time_to_roll(time, minutes):
-    return time - time % (minutes * 60)
+from edosl0util.timecode import unixtime
+from edosl0util.stream import PacketStream
 
 
 def split(filepath, minutes):
@@ -15,24 +10,23 @@ def split(filepath, minutes):
 
     A single output file is buffered in memory until it is written.
     """
-    buf = StringIO()  # buffer for a single data file until it is written
+    buf = array.array('c')  # buffer for a single data file until it is written
     cur_bucket = 0  # cur time bucket of size 'minutes'
 
-    for pkt in packetstream(filepath):
+    for pkt in PacketStream(filepath):
         # do the bucketing based on secondary header timestamps
-        if pkt.h2:
-            stamp = cds_stamp(pkt.h2.day, pkt.h2.milliseconds, pkt.h2.microseconds)
-            hdrtime = unixtime(stamp)
+        if pkt.stamp:
+            hdrtime = unixtime(pkt.stamp)
 
             pkt_bucket = hdrtime - hdrtime % (minutes * 60)
             if cur_bucket == 0:
                 cur_bucket = pkt_bucket
 
             if pkt_bucket > cur_bucket:
-                yield cur_bucket, buf.getvalue()
-                buf = StringIO()
+                yield cur_bucket, buf.tostring()
+                buf = array.array('c')
             cur_bucket = pkt_bucket
 
-        buf.write(pkt.data)
+        buf.fromstring(pkt.blob)
 
-    yield cur_bucket, buf.getvalue()
\ No newline at end of file
+    yield cur_bucket, buf.tostring()
\ No newline at end of file
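[Note: for context, a minimal sketch of driving the generator; the input file
name and output naming scheme are made up:

    from edosl0util.split import split

    # each yielded bucket is the unix time of the start of a
    # `minutes`-sized window
    for bucket, data in split('input.pds', minutes=6):
        with open('split.%d.pds' % int(bucket), 'wb') as fp:
            fp.write(data)
]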
diff --git a/edosl0util/stream.py b/edosl0util/stream.py
index fd48a69..5fa9109 100644
--- a/edosl0util/stream.py
+++ b/edosl0util/stream.py
@@ -1,6 +1,7 @@
 
 import io
-from collections import namedtuple
+from collections import namedtuple, deque, defaultdict
+from datetime import datetime
 
 from edos.ccsds import (
     c,
@@ -8,14 +9,13 @@
     JpssFirstSecondaryHeader,
     JpssSecondaryHeader,
     GROUP_FIRST,
+    GROUP_LAST,
     GROUP_STANDALONE
 )
 
 from edosl0util.timecode import cds_stamp
 
-
-Packet = namedtuple('Packet', ('h1', 'h2', 'stamp', 'data'))
-
+MAX_SEQ_ID = 2**14
 
 secondary_header_impls = {
     GROUP_FIRST: JpssFirstSecondaryHeader,
@@ -23,55 +23,202 @@
 }
 
 
-def packetstream(filepath):
+class Packet(namedtuple('Packet', ('primary_header', 'secondary_header',
+                                   'apid', 'seqid', 'stamp', 'blob'))):
+    def __str__(self):
+        return '<Packet apid=%s stamp=%s>' % (self.apid, self.stamp)
+    __repr__ = __str__
+
+
+class PacketGroup(namedtuple('PacketGroup', ('apid', 'stamp', 'packets', 'blob'))):
+    def __str__(self):
+        return '<Group apid=%s stamp=%s packets=%d>' % (
+            self.apid, self.packets[0].stamp if self.packets else '', len(self.packets)
+        )
+    __repr__ = __str__
+
+
+class PacketStream(object):
     """
-    Return a generator of `Packet` tuples that will contain the primary header
-    struct and all the packet data, including the header data.
+    Iterates over all CCSDS data producing `Packet` tuples. Only as much data as
+    is necessary to generate a single packet is read into memory at a time.
     """
-    with io.open(filepath, mode='rb', buffering=0) as datain:
-        buf = datain.read(c.sizeof(PrimaryHeader))
-        while buf:
-            data = buf
+    def __init__(self, filepath):
+        self._filepath = filepath
+        self._input = io.open(filepath, mode='rb')
 
-            h1 = PrimaryHeader.from_buffer_copy(buf)
+    def __iter__(self):
+        return self
 
-            buf = datain.read(h1.data_length_minus1 + 1)
-            if not buf:
-                break
-            data += buf
+    def _read(self, size):
+        data = self._input.read(size)
+        if not data:
+            raise StopIteration
+        return data
+
+    def next(self):
+        buf = self._read(c.sizeof(PrimaryHeader))
+        data = buf
 
-            stamp = None
-            h2 = None  # only first/standalone have h2 impls
-            H2Impl = secondary_header_impls.get(h1.sequence_grouping)
-            if H2Impl:
-                h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
-                stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
+        h1 = PrimaryHeader.from_buffer_copy(buf)
 
-            yield Packet(h1, h2, stamp, data)
+        buf = self._read(h1.data_length_minus1 + 1)
+        data += buf
 
-            buf = datain.read(c.sizeof(PrimaryHeader))
+        stamp = None
+        h2 = None  # only first/standalone have h2 impls
+        H2Impl = secondary_header_impls.get(h1.sequence_grouping)
+        if H2Impl:
+            h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
+            stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
 
+        return Packet(h1, h2, h1.apid, h1.source_sequence_count, stamp, data)
 
-"""
 
-pdsfiles = sort_pdsfiles_by_pkttime(filenames)
+# FUTURE: Use an iter with a peek function so we don't have to keep items that
+#         were pushed back in memory.
+class _IndecisiveIter(object):
+    """
+    Iterator which allows you to put an item back. Warning: items that are
+    pushed back are kept in memory.
+    """
+    def __init__(self, iterable):
+        self.iterable = iterable
+        self._cache = deque()
 
-apid = None
-seqnum = None
-for pdsfile in pdsfiles:
-    try:
-        pdsfile.seekto(apid, seqnum)
-        for packet in pdsfile.read():
-            output.write(packet)
+    def push_back(self, item):
+        self._cache.append(item)
 
-    except MissingPacket as err:
-        apid = err.apid
-        seqnum = err.seqnum
-        print("missing packet apid:{:d} seqnum:{:d} in {:s}".format(err.apid, err.seqnum, pdsfile))
+    def __iter__(self):
+        return self
 
-    except NoMorePackets:
-        print("{} has no more packets".format(pdsfile))
-        pdsfiles.pop(pdsfile)
+    def next(self):
+        if len(self._cache):
+            return self._cache.pop()
+        return self.iterable.next()
 
 
-"""
+class PacketCollector(object):
+    """
+    Collects packets from a PacketStream into PacketGroup tuples, if applicable.
+    Grouping uses the PrimaryHeader.sequence_grouping flag such that any packets
+    between GROUP_FIRST and GROUP_LAST are considered part of the same group.
+    """
+    def __init__(self, stream):
+        self._stream = stream
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        packet = self._stream.next()
+        grouping = packet.primary_header.sequence_grouping
+        if grouping == GROUP_FIRST:
+            blob = bytearray(packet.blob)
+            grppkts = deque([packet])
+            while grouping != GROUP_LAST:
+                packet = self._stream.next()
+                assert not packet.stamp, "Expected only first packet in group to have stamp"
+                blob.extend(packet.blob)
+                grppkts.append(packet)
+                grouping = packet.primary_header.sequence_grouping
+            return PacketGroup(grppkts[0].apid, grppkts[0].stamp, grppkts, str(blob))
+        return packet
+
+
+class PacketWalker(object):
+    """
+    Provides additional features for a PacketStream, such as seeking and reading
+    packets until a missing sequence id is encountered.
+    """
+    def __init__(self, stream):
+        self._stream = _IndecisiveIter(stream)
+        # seqnums for each apid for locating missing pkts
+        self._seqcache = {}
+        # stamps for each apid so we can seek
+        self._stampcache = defaultdict(lambda: datetime(1900, 1, 1))
+        self._finished = False
+
+    def _next_packet(self):
+        try:
+            packet = self._stream.next()
+            if packet.stamp:
+                self._stampcache[packet.apid] = packet.stamp
+            return packet
+        except StopIteration:
+            self._finished = True
+            raise
+
+    def end_of_stream(self):
+        return self._finished
+
+    def seek(self, stamp=None, apid=None):
+        """
+        Seek forward to the first packet whose secondary header timestamp is at
+        or after `stamp`. If `apid` is provided, additionally seek forward to
+        the next packet with that APID.
+        """
+        if self.end_of_stream():
+            return
+        try:
+            if stamp:
+                self._seek_to_stamp(stamp)
+            if apid is not None:
+                self._seek_to_apid(apid)
+        except StopIteration:
+            pass
+
+    def _seek_to_stamp(self, stamp):
+        while True:
+            packet = self._next_packet()
+            # stop at the first packet timestamped at or after the requested stamp
+            if packet.stamp and packet.stamp >= stamp:
+                self._stream.push_back(packet)
+                return
+
+    def _seek_to_apid(self, apid):
+        while True:
+            packet = self._next_packet()
+            hdr = packet.primary_header
+            if hdr.apid == apid:
+                self._stream.push_back(packet)
+                return
+
+    def _check_if_missing(self, packet):
+        """
+        Return True if this packet does not directly follow the previous packet
+        in sequence number, where sequence numbers are tracked per APID.
+        """
+        prev_seqid = self._seqcache.get(packet.apid)
+        seqid = packet.seqid
+        missing = False
+
+        if prev_seqid is not None:
+            missing = seqid != ((prev_seqid + 1) % MAX_SEQ_ID)
+
+        self._seqcache[packet.apid] = seqid
+        return missing
+
+    def read(self):
+        """
+        Read all remaining packets from the current position.
+        """
+        while not self._finished:
+            yield self._next_packet()
+
+    def read_until_missing(self):
+        """
+        Read packets until the first missing sequence number is encountered. The
+        next packet read will be the first packet after the gap.
+        """
+        while not self.end_of_stream():
+            packet = self._next_packet()
+
+            # checking for a seqid gap requires reading the packet after the
+            # gap, therefore we have to put that packet back to be retrieved
+            # next time if a missing packet was detected
+            if self._check_if_missing(packet):
+                self._stream.push_back(packet)
+                break
+
+            yield packet
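[Note: taken together, these classes replace the commented-out sketch this patch
deletes. A hedged example of how they might compose — the file names, timestamp,
and APID below are illustrative only:

    from datetime import datetime

    from edosl0util.stream import PacketStream, PacketWalker

    walker = PacketWalker(PacketStream('input.pds'))
    # skip ahead to packets at/after the given time for a single APID
    walker.seek(stamp=datetime(2015, 7, 7), apid=826)

    with open('contiguous.pds', 'wb') as out:
        for packet in walker.read_until_missing():
            out.write(packet.blob)

PacketCollector can wrap the same stream when grouped (first..last) packets
should be delivered as single PacketGroup tuples instead of one at a time.]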
diff --git a/setup.py b/setup.py
index f70dbb0..8ecd8f4 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@ setup(
     ],
     entry_points="""
     [console_scripts]
-    l0split = edosl0util.cli:cmd_split
+    edosl0split = edosl0util.cli:cmd_split
     edosl0dump = edosl0util.cli:cmd_dump
     """
 )
\ No newline at end of file
-- 
GitLab