From 7c6bd7fd1c45d0f3f87abdcaa32eba3987ecdc0f Mon Sep 17 00:00:00 2001
From: Bruce Flynn <brucef@ssec.wisc.edu>
Date: Tue, 20 Oct 2015 03:00:46 +0000
Subject: [PATCH] Reimplement merge. Support stream that skips reading data.

Replaced the merge code with something less efficient but easier to
grok.

New stream impl that can skip reading packet data; it is used to build
the index used by the new merge code.
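
The new merge is, roughly, index-sort-write. A minimal sketch of the
flow (input names are placeholders; cli.cmd_merge is the real
implementation):

    import io
    from edosl0util import merge

    index = set()  # _Ptr hashes on (stamp, apid), so dupes collapse
    for filepath in ('file1.pds', 'file2.pds'):
        index.update(merge.read_packet_index(io.open(filepath, 'rb')))

    with io.open('out.pds', 'wb') as out:
        for ptr in sorted(index):  # _Ptr sorts by (stamp, apid)
            out.write(ptr.bytes())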
---
 edosl0util/_old_merge.py | 183 ++++++++++++++++++++++++++++++++++++
 edosl0util/cli.py        |  42 +++++++--
 edosl0util/headers.py    |  15 ++-
 edosl0util/jpssrdr.py    |  14 +--
 edosl0util/merge.py      | 198 +++++++++++++++++++++++----------------
 edosl0util/split.py      |  20 ++--
 edosl0util/stream.py     | 191 ++++++++++++++-----------------------
 edosl0util/trunc.py      |   5 +-
 8 files changed, 425 insertions(+), 243 deletions(-)
 create mode 100644 edosl0util/_old_merge.py

diff --git a/edosl0util/_old_merge.py b/edosl0util/_old_merge.py
new file mode 100644
index 0000000..87b01eb
--- /dev/null
+++ b/edosl0util/_old_merge.py
@@ -0,0 +1,183 @@
+# encoding: utf-8
+__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
+
+import io
+import sys
+import logging
+from collections import deque
+
+from edosl0util.stream import (
+    jpss_packet_stream,
+    NonConsecutiveSeqId,
+    PacketTooShort
+)
+
+LOG = logging.getLogger(__name__)
+
+
+class InvalidPacketGroup(Exception):
+    """
+    If a there exists packets in a group with different apids or if the group
+    does not end in a last packet.
+    """
+
+
+def _group_packets(stream):
+    """
+    Returns a generator that yields all packets until a timestamps, i.e., a
+    packet group.
+    """
+    packet = stream.next()
+    while not packet.stamp:
+        yield packet
+        packet = stream.next()
+    # put the next packet with a stamp so it will be next
+    stream.push_back(packet)
+
+
+class StreamManager(object):
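+    """
+    Tracks which streams are still valid or exhausted and which have
+    been tried while recovering from an error, and selects the stream
+    to read from next.
+    """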
+
+    def __init__(self, streams):
+        self.streams = list(streams)
+        self.current_idx = 0
+        self.valid = [True]*len(self.streams)
+        self.tried = [False]*len(self.streams)
+        self.handling_error = False
+
+    def invalid(self, stream):
+        idx = self.streams.index(stream)
+        self.valid[idx] = False
+        self.current_idx = 0
+
+    end_of_stream = invalid
+
+    def have_valid(self):
+        return any(self.valid)
+
+    def set_error(self, stream):
+        idx = self.streams.index(stream)
+        self.tried[idx] = True
+
+    missing_seqid = set_error
+    invalid_group = set_error
+
+    def next(self):
+        if self.handling_error:
+            if all(self.tried):
+                # tried every stream, can't recover, move on
+                self.tried = [False]*len(self.streams)
+                self.handling_error = False
+            else:
+                # there is a stream we haven't tried, use it
+                self.current_idx = self.tried.index(False)
+                self.tried[self.current_idx] = True
+                return self.streams[self.current_idx]
+
+        self.current_idx = self.valid.index(True, self.current_idx)
+        return self.streams[self.current_idx]
+
+
+def next_stamp_and_apid(stream, stamp, apid):
+    LOG.debug("seeking to %s apid %s", stamp, apid)
+    stream.seek_to(stamp, apid)
+    stream.seek_to_next_stamp()
+
+
+def next_stamp(stream, stamp):
+    LOG.debug("seeking to %s", stamp)
+    stream.seek_to(stamp)
+
+
+def merge(streams, output=sys.stdout):
+    last_packet = None
+
+    streams = StreamManager(streams)
+    stream = streams.next()
+    while streams.have_valid():
+        LOG.debug("stream %s", stream)
+        LOG.debug("streams valid: %s", streams.valid)
+        LOG.debug("streams handling_error:%s tried:%s", streams.handling_error, streams.tried)
+        LOG.debug("streams current_idx: %s", streams.current_idx)
+        try:
+            # Do until `next` causes StopIteration
+            while True:
+                packets_to_write = deque()
+                packet = stream.next()
+                if packet.is_standalone():
+                    packets_to_write.append(packet)
+
+                elif packet.is_first():  # packet group
+                    group = deque([packet])
+                    group.extend(_group_packets(stream))
+                    if not (group[0].is_first() and
+                            group[-1].is_last() and
+                            group[0].apid == group[-1].apid):
+                        raise InvalidPacketGroup()
+                    packets_to_write.extend(group)
+
+                else:
+                    LOG.debug("skipping hanging packet: %s", packet)
+
+                if packets_to_write:
+                    # First packet always has a stamp because it's either
+                    # standalone or part of a valid group
+                    last_packet = packets_to_write[0]
+                while packets_to_write:
+                    pkt = packets_to_write.popleft()
+                    output.write(pkt.bytes())
+
+        except NonConsecutiveSeqId:
+            LOG.debug('missing sequence id, next stream')
+            streams.missing_seqid(stream)
+
+            LOG.debug("seeking to %s apid %s", last_packet.stamp, last_packet.apid)
+            stream = streams.next()
+
+        except InvalidPacketGroup:
+            LOG.debug("invalid group, next stream")
+            streams.invalid_group(stream)
+            stream = streams.next()
+            next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
+
+        except PacketTooShort as err:
+            LOG.error("corrupt stream, removing: %s", err)
+            streams.invalid(stream)
+            stream = streams.next()
+            next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
+
+        except StopIteration:
+            LOG.debug("end-of-stream %s", stream)
+            streams.end_of_stream(stream)
+            if streams.have_valid():
+                stream = streams.next()
+                next_stamp(stream, last_packet.stamp)
+
+
+def merge_files(filepaths, destpath):
+    filepaths, streams = make_streams(filepaths, fail_on_missing=True)
+    for i in range(0, len(filepaths)):
+        LOG.debug("stream %s filepath %s", streams[i], filepaths[i])
+    merge(streams, output=io.open(destpath, 'wb'))
+
+
+def _cmp_by_first_pkt(filea, fileb):
+    """
+    Compare 2 L0 files by their first available timestamp. Used to sort L0 files
+    by time.
+    """
+    stampa = stampb = None
+    for pkt in jpss_packet_stream(io.open(filea, 'rb')):
+        if pkt.stamp:
+            stampa = pkt.stamp
+            break
+    for pkt in jpss_packet_stream(io.open(fileb, 'rb')):
+        if pkt.stamp:
+            stampb = pkt.stamp
+            break
+    return cmp(stampa, stampb)
+
+
+def make_streams(filepaths, **kwargs):
+    filepaths = sorted(filepaths, cmp=_cmp_by_first_pkt)
+    return filepaths, [
+        jpss_packet_stream(io.open(f, 'rb'), **kwargs)
+        for f in filepaths
+    ]
diff --git a/edosl0util/cli.py b/edosl0util/cli.py
index 51059c4..c07a779 100644
--- a/edosl0util/cli.py
+++ b/edosl0util/cli.py
@@ -58,17 +58,19 @@ def cmd_split():
 
 def cmd_info():
     parser = _default_parser()
+    parser.add_argument('-a', '--aqua', action='store_true',
+                        help='parse input using Aqua rather than JPSS headers')
     parser.add_argument('filepath')
     args = parser.parse_args()
     _configure_logging(args)
 
     num_packets = 0
-    packets = stream.PacketStream(io.open(args.filepath, 'rb'))
-    first = datetime(3000, 1, 1)
-    last = datetime(1970, 1, 1)
+    if args.aqua:
+        packets = stream.aqua_packet_stream(io.open(args.filepath, 'rb'))
+    else:
+        packets = stream.jpss_packet_stream(io.open(args.filepath, 'rb'))
     while True:
         try:
-            packet = packets.next()
+            packets.next()
             num_packets += 1
         except stream.PacketTooShort as err:
             LOG.warn("corrupt packet stream after %d packets: %s",
@@ -76,9 +78,6 @@ def cmd_info():
             break
         except StopIteration:
             break
-        if packet.stamp:
-            first = min(packet.stamp, first)
-            last = max(packet.stamp, last)
     total = 0
     first, last, info = packets.info()
     LOG.info("First: %s", first)
@@ -92,18 +91,43 @@ def cmd_info():
 def cmd_merge():
     parser = _default_parser()
     parser.add_argument('-o', '--output', default='out.pds')
+    def interval(v):
+        dt = lambda v: datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
+        return [dt(x) for x in v.split(',')]
+    parser.add_argument(
+        '-t', '--trunc-to', type=interval,
+        help=('Truncate to the interval given as comma-separated timestamps '
+              'of the format YYYY-MM-DD HH:MM:SS. The begin time is '
+              'inclusive, the end time is exclusive.'))
     parser.add_argument('pds', nargs='+')
     args = parser.parse_args()
+
     _configure_logging(args)
 
-    merge.merge_files(args.pds, args.output)
+    index = set()  # will remove duplicates
+    for filepath in args.pds:
+        LOG.debug('indexing %s', filepath)
+        fobj = io.open(filepath, 'rb')
+        index.update(merge.read_packet_index(fobj))
+
+    LOG.info('sorting index with %d pointers', len(index))
+    index = sorted(index)
+    LOG.info('writing index to %s', args.output)
+    with io.open(args.output, 'wb') as fobj:
+        for ptr in index:
+            if args.trunc_to:
+                begin, end = args.trunc_to
+                if begin <= ptr.stamp < end:
+                    fobj.write(ptr.bytes())
+            else:
+                fobj.write(ptr.bytes())
 
 
 def cmd_rdr2l0():
     parser = _default_parser()
     parser.add_argument('-o', '--output')
     parser.add_argument('-f', '--skipfill', action='store_true')
-    parser.add_argument('sensor', choices=('viirs', 'atms', 'cris'))
+    parser.add_argument('sensor', choices=('atms', 'cris'))
     parser.add_argument('rdr')
     args = parser.parse_args()
     _configure_logging(args)
diff --git a/edosl0util/headers.py b/edosl0util/headers.py
index d4067ad..bb017f7 100644
--- a/edosl0util/headers.py
+++ b/edosl0util/headers.py
@@ -53,6 +53,9 @@ class PrimaryHeader(BaseStruct):
     ]
 
 
+PRIMARY_HEADER_SIZE = c.sizeof(PrimaryHeader)
+
+
 class Timecode(BaseStruct):
     """
     Secondary header timecode baseclass.
@@ -169,12 +172,14 @@ class JpssFirstSecondaryHeader(BaseStruct):
     ]
 
 
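+# Map of sequence grouping flag to secondary header implementation;
+# continuation and last packets in a group carry no secondary header.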
+_jpss_headers = {
+    GROUP_FIRST: JpssFirstSecondaryHeader,
+    GROUP_CONTINUING: None,
+    GROUP_LAST: None,
+    GROUP_STANDALONE: JpssSecondaryHeader,
+}
+
+
 def jpss_header_lookup(primary_header):
-    grouping = primary_header.sequence_grouping
-    if grouping == GROUP_FIRST:
-        return JpssFirstSecondaryHeader
-    elif grouping == GROUP_STANDALONE:
-        return JpssSecondaryHeader
+    return _jpss_headers.get(primary_header.sequence_grouping)
 
 
 def amsu_headers():
diff --git a/edosl0util/jpssrdr.py b/edosl0util/jpssrdr.py
index 70b9607..d8ef6a8 100644
--- a/edosl0util/jpssrdr.py
+++ b/edosl0util/jpssrdr.py
@@ -182,17 +182,6 @@ def sort_packets_by_apid(packets, order=None):
         return sorted(packets, key=lambda p: p.packet.apid)
 
 
-def sort_packets_edos(rdr, packets):
-    """
-    Sort packets by timestamp and order the apids the same as EDOS NASA L0
-    files.
-    """
-    order = None
-    if rdr.header.sensor.lower() == 'viirs':
-        order = [826, 821] + range(0, 826)
-    return sort_packets_by_obs_time(sort_packets_by_apid(packets, order=order))
-
-
 def convert_to_nasa_l0(sensor, filename, skipfill=False):
     """
     Convert a JSPP RDR to a NASA L0 PDS file.
@@ -201,8 +190,7 @@ def convert_to_nasa_l0(sensor, filename, skipfill=False):
         LOG.debug(rdr.header)
         for apid in rdr.apids:
             LOG.debug(apid)
-        packets = sort_packets_edos(rdr, rdr.packets())
-        for packet in packets:
+        for packet in rdr.packets():
             if packet.tracker.fill_percent != 0:
                 LOG.debug('fill: %s', packet.tracker)
                 if skipfill:
diff --git a/edosl0util/merge.py b/edosl0util/merge.py
index 2478fe8..dcc67cf 100644
--- a/edosl0util/merge.py
+++ b/edosl0util/merge.py
@@ -1,95 +1,129 @@
-# encoding: utf-8
-__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
+"""
 
+1. Cat PDS files together
+2. Index packets
+    - drop any leading hanging packets
+3. Sort index
+4. Write
+
+"""
 import io
-import sys
+import os
 import logging
+from datetime import datetime
 from collections import deque
 
-from edosl0util.stream import (
-    make_streams,
-    NonConsecutiveSeqId,
-    PacketTooShort
-)
+from edosl0util import headers
+from edosl0util.stream import PacketStream, BasicStream
 
 LOG = logging.getLogger(__name__)
 
 
-class InvalidPacketGroup(Exception):
-    """
-    If a there exists packets in a group with different apids or if the group
-    does not end in a last packet.
-    """
-
-
-def _group_packets(stream):
+class _Ptr(object):
     """
-    Returns a generator that yields all packets until a timestamps, i.e., a
-    packet group.
+    Represents one or more packets that share the same time stamp and apid.
     """
-    packet = stream.next()
-    while not packet.stamp:
-        yield packet
+    def __init__(self, fobj, stamp, apid, offset, size):
+        self.fobj = fobj
+        self.stamp = stamp
+        self.apid = apid
+        self.offset = offset
+        self.size = size
+        self.count = 1
+
+    def __repr__(self):
+        attrs = ' '.join(
+                '{}={}'.format(k, v)
+                for k, v in sorted(vars(self).items())
+                if not k.startswith('_'))
+
+        return '<{:s} {:s}>'.format(self.__class__.__name__, attrs)
+
+    def __cmp__(self, that):
+        return cmp(
+            (self.stamp, self.apid),
+            (that.stamp, that.apid)
+        )
+
+    # instances with the same stamp/apid hash the same, so sets drop duplicates
+    def __hash__(self):
+        return hash((self.stamp, self.apid))
+
+    def bytes(self):
+        self.fobj.seek(self.offset, os.SEEK_SET)
+        return self.fobj.read(self.size)
+
+
+def read_packet_index(fobj):
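+    """
+    Return an index of _Ptr entries for fobj, one per stamp/apid
+    group, dropping any leading packets that do not have a stamp.
+    """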
+    lookup = headers.jpss_header_lookup
+    stream = PacketStream(BasicStream(fobj, lookup, with_data=False))
+    index = deque()
+
+    try:
+        # drop any leading hanging packets
+        count = 0
         packet = stream.next()
-    # put the next packet with a stamp so it will be next
-    stream.push_back(packet)
-
-
-def merge(streams, output=sys.stdout):
-    last_packet = None
-    # streams are removed as they are exhausted
-    streams = list(streams)
-    while streams:
-        stream = streams.pop(0)
-        try:
-            if last_packet is not None:
-                LOG.debug("seeking to %s, %s", last_packet.stamp, last_packet.apid)
-                stream.seek_to(last_packet.stamp, last_packet.apid)
-                stream.seek_to_next_stamp()
-
-            # Do until `next` causes StopIteration
-            while True:
-                packets_to_write = deque()
+        while not packet.stamp:
+            packet = stream.next()
+            count += 1
+        if count:
+            LOG.info('dropped %d leading packets', count)
+
+        while True:
+            ptr = _Ptr(
+                fobj,
+                stamp=packet.stamp,
+                apid=packet.apid,
+                offset=packet.offset,
+                size=packet.size,
+            )
+            index.append(ptr)
+            # collect all packets for this stamp/group
+            packet = stream.next()
+            while not packet.stamp:
+                ptr.size += packet.size
+                ptr.count += 1
                 packet = stream.next()
-                if packet.is_standalone():
-                    packets_to_write.append(packet)
-
-                elif packet.is_first():  # packet group
-                    group = deque([packet])
-                    group.extend(_group_packets(stream))
-                    if not group[0].is_first() \
-                            and group[-1].is_last() \
-                            and group[0].apid == group[-1].apid:
-                        raise InvalidPacketGroup()
-                    packets_to_write.extend(group)
-
-                else:
-                    LOG.debug("skipping hanging packet: %s", packet)
-
-                # First packet always has a stamp because it's either
-                # standalone or part of a valid group
-                last_packet = packets_to_write[0]
-                while packets_to_write:
-                    pkt = packets_to_write.popleft()
-                    output.write(pkt.bytes())
-
-        except NonConsecutiveSeqId:
-            streams.append(stream)  # stream still usable
-            LOG.debug('missing sequence id, next stream')
-
-        except InvalidPacketGroup:
-            streams.append(stream)  # stream still usable
-            LOG.debug("invalid group, switching streams:%s", group)
-
-        except PacketTooShort as err:
-            LOG.error("corrupt stream, removing: %s", err)
-
-        except StopIteration:
-            LOG.debug("end-of-stream %s", stream)
-
-
-def merge_files(filepaths, destpath):
-    filepaths, streams = make_streams(filepaths, fail_on_missing=True)
-    for i in range(0, len(filepaths)):
-        LOG.debug("stream %s filepath %s", streams[i], filepaths[i])
-    merge(streams, output=io.open(destpath, 'wb'))
+
+    except StopIteration:
+        pass
+
+    return index
+
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-o', '--output', default='out.pds')
+    def interval(v):
+        dt = lambda v: datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
+        return [dt(x) for x in v.split(',')]
+    parser.add_argument(
+        '-t', '--trunc-to', type=interval,
+        help=('Truncate to the interval given as comma-separated timestamps '
+              'of the format YYYY-MM-DD HH:MM:SS. The begin time is '
+              'inclusive, the end time is exclusive.'))
+    parser.add_argument('pds', nargs='+')
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.DEBUG, format='%(message)s')
+
+    index = set()  # will remove duplicates
+    for filepath in args.pds:
+        LOG.debug('indexing %s', filepath)
+        fobj = io.open(filepath, 'rb')
+        index.update(read_packet_index(fobj))
+
+    LOG.info('sorting index with %d pointers', len(index))
+    index = sorted(index)
+    LOG.info('writing index to %s', args.output)
+    with io.open(args.output, 'wb') as fobj:
+        for ptr in index:
+            if args.trunc_to:
+                begin, end = args.trunc_to
+                if begin <= ptr.stamp < end:
+                    fobj.write(ptr.bytes())
+            else:
+                fobj.write(ptr.bytes())
diff --git a/edosl0util/split.py b/edosl0util/split.py
index 7b6a0bd..aff0442 100644
--- a/edosl0util/split.py
+++ b/edosl0util/split.py
@@ -6,10 +6,10 @@ import io
 from datetime import datetime
 
 from edosl0util.timecode import unixtime
-from edosl0util.stream import PacketStream
+from edosl0util.stream import jpss_packet_stream
 
 
-def split_stream(fobj, minutes):
+def split_stream(stream, minutes):
     """Split a VIIRS L0 PDS data stream into data blobs based on their scan
     time mod the number of minutes provided.
 
@@ -20,8 +20,7 @@ def split_stream(fobj, minutes):
     cur_bucket = 0  # cur time bucket of size 'minutes'
 
     pkt_count = 0
-    original_offset = fobj.tell()
-    for pkt in PacketStream(fobj):
+    for pkt in stream:
         # do the bucketing based on secondary header timestamps
         if pkt.stamp:
             hdrtime = unixtime(pkt.stamp)
@@ -31,8 +30,7 @@ def split_stream(fobj, minutes):
                 cur_bucket = pkt_bucket
 
             if pkt_bucket > cur_bucket:
-                offset = fobj.tell() - original_offset
-                yield cur_bucket, offset, pkt_count, buf
+                yield cur_bucket, pkt_count, buf
                 pkt_count = 0
                 buf = bytearray()
             cur_bucket = pkt_bucket
@@ -41,8 +39,7 @@ def split_stream(fobj, minutes):
         buf += pkt.bytes()
         pkt_count += 1
 
-    offset = fobj.tell() - original_offset
-    yield cur_bucket, offset, pkt_count, buf
+    yield cur_bucket, pkt_count, buf
 
 
 def _replace_pdsname_stamp(filename, stamp):
@@ -75,8 +72,7 @@ def split_file(filepath, minutes, destdir):
     """
     Split a level0 PDS file into X minutes files by filename.
 
-    :param filepath: Path to a Level0 PDS file. It is assumed the file as a
-        standard level 0 PDS filename.
+    :param filepath: Path to a Level0 PDS file, with a standard L0 PDS filename.
     :param minutes: Number of minutes per bucket. Buckets always start at the
         top of the hour. For example, a bucket size of 6 will create 10 6-min
         buckets starting at minutes 0, 6, 12, etc ...
@@ -87,8 +83,8 @@ def split_file(filepath, minutes, destdir):
     :raises RuntimeError: If a file exists with the same name of a bucket file.
     """
     destdir = destdir or '.'
-    stream = split_stream(io.open(filepath, 'rb'), minutes)
-    for timestamp, offset, pkts, blob in stream:
+    stream = split_stream(jpss_packet_stream(io.open(filepath, 'rb')), minutes)
+    for timestamp, pkts, blob in stream:
         stamp = datetime.utcfromtimestamp(timestamp)
         dirname, filename = os.path.split(filepath)
         newname = _filename_for_splitfile(filename, stamp, minutes)
diff --git a/edosl0util/stream.py b/edosl0util/stream.py
index 2c7c075..e23225a 100644
--- a/edosl0util/stream.py
+++ b/edosl0util/stream.py
@@ -1,7 +1,8 @@
 import io
+import os
 import logging
 import ctypes as c
-from collections import deque, defaultdict
+from collections import deque, defaultdict, namedtuple
 
 from edosl0util.headers import (
     PrimaryHeader,
@@ -31,9 +32,6 @@ class PacketTooShort(Error):
     After this error occurs the stream is no longer usable because data offsets
     are no longer reliable.
     """
-    def __init__(self, header=None):
-        self.primary_header = header
-        self.args = (self.primary_header,)
 
 
 class NonConsecutiveSeqId(Error):
@@ -42,12 +40,13 @@ class NonConsecutiveSeqId(Error):
     """
 
 
-class SimpleStream(object):
-    """
-    Generator that yields PrimaryHeaders and data. Files are read using mmap.
-    """
-    def __init__(self, fobj):
+class BasicStream(object):
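+    """
+    Low-level stream yielding Tracker tuples (h1, h2, size, offset,
+    data), one packet at a time. When with_data is False the packet
+    body is skipped with a seek and data is None.
+    """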
+    Tracker = namedtuple('Tracker', ['h1', 'h2', 'size', 'offset', 'data'])
+
+    def __init__(self, fobj, header_lookup=None, with_data=True):
         self.fobj = fobj
+        self.header_lookup = header_lookup
+        self.with_data = with_data
 
     def __iter__(self):
         return self
@@ -55,91 +54,79 @@ class SimpleStream(object):
     def __next__(self):
         return self.next()
 
-    def next(self):
-        psize = c.sizeof(PrimaryHeader)
-        buf = self.fobj.read(psize)
-        if not buf:
+    def _read(self, size):
+        buf = self.fobj.read(size)
+        if not buf:  # EOF
             raise StopIteration()
-        if len(buf) < psize:
-            raise PacketTooShort(header=None)
+        if len(buf) != size:
+            raise PacketTooShort(
+                'expected to read {:d} bytes, got {:d}'
+                .format(size, len(buf)))
+        return buf
+
+    def read_primary_header(self):
+        buf = self._read(headers.PRIMARY_HEADER_SIZE)
         h1 = PrimaryHeader.from_buffer_copy(buf)
-        # read user data
-        size = h1.data_length_minus1 + 1
-        buf = self.fobj.read(size)
-        if len(buf) < size:
-            raise PacketTooShort(header=h1)
-        return h1, buf
-
+        return h1, headers.PRIMARY_HEADER_SIZE
 
-class FullStream(SimpleStream):
-    """
-    Iterable returning primary and secondary header and any left over bytes
-    of "user" data.
-    """
-    def __init__(self, header_lookup, fobj):
-        """
-        :param header_lookup: function that takes a primary header and returns
-            a secondary header struct to use, or None.
-        """
-        self.header_lookup = header_lookup
-        super(FullStream, self).__init__(fobj)
+    def read_secondary_header(self, ph):
+        H2Impl = self.header_lookup(ph) if self.header_lookup else None
+        if H2Impl is None:
+            return bytes(), 0
+        h2size = c.sizeof(H2Impl)
+        buf = self._read(h2size)
+        return H2Impl.from_buffer_copy(buf), h2size
 
     def next(self):
-        h1, buf = super(FullStream, self).next()
-        h2 = ''
-        if h1.secondary_header_flag:
-            H2Impl = self.header_lookup(h1)
-            if H2Impl:
-                hsize = c.sizeof(H2Impl)
-                h2 = H2Impl.from_buffer_copy(buf)
-                return h1, h2, buf[hsize:]
-        return h1, h2, buf
-
-
-def jpss_full_stream(fobj):
-    """
-    `FullStream` impl for JPSS secondary headers.
-    """
-    return FullStream(headers.jpss_header_lookup, fobj)
-
-
-def aqua_full_stream(fobj):
-    """
-    `FullStream` impl for Aqua secondary headers.
-    """
-    return FullStream(headers.aqua_header_lookup, fobj)
+        offset = self.fobj.tell()
+        h1, h1size = self.read_primary_header()
+        h2, h2size = self.read_secondary_header(h1)
+        # data length includes h2size
+        total_size = h1size + h1.data_length_minus1 + 1
+        data_size = h1.data_length_minus1 + 1 - h2size
+        if self.with_data:
+            data = self._read(data_size)
+        else:
+            data = None
+            self.fobj.seek(data_size, os.SEEK_CUR)
+        return self.Tracker(h1, h2, total_size, offset, data)
 
 
 class Packet(object):
-    def __init__(self, h1, h2, data):
-        self.data = data
+    def __init__(self, h1, h2, data, size=None, offset=None):
         self.primary_header = h1
         self.secondary_header = h2
+        self.data = data
+        self.size = size
+        self.offset = offset
+        self.apid = h1.apid
+        self.seqid = self.primary_header.source_sequence_count
 
     def __str__(self):
-        return '<Packet apid=%d seqid=%d stamp=%s>' % \
-               (self.apid, self.seqid, self.stamp)
+        return '<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>' % \
+               (self.apid, self.seqid, self.stamp, self.size, self.offset)
     __repr__ = __str__
 
     @property
-    def apid(self):
-        return self.primary_header.apid
-
-    @property
-    def seqid(self):
-        return self.primary_header.source_sequence_count
+    def stamp(self):
+        return (
+            self.secondary_header and
+            hasattr(self.secondary_header, 'timecode') and
+            self.secondary_header.timecode.asdatetime() or None)
 
     @property
-    def stamp(self):
-        h2 = self.secondary_header
-        return h2.timecode.asdatetime() if hasattr(h2, 'timecode') else None
+    def timestamp(self):
+        return (
+            self.secondary_header and
+            hasattr(self.secondary_header, 'timecode') and
+            self.secondary_header.timecode.astimestamp() or None)
 
     def bytes(self):
         return buffer(self.primary_header) + \
             buffer(self.secondary_header) + self.data
 
     def is_group(self):
-        return False
+        return self.is_first() or self.is_continuing() or self.is_last()
 
     def is_first(self):
         return self.primary_header.sequence_grouping == GROUP_FIRST
@@ -155,24 +142,17 @@ class Packet(object):
 
 
 class PacketStream(object):
-    """
-    Iterates over all CCSDS data producing `Packet` tuples. Only as much data is
-    necessary to generate a single packet is read into memory at a time.
-    """
     SEQID_NOTSET = -1
 
-    def __init__(self, fobj, fail_on_missing=False):
-        """
-        :param fobj: File-like object to read stream from
-        """
-        self._stream = jpss_full_stream(fobj)
+    def __init__(self, data_stream, fail_on_missing=False):
+        self._stream = data_stream
         self._seek_cache = deque()
         self._first = None
         self._last = None
         self._apid_info = defaultdict(
-                lambda: {'count': 0,
-                         'last_seqid': self.SEQID_NOTSET,
-                         'num_missing': 0})
+            lambda: {'count': 0,
+                     'last_seqid': self.SEQID_NOTSET,
+                     'num_missing': 0})
         self._fail_on_missing = fail_on_missing
 
     def __iter__(self):
@@ -189,8 +169,8 @@ class PacketStream(object):
         # return any items on the seek cache before reading more
         if len(self._seek_cache):
             return self._seek_cache.popleft()
-
-        packet = Packet(*self._stream.next())
+        h1, h2, size, offset, data = self._stream.next()
+        packet = Packet(h1, h2, data, size=size, offset=offset)
         have_missing = self._update_info(packet)
         if have_missing and self._fail_on_missing:
             self.push_back(packet)
@@ -227,9 +207,6 @@ class PacketStream(object):
         """
         Seek forward such that the next packet provided will be the first packet
         having a timestamp available in the secondary header >= stamp and apid.
-
-        :raises: StopIteration if the end of the stream is reached before the
-                 criteria can be met.
         """
         # seek past partial groups
         packet = self.next()
@@ -251,36 +228,12 @@ class PacketStream(object):
                 packet = self.next()
             self.push_back(packet)
 
-    def seek_to_next_stamp(self):
-        """
-        Seek to the next packet with a timestamp available in the secondary
-        header. For standalone packets this is essentially the same as `next`.
-        For packet groups it is essentially a "next group" function.
-        """
-        packet = self.next() # get past current packet
-        packet = self.next()
-        while not packet.stamp:
-            packet = self.next()
-        self.push_back(packet)
 
+def jpss_packet_stream(fobj, **kwargs):
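+    """
+    `PacketStream` impl for JPSS secondary headers.
+    """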
+    stream = BasicStream(fobj, headers.jpss_header_lookup)
+    return PacketStream(stream, **kwargs)
 
-def _cmp_by_first_pkt(filea, fileb):
-    """
-    Compare 2 L0 files by their first available timestamp. Used to sort L0 files
-    by time.
-    """
-    stampa = stampb = None
-    for pkt in PacketStream(io.open(filea, 'rb')):
-        if pkt.stamp:
-            stampa = pkt.stamp
-            break
-    for pkt in PacketStream(io.open(fileb, 'rb')):
-        if pkt.stamp:
-            stampb = pkt.stamp
-            break
-    return cmp(stampa, stampb)
-
-
-def make_streams(filepaths, **kwargs):
-    filepaths.sort(_cmp_by_first_pkt)
-    return filepaths, [PacketStream(io.open(f, 'rb'), **kwargs) for f in filepaths]
+
+def aqua_packet_stream(fobj, **kwargs):
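+    """
+    `PacketStream` impl for Aqua secondary headers.
+    """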
+    stream = BasicStream(fobj, headers.aqua_header_lookup)
+    return PacketStream(stream, **kwargs)
diff --git a/edosl0util/trunc.py b/edosl0util/trunc.py
index 7d944b2..fdf7080 100644
--- a/edosl0util/trunc.py
+++ b/edosl0util/trunc.py
@@ -2,12 +2,11 @@
 __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
 
 import io
-from edosl0util.stream import PacketStream
+from edosl0util.stream import jpss_packet_stream
 
 
 def trunc_stream(stream, start, end):
     stream.seek_to(start)
-    stream.seek_to_next_stamp()
 
     pkt = stream.next()
     while pkt.stamp <= end:
@@ -21,5 +20,5 @@ def trunc_stream(stream, start, end):
 
 
 def trunc_file(filename, start, end):
-    stream = PacketStream(io.open(filename, 'rb'))
+    stream = jpss_packet_stream(io.open(filename, 'rb'))
     return trunc_stream(stream, start, end)
-- 
GitLab