diff --git a/edosl0util/_old_merge.py b/edosl0util/_old_merge.py
new file mode 100644
index 0000000000000000000000000000000000000000..87b01eb95660bf81c0d9a63badd18ab49dcdb91b
--- /dev/null
+++ b/edosl0util/_old_merge.py
@@ -0,0 +1,186 @@
+# encoding: utf-8
+__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
+
+import io
+import sys
+import logging
+from collections import deque
+
+from edosl0util.stream import (
+    jpss_packet_stream,
+    NonConsecutiveSeqId,
+    PacketTooShort
+)
+
+LOG = logging.getLogger(__name__)
+
+
+class InvalidPacketGroup(Exception):
+    """
+    Raised when a group contains packets with different APIDs, or when the
+    group does not end with a last packet.
+    """
+
+
+def _group_packets(stream):
+    """
+    Returns a generator that yields packets until the next packet with a
+    timestamp, i.e., the remainder of a packet group.
+    """
+    packet = stream.next()
+    while not packet.stamp:
+        yield packet
+        packet = stream.next()
+    # push the stamped packet back so it is returned next
+    stream.push_back(packet)
+
+
+class StreamManager(object):
+
+    def __init__(self, streams):
+        self.streams = list(streams)
+        self.current_idx = 0
+        self.valid = [True]*len(self.streams)
+        self.tried = [False]*len(self.streams)
+        self.handling_error = False
+
+    def invalid(self, stream):
+        idx = self.streams.index(stream)
+        self.valid[idx] = False
+        self.current_idx = 0
+
+    end_of_stream = invalid
+
+    def have_valid(self):
+        return any(self.valid)
+
+    def set_error(self, stream):
+        idx = self.streams.index(stream)
+        self.tried[idx] = True
+        self.handling_error = True
+
+    missing_seqid = set_error
+    invalid_group = set_error
+
+    def next(self):
+        if self.handling_error:
+            if all(self.tried):
+                # tried every stream, can't recover, move on
+                self.tried = [False]*len(self.streams)
+                self.handling_error = False
+            else:
+                # there is a stream we haven't tried, use it
+                self.current_idx = self.tried.index(False)
+                self.tried[self.current_idx] = True
+                return self.streams[self.current_idx]
+
+        self.current_idx = self.valid.index(True, self.current_idx)
+        return self.streams[self.current_idx]
+
+
+def next_stamp_and_apid(stream, stamp, apid):
+    LOG.debug("seeking to %s apid %s", stamp, apid)
+    stream.seek_to(stamp, apid)
+    stream.seek_to_next_stamp()
+
+
+def next_stamp(stream, stamp):
+    LOG.debug("seeking to %s", stamp)
+    stream.seek_to(stamp)
+
+
+def merge(streams, output=sys.stdout):
+    last_packet = None
+
+    streams = StreamManager(streams)
+    stream = streams.next()
+    while streams.have_valid():
+        LOG.debug("stream %s", stream)
+        LOG.debug("streams valid: %s", streams.valid)
+        LOG.debug("streams handling_error:%s tried:%s", streams.handling_error, streams.tried)
+        LOG.debug("streams current_idx: %s", streams.current_idx)
+        try:
+            # Do until `next` causes StopIteration
+            while True:
+                packets_to_write = deque()
+                packet = stream.next()
+                if packet.is_standalone():
+                    packets_to_write.append(packet)
+
+                elif packet.is_first():  # packet group
+                    group = deque([packet])
+                    group.extend(_group_packets(stream))
+                    # a valid group starts with a first packet, ends with a
+                    # last packet, and uses a single apid throughout
+                    if not (group[0].is_first()
+                            and group[-1].is_last()
+                            and group[0].apid == group[-1].apid):
+                        raise InvalidPacketGroup()
+                    packets_to_write.extend(group)
+
+                else:
+                    LOG.debug("skipping hanging packet: %s", packet)
+
+                # First packet always has a stamp because it's either
+                # standalone or part of a valid group
+                last_packet = packets_to_write[0]
+                while packets_to_write:
+                    pkt = packets_to_write.popleft()
+                    output.write(pkt.bytes())
+
+        except NonConsecutiveSeqId:
+            LOG.debug('missing sequence id, next stream')
+            streams.missing_seqid(stream)
+            stream = streams.next()
+            next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
+
+        except InvalidPacketGroup:
+            LOG.debug("invalid group, next stream")
+            streams.invalid_group(stream)
+            stream = streams.next()
+            next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
+
+        except PacketTooShort as err:
+            LOG.error("corrupt stream, removing: %s", err)
+            streams.invalid(stream)
+            stream = streams.next()
+            next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
+
+        except StopIteration:
+            LOG.debug("end-of-stream %s", stream)
+            streams.end_of_stream(stream)
+            if streams.have_valid():
+                stream = streams.next()
+                next_stamp(stream, last_packet.stamp)
+
+
+def merge_files(filepaths, destpath):
+    filepaths, streams = make_streams(filepaths, fail_on_missing=True)
+    for i in range(0, len(filepaths)):
+        LOG.debug("stream %s filepath %s", streams[i], filepaths[i])
+    merge(streams, output=io.open(destpath, 'wb'))
+
+
+def _cmp_by_first_pkt(filea, fileb):
+    """
+    Compare 2 L0 files by their first available timestamp. Used to sort L0 files
+    by time.
+    """
+    stampa = stampb = None
+    for pkt in jpss_packet_stream(io.open(filea, 'rb')):
+        if pkt.stamp:
+            stampa = pkt.stamp
+            break
+    for pkt in jpss_packet_stream(io.open(fileb, 'rb')):
+        if pkt.stamp:
+            stampb = pkt.stamp
+            break
+    return cmp(stampa, stampb)
+
+
+def make_streams(filepaths, **kwargs):
+    # sorted by first packet time so streams are merged in time order
+    filepaths = sorted(filepaths, cmp=_cmp_by_first_pkt)
+    return filepaths, [
+        jpss_packet_stream(io.open(f, 'rb'), **kwargs)
+        for f in filepaths]
diff --git a/edosl0util/cli.py b/edosl0util/cli.py
index 51059c4c270a52698dce30964fc5899fbea61474..c07a7792112bd56cb85aee246f5f978689ff2536
--- a/edosl0util/cli.py
+++ b/edosl0util/cli.py
@@ -58,17 +58,19 @@ def cmd_split():
 
 def cmd_info():
     parser = _default_parser()
+    parser.add_argument('-a', '--aqua', action='store_true')
     parser.add_argument('filepath')
     args = parser.parse_args()
    _configure_logging(args)
     num_packets = 0
-    packets = stream.PacketStream(io.open(args.filepath, 'rb'))
-    first = datetime(3000, 1, 1)
-    last = datetime(1970, 1, 1)
+    if not args.aqua:
+        packets = stream.jpss_packet_stream(io.open(args.filepath, 'rb'))
+    else:
+        packets = stream.aqua_packet_stream(io.open(args.filepath, 'rb'))
     while True:
         try:
-            packet = packets.next()
+            packets.next()
             num_packets += 1
         except stream.PacketTooShort as err:
             LOG.warn("corrupt packet stream after %d packets: %s",
@@ -76,9 +78,6 @@ def cmd_info():
             break
         except StopIteration:
             break
-        if packet.stamp:
-            first = min(packet.stamp, first)
-            last = max(packet.stamp, last)
     total = 0
     first, last, info = packets.info()
     LOG.info("First: %s", first)
@@ -92,18 +91,43 @@ def cmd_merge():
     parser = _default_parser()
     parser.add_argument('-o', '--output', default='out.pds')
+    def interval(v):
+        dt = lambda v: datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
+        return [dt(x) for x in v.split(',')]
+    parser.add_argument(
+        '-t', '--trunc-to', type=interval,
+        help=('Truncate to the interval given as comma-separated timestamps of '
+              'the format YYYY-MM-DD HH:MM:SS. The begin time is inclusive, the '
+              'end time is exclusive.'))
     parser.add_argument('pds', nargs='+')
     args = parser.parse_args()
+
     _configure_logging(args)
-    merge.merge_files(args.pds, args.output)
+    index = set()  # will remove duplicates
+    for filepath in args.pds:
+        LOG.debug('indexing %s', filepath)
+        fobj = io.open(filepath, 'rb')
+        index.update(merge.read_packet_index(fobj))
+
+    LOG.info('sorting index with %d pointers', len(index))
+    index = sorted(index)
+    LOG.info('writing index to %s', args.output)
+    with io.open(args.output, 'wb') as fobj:
+        for ptr in index:
+            if args.trunc_to:
+                interval = args.trunc_to
+                if ptr.stamp >= interval[0] and ptr.stamp < interval[1]:
+                    fobj.write(ptr.bytes())
+            else:
+                fobj.write(ptr.bytes())
 
 
 def cmd_rdr2l0():
     parser = _default_parser()
     parser.add_argument('-o', '--output')
     parser.add_argument('-f', '--skipfill', action='store_true')
-    parser.add_argument('sensor', choices=('viirs', 'atms', 'cris'))
+    parser.add_argument('sensor', choices=('atms', 'cris'))
     parser.add_argument('rdr')
     args = parser.parse_args()
     _configure_logging(args)
diff --git a/edosl0util/headers.py b/edosl0util/headers.py
index d4067ad266003995cf4398014ad22f2558eb3258..bb017f7fd12a7c66908805554852393e975e4551
--- a/edosl0util/headers.py
+++ b/edosl0util/headers.py
@@ -53,6 +53,9 @@ class PrimaryHeader(BaseStruct):
     ]
 
 
+PRIMARY_HEADER_SIZE = c.sizeof(PrimaryHeader)
+
+
 class Timecode(BaseStruct):
     """
     Secondary header timecode baseclass.
@@ -169,12 +172,16 @@ class JpssFirstSecondaryHeader(BaseStruct):
     ]
 
 
+_jpss_headers = {
+    GROUP_FIRST: JpssFirstSecondaryHeader,
+    GROUP_CONTINUING: None,
+    GROUP_LAST: None,
+    GROUP_STANDALONE: JpssSecondaryHeader,
+}
+
+
 def jpss_header_lookup(primary_header):
-    grouping = primary_header.sequence_grouping
-    if grouping == GROUP_FIRST:
-        return JpssFirstSecondaryHeader
-    elif grouping == GROUP_STANDALONE:
-        return JpssSecondaryHeader
+    return _jpss_headers.get(primary_header.sequence_grouping)
 
 
 def amsu_headers():
diff --git a/edosl0util/jpssrdr.py b/edosl0util/jpssrdr.py
index 70b9607d7edceb769f518e37e8e14bc7a203cd5a..d8ef6a862c126e2d70f363d0754739d9c3ab2315
--- a/edosl0util/jpssrdr.py
+++ b/edosl0util/jpssrdr.py
@@ -182,17 +182,6 @@ def sort_packets_by_apid(packets, order=None):
     return sorted(packets, key=lambda p: p.packet.apid)
 
 
-def sort_packets_edos(rdr, packets):
-    """
-    Sort packets by timestamp and order the apids the same as EDOS NASA L0
-    files.
-    """
-    order = None
-    if rdr.header.sensor.lower() == 'viirs':
-        order = [826, 821] + range(0, 826)
-    return sort_packets_by_obs_time(sort_packets_by_apid(packets, order=order))
-
-
 def convert_to_nasa_l0(sensor, filename, skipfill=False):
     """
     Convert a JSPP RDR to a NASA L0 PDS file.
@@ -201,8 +190,7 @@ def convert_to_nasa_l0(sensor, filename, skipfill=False):
     LOG.debug(rdr.header)
     for apid in rdr.apids:
         LOG.debug(apid)
-        packets = sort_packets_edos(rdr, rdr.packets())
-        for packet in packets:
+        for packet in rdr.packets():
             if packet.tracker.fill_percent != 0:
                 LOG.debug('fill: %s', packet.tracker)
                 if skipfill:
diff --git a/edosl0util/merge.py b/edosl0util/merge.py
index 2478fe874d4326789130a395f71000474feab493..dcc67cf58e30fc3889a38a0497e55346b3ad381b
--- a/edosl0util/merge.py
+++ b/edosl0util/merge.py
@@ -1,95 +1,127 @@
-# encoding: utf-8
-__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
+"""
+1. Cat PDS files together
+2. Index packets
+   - drop any leading hanging packets
+3. Sort index
+4. 
Write + +""" import io -import sys +import os import logging +from datetime import datetime from collections import deque -from edosl0util.stream import ( - make_streams, - NonConsecutiveSeqId, - PacketTooShort -) +from edosl0util import headers +from edosl0util.stream import PacketStream, BasicStream LOG = logging.getLogger(__name__) -class InvalidPacketGroup(Exception): - """ - If a there exists packets in a group with different apids or if the group - does not end in a last packet. - """ - - -def _group_packets(stream): +class _Ptr(object): """ - Returns a generator that yields all packets until a timestamps, i.e., a - packet group. + Represents one or more packets that share the same time stamp and apid. """ - packet = stream.next() - while not packet.stamp: - yield packet + def __init__(self, fobj, stamp, apid, offset, size): + self.fobj = fobj + self.stamp = stamp + self.apid = apid + self.offset = offset + self.size = size + self.count = 1 + + def __repr__(self): + attrs = ' '.join( + '{}={}'.format(k, v) + for k, v in sorted(vars(self).items()) + if not k.startswith('_')) + + return '<{:s} {:s}>'.format(self.__class__.__name__, attrs) + + def __cmp__(self, that): + return cmp( + (self.stamp, self.apid), + (that.stamp, that.apid) + ) + + # instances with same stamp/apid will compare the same + def __hash__(self): + return hash((self.stamp, self.apid)) + + def bytes(self): + self.fobj.seek(self.offset, os.SEEK_SET) + return self.fobj.read(self.size) + + +def read_packet_index(fobj): + lookup = headers.jpss_header_lookup + stream = PacketStream(BasicStream(fobj, lookup, with_data=False)) + index = deque() + + try: + # drop any leading hanging packets + count = 0 packet = stream.next() - # put the next packet with a stamp so it will be next - stream.push_back(packet) - - -def merge(streams, output=sys.stdout): - last_packet = None - # streams are removed as they are exhausted - streams = list(streams) - while streams: - stream = streams.pop(0) - try: - if last_packet is not None: - LOG.debug("seeking to %s, %s", last_packet.stamp, last_packet.apid) - stream.seek_to(last_packet.stamp, last_packet.apid) - stream.seek_to_next_stamp() - - # Do until `next` causes StopIteration - while True: - packets_to_write = deque() + while not packet.stamp: + packet = stream.next() + count += 1 + if count: + LOG.info('dropped %d leading packets', count) + + while True: + ptr = _Ptr( + fobj, + stamp=packet.stamp, + apid=packet.apid, + offset=packet.offset, + size=packet.size, + ) + index.append(ptr) + # collect all packets for this stamp/group + packet = stream.next() + while not packet.stamp: + ptr.size += packet.size + ptr.count += 1 packet = stream.next() - if packet.is_standalone(): - packets_to_write.append(packet) - - elif packet.is_first(): # packet group - group = deque([packet]) - group.extend(_group_packets(stream)) - if not group[0].is_first() \ - and group[-1].is_last() \ - and group[0].apid == group[-1].apid: - raise InvalidPacketGroup() - packets_to_write.extend(group) - - else: - LOG.debug("skipping hanging packet: %s", packet) - - # First packet always has a stamp because it's either - # standalone or part of a valid group - last_packet = packets_to_write[0] - while packets_to_write: - pkt = packets_to_write.popleft() - output.write(pkt.bytes()) - - except NonConsecutiveSeqId: - streams.append(stream) # stream still usable - LOG.debug('missing sequence id, next stream') - - except InvalidPacketGroup: - streams.append(stream) # stream still usable - LOG.debug("invalid group, switching 
streams:%s", group)
-
-        except PacketTooShort as err:
-            LOG.error("corrupt stream, removing: %s", err)
-
-        except StopIteration:
-            LOG.debug("end-of-stream %s", stream)
-
-
-def merge_files(filepaths, destpath):
-    filepaths, streams = make_streams(filepaths, fail_on_missing=True)
-    for i in range(0, len(filepaths)):
-        LOG.debug("stream %s filepath %s", streams[i], filepaths[i])
-    merge(streams, output=io.open(destpath, 'wb'))
+
+    except StopIteration:
+        pass
+
+    return index
+
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-o', '--output', default='out.pds')
+    def interval(v):
+        dt = lambda v: datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
+        return [dt(x) for x in v.split(',')]
+    parser.add_argument(
+        '-t', '--trunc-to', type=interval,
+        help=('Truncate to the interval given as comma-separated timestamps of '
+              'the format YYYY-MM-DD HH:MM:SS. The begin time is inclusive, the '
+              'end time is exclusive.'))
+    parser.add_argument('pds', nargs='+')
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.DEBUG, format='%(message)s')
+
+    index = set()  # will remove duplicates
+    for filepath in args.pds:
+        LOG.debug('indexing %s', filepath)
+        fobj = io.open(filepath, 'rb')
+        index.update(read_packet_index(fobj))
+
+    LOG.info('sorting index with %d pointers', len(index))
+    index = sorted(index)
+    LOG.info('writing index to %s', args.output)
+    with io.open(args.output, 'wb') as fobj:
+        for ptr in index:
+            if args.trunc_to:
+                interval = args.trunc_to
+                if ptr.stamp >= interval[0] and ptr.stamp < interval[1]:
+                    fobj.write(ptr.bytes())
+            else:
+                fobj.write(ptr.bytes())
diff --git a/edosl0util/split.py b/edosl0util/split.py
index 7b6a0bde6c8f8e3284316fdd0089b7833e2be297..aff0442b52b79cc60d48d6b3365bf5ddf971a353
--- a/edosl0util/split.py
+++ b/edosl0util/split.py
@@ -6,10 +6,10 @@ import io
 from datetime import datetime
 
 from edosl0util.timecode import unixtime
-from edosl0util.stream import PacketStream
+from edosl0util.stream import jpss_packet_stream
 
 
-def split_stream(fobj, minutes):
+def split_stream(stream, minutes):
     """Split a VIIRS L0 PDS data stream into data blobs based on their scan
     time mod the number of minutes provided.
 
@@ -20,8 +20,7 @@ def split_stream(fobj, minutes):
     cur_bucket = 0  # cur time bucket of size 'minutes'
     pkt_count = 0
 
-    original_offset = fobj.tell()
-    for pkt in PacketStream(fobj):
+    for pkt in stream:
         # do the bucketing based on secondary header timestamps
         if pkt.stamp:
             hdrtime = unixtime(pkt.stamp)
@@ -31,8 +30,7 @@ def split_stream(fobj, minutes):
             cur_bucket = pkt_bucket
 
         if pkt_bucket > cur_bucket:
-            offset = fobj.tell() - original_offset
-            yield cur_bucket, offset, pkt_count, buf
+            yield cur_bucket, pkt_count, buf
             pkt_count = 0
             buf = bytearray()
             cur_bucket = pkt_bucket
@@ -41,8 +39,7 @@ def split_stream(fobj, minutes):
         buf += pkt.bytes()
         pkt_count += 1
 
-    offset = fobj.tell() - original_offset
-    yield cur_bucket, offset, pkt_count, buf
+    yield cur_bucket, pkt_count, buf
 
 
 def _replace_pdsname_stamp(filename, stamp):
@@ -75,8 +72,7 @@ def split_file(filepath, minutes, destdir):
     """
     Split a level0 PDS file into X minutes files by filename.
 
-    :param filepath: Path to a Level0 PDS file. It is assumed the file as a
-        standard level 0 PDS filename.
+    :param filepath: Path to a Level0 PDS file, with a standard L0 PDS filename.
     :param minutes: Number of minutes per bucket. Buckets always start at the
       top of the hour. 
For example, a bucket size of 6 will create 10 6-min buckets starting at minutes 0, 6, 12, etc ... @@ -87,8 +83,8 @@ def split_file(filepath, minutes, destdir): :raises RuntimeError: If a file exists with the same name of a bucket file. """ destdir = destdir or '.' - stream = split_stream(io.open(filepath, 'rb'), minutes) - for timestamp, offset, pkts, blob in stream: + stream = split_stream(jpss_packet_stream(io.open(filepath, 'rb')), minutes) + for timestamp, pkts, blob in stream: stamp = datetime.utcfromtimestamp(timestamp) dirname, filename = os.path.split(filepath) newname = _filename_for_splitfile(filename, stamp, minutes) diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 2c7c075ed6b6ea949e0ff0b9223a08991fc54312..e23225a130dd2efe8ef75a3b99de6dbf4783c990 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -1,7 +1,8 @@ import io +import os import logging import ctypes as c -from collections import deque, defaultdict +from collections import deque, defaultdict, namedtuple from edosl0util.headers import ( PrimaryHeader, @@ -31,9 +32,6 @@ class PacketTooShort(Error): After this error occurs the stream is no longer usable because data offsets are no longer reliable. """ - def __init__(self, header=None): - self.primary_header = header - self.args = (self.primary_header,) class NonConsecutiveSeqId(Error): @@ -42,12 +40,13 @@ class NonConsecutiveSeqId(Error): """ -class SimpleStream(object): - """ - Generator that yields PrimaryHeaders and data. Files are read using mmap. - """ - def __init__(self, fobj): +class BasicStream(object): + Tracker = namedtuple('Tracker', ['h1', 'h2', 'size', 'offset', 'data']) + + def __init__(self, fobj, header_lookup=None, with_data=True): self.fobj = fobj + self.header_lookup = header_lookup + self.with_data = with_data def __iter__(self): return self @@ -55,91 +54,79 @@ class SimpleStream(object): def __next__(self): return self.next() - def next(self): - psize = c.sizeof(PrimaryHeader) - buf = self.fobj.read(psize) - if not buf: + def _read(self, size): + buf = self.fobj.read(size) + if not buf: # EOF raise StopIteration() - if len(buf) < psize: - raise PacketTooShort(header=None) + if len(buf) != size: + raise PacketTooShort( + 'expected to read {:d} bytes, got {:d}' + .format(size, len(buf))) + return buf + + def read_primary_header(self): + buf = self._read(headers.PRIMARY_HEADER_SIZE) h1 = PrimaryHeader.from_buffer_copy(buf) - # read user data - size = h1.data_length_minus1 + 1 - buf = self.fobj.read(size) - if len(buf) < size: - raise PacketTooShort(header=h1) - return h1, buf - + return h1, headers.PRIMARY_HEADER_SIZE -class FullStream(SimpleStream): - """ - Iterable returning primary and secondary header and any left over bytes - of "user" data. - """ - def __init__(self, header_lookup, fobj): - """ - :param header_lookup: function that takes a primary header and returns - a secondary header struct to use, or None. 
- """ - self.header_lookup = header_lookup - super(FullStream, self).__init__(fobj) + def read_secondary_header(self, ph): + H2Impl = self.header_lookup(ph) if self.header_lookup else None + if H2Impl is None: + return bytes(), 0 + h2size = c.sizeof(H2Impl) + buf = self._read(h2size) + return H2Impl.from_buffer_copy(buf), h2size def next(self): - h1, buf = super(FullStream, self).next() - h2 = '' - if h1.secondary_header_flag: - H2Impl = self.header_lookup(h1) - if H2Impl: - hsize = c.sizeof(H2Impl) - h2 = H2Impl.from_buffer_copy(buf) - return h1, h2, buf[hsize:] - return h1, h2, buf - - -def jpss_full_stream(fobj): - """ - `FullStream` impl for JPSS secondary headers. - """ - return FullStream(headers.jpss_header_lookup, fobj) - - -def aqua_full_stream(fobj): - """ - `FullStream` impl for Aqua secondary headers. - """ - return FullStream(headers.aqua_header_lookup, fobj) + offset = self.fobj.tell() + h1, h1size = self.read_primary_header() + h2, h2size = self.read_secondary_header(h1) + # data length includes h2size + total_size = h1size + h1.data_length_minus1 + 1 + data_size = h1.data_length_minus1 + 1 - h2size + if self.with_data: + data = self._read(data_size) + else: + data = None + self.fobj.seek(data_size, os.SEEK_CUR) + return self.Tracker(h1, h2, total_size, offset, data) class Packet(object): - def __init__(self, h1, h2, data): - self.data = data + def __init__(self, h1, h2, data, data_size=None, offset=None): self.primary_header = h1 self.secondary_header = h2 + self.data = data + self.size = data_size + self.offset = offset + self.apid = h1.apid + self.seqid = self.primary_header.source_sequence_count def __str__(self): - return '<Packet apid=%d seqid=%d stamp=%s>' % \ - (self.apid, self.seqid, self.stamp) + return '<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>' % \ + (self.apid, self.seqid, self.stamp, self.size, self.offset) __repr__ = __str__ @property - def apid(self): - return self.primary_header.apid - - @property - def seqid(self): - return self.primary_header.source_sequence_count + def stamp(self): + return ( + self.secondary_header and + hasattr(self.secondary_header, 'timecode') and + self.secondary_header.timecode.asdatetime() or None) @property - def stamp(self): - h2 = self.secondary_header - return h2.timecode.asdatetime() if hasattr(h2, 'timecode') else None + def timestamp(self): + return ( + self.secondary_header and + hasattr(self.secondary_header, 'timecode') and + self.secondary_header.timecode.astimestamp() or None) def bytes(self): return buffer(self.primary_header) + \ buffer(self.secondary_header) + self.data def is_group(self): - return False + return self.is_first() or self.is_continuing() or self.is_last() def is_first(self): return self.primary_header.sequence_grouping == GROUP_FIRST @@ -155,24 +142,17 @@ class Packet(object): class PacketStream(object): - """ - Iterates over all CCSDS data producing `Packet` tuples. Only as much data is - necessary to generate a single packet is read into memory at a time. 
- """ SEQID_NOTSET = -1 - def __init__(self, fobj, fail_on_missing=False): - """ - :param fobj: File-like object to read stream from - """ - self._stream = jpss_full_stream(fobj) + def __init__(self, data_stream, fail_on_missing=False): + self._stream = data_stream self._seek_cache = deque() self._first = None self._last = None self._apid_info = defaultdict( - lambda: {'count': 0, - 'last_seqid': self.SEQID_NOTSET, - 'num_missing': 0}) + lambda: {'count': 0, + 'last_seqid': self.SEQID_NOTSET, + 'num_missing': 0}) self._fail_on_missing = fail_on_missing def __iter__(self): @@ -189,8 +169,8 @@ class PacketStream(object): # return any items on the seek cache before reading more if len(self._seek_cache): return self._seek_cache.popleft() - - packet = Packet(*self._stream.next()) + h1, h2, data_size, offset, data = self._stream.next() + packet = Packet(h1, h2, data, data_size=data_size, offset=offset) have_missing = self._update_info(packet) if have_missing and self._fail_on_missing: self.push_back(packet) @@ -227,9 +207,6 @@ class PacketStream(object): """ Seek forward such that the next packet provided will be the first packet having a timestamp available in the secondary header >= stamp and apid. - - :raises: StopIteration if the end of the stream is reached before the - criteria can be met. """ # seek past partial groups packet = self.next() @@ -251,36 +228,12 @@ class PacketStream(object): packet = self.next() self.push_back(packet) - def seek_to_next_stamp(self): - """ - Seek to the next packet with a timestamp available in the secondary - header. For standalone packets this is essentially the same as `next`. - For packet groups it is essentially a "next group" function. - """ - packet = self.next() # get past current packet - packet = self.next() - while not packet.stamp: - packet = self.next() - self.push_back(packet) +def jpss_packet_stream(fobj, **kwargs): + stream = BasicStream(fobj, headers.jpss_header_lookup) + return PacketStream(stream, **kwargs) -def _cmp_by_first_pkt(filea, fileb): - """ - Compare 2 L0 files by their first available timestamp. Used to sort L0 files - by time. - """ - stampa = stampb = None - for pkt in PacketStream(io.open(filea, 'rb')): - if pkt.stamp: - stampa = pkt.stamp - break - for pkt in PacketStream(io.open(fileb, 'rb')): - if pkt.stamp: - stampb = pkt.stamp - break - return cmp(stampa, stampb) - - -def make_streams(filepaths, **kwargs): - filepaths.sort(_cmp_by_first_pkt) - return filepaths, [PacketStream(io.open(f, 'rb'), **kwargs) for f in filepaths] + +def aqua_packet_stream(fobj, **kwargs): + stream = BasicStream(fobj, headers.aqua_header_lookup) + return PacketStream(stream, **kwargs) diff --git a/edosl0util/trunc.py b/edosl0util/trunc.py index 7d944b2e8e143b9463a89f8305a96de2e069a543..fdf7080ca05c274cffa53b51729650ee668d012f 100644 --- a/edosl0util/trunc.py +++ b/edosl0util/trunc.py @@ -2,12 +2,11 @@ __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved." import io -from edosl0util.stream import PacketStream +from edosl0util.stream import jpss_packet_stream def trunc_stream(stream, start, end): stream.seek_to(start) - stream.seek_to_next_stamp() pkt = stream.next() while pkt.stamp <= end: @@ -21,5 +20,5 @@ def trunc_stream(stream, start, end): def trunc_file(filename, start, end): - stream = PacketStream(io.open(filename, 'rb')) + stream = jpss_packet_stream(io.open(filename, 'rb')) return trunc_stream(stream, start, end)
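
Usage sketch of the new index-based merge flow (this mirrors cmd_merge above;
the input and output file names here are hypothetical):

    import io
    from edosl0util import merge

    # index every input; _Ptr hashes on (stamp, apid), so a set drops duplicates
    index = set()
    for filepath in ('a.pds', 'b.pds'):
        index.update(merge.read_packet_index(io.open(filepath, 'rb')))

    # _Ptr compares by (stamp, apid), so sorted() yields time order
    with io.open('merged.pds', 'wb') as out:
        for ptr in sorted(index):
            out.write(ptr.bytes())  # seeks back into the still-open source file

Note the source files must stay open for the duration: _Ptr.bytes() re-reads
the packet bytes from the original file object using the recorded offset/size.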
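The BasicStream/PacketStream split also allows header-only scans. A minimal
sketch, assuming a JPSS PDS file at a hypothetical path; with_data=False seeks
past the user data the same way read_packet_index does:

    import io
    from edosl0util import headers
    from edosl0util.stream import BasicStream, PacketStream

    fobj = io.open('input.pds', 'rb')
    stream = PacketStream(
        BasicStream(fobj, headers.jpss_header_lookup, with_data=False))
    sizes = {}
    for packet in stream:
        # packet.data is None here; only the headers were parsed
        sizes[packet.apid] = sizes.get(packet.apid, 0) + packet.size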
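For the split_stream changes, the bucketing amounts to flooring each packet's
unix time to the bucket width. The exact pkt_bucket line is elided by the hunk
context, so this is a sketch of the assumed arithmetic, consistent with
buckets always starting at the top of the hour:

    def bucket_for(hdrtime, minutes):
        # floor a unix timestamp to the start of its bucket
        return hdrtime - hdrtime % (minutes * 60)

    # e.g. minutes=6 divides each hour into 10 buckets at :00, :06, :12, ...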