diff --git a/README b/README.md
similarity index 72%
rename from README
rename to README.md
index 9051216825c2ffb43ea0709b89e34a895e939bd7..b9beb1b1b2a810fa41d379770c4ef3a0749330d6 100644
--- a/README
+++ b/README.md
@@ -1,15 +1,18 @@
-# EdosL0Util
+# EdosL0Util (BETA)
 
 Tools for mangling EDOS L0 PDS files.
 
 ### Features:
- * API for streaming packets
- * CLI tools for:
-   - merging multiple PDS files
-   - splitting pds into X minute files
-   - truncating files to specified time range
-   - inspecting PDS info
- * Dumping JPSS H5 RDRs to L0 PDS files
+* API for streaming packets
+* CLI tools for:
+  - merging multiple PDS files
+  - splitting PDS files into X-minute files
+  - truncating files to a specified time range
+  - inspecting PDS info
+* Dumping JPSS H5 RDRs to L0 PDS files for VIIRS, CrIS, and ATMS
+
+There is also support for creating a packet stream from Aqua L0 PDS files, but
+it has not been tested much. Most functionality assumes EDOS L0 PDS files.
 
 ### Packet streaming
 
@@ -81,5 +84,25 @@ L0 PDS files.
 
 * rdr2l0
 
+### Creating L0 PDS Files from RDRs
+The ``rdr2l0`` tool will dump out an RDR in an order not suitable for use as a
+NASA L0 PDS file. Generating an L0 PDS file requires two steps:
+```
+$> rdr2l0 --diary --skipfill RNSCA-RVIRS_npp_d20151018_t2359214_e0000467_b20595_c20151019015559980487_noaa_ops.h5
+$> edosl0merge -o out.pds RNSCA-RVIRS_npp_d20151018_t2359214_e0000467_b20595_c20151019015559980487_noaa_ops.h5.science.pds
+```
+The resulting file ``out.pds`` will have packets sorted by time and apid with
+any duplicates removed.
+
+The typical case would be to dump multiple RDR files to PDS, then merge the
+resulting PDS files into a 6-minute granule like so:
+```
+$> rdr2l0 --diary --skipfill RNSCA-RVIRS_npp_d20151018_t000*.h5
+$> edosl0merge -o out.pds --trunc-to "2015-10-18 00:00:00,2015-10-18 00:06:00" *.science.pds
+```
+This will create a proper 6-minute NASA L0 PDS file with any fill packets
+filtered out.
+
+
-If you have quesions or comments related to this software contac
+If you have questions or comments related to this software, contact
 brucef@ssec.wisc.edu.
diff --git a/edosl0util/_old_merge.py b/edosl0util/_old_merge.py
new file mode 100644
index 0000000000000000000000000000000000000000..87b01eb95660bf81c0d9a63badd18ab49dcdb91b
--- /dev/null
+++ b/edosl0util/_old_merge.py
@@ -0,0 +1,183 @@
+# encoding: utf-8
+__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
+
+import io
+import sys
+import logging
+from collections import deque
+
+from edosl0util.stream import (
+    jpss_packet_stream,
+    NonConsecutiveSeqId,
+    PacketTooShort
+)
+
+LOG = logging.getLogger(__name__)
+
+
+class InvalidPacketGroup(Exception):
+    """
+    Raised if packets in a group have different APIDs or if the group does
+    not end with a last packet.
+    """
+
+
+def _group_packets(stream):
+    """
+    Return a generator that yields all packets up to the next timestamped
+    packet, i.e., a packet group.
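For readers new to CCSDS packet groups, here is a minimal sketch, not part of this changeset, of what the grouping above means in practice: only standalone and first packets carry a secondary-header timestamp, so collecting packets until the next stamped one yields one standalone packet or one complete group. The input filename is a placeholder.
```
import io

from edosl0util.stream import jpss_packet_stream


def iter_groups(filepath):
    # Yield lists of packets: each list is a standalone packet or a first
    # packet followed by its un-stamped continuation/last packets. Any
    # leading un-stamped (hanging) packets end up in the first list.
    group = []
    for packet in jpss_packet_stream(io.open(filepath, 'rb')):
        if packet.stamp and group:
            yield group
            group = []
        group.append(packet)
    if group:
        yield group


# for group in iter_groups('input.pds'):
#     print('%s apid=%d packets=%d' % (group[0].stamp, group[0].apid, len(group)))
```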
+ """ + packet = stream.next() + while not packet.stamp: + yield packet + packet = stream.next() + # put the next packet with a stamp so it will be next + stream.push_back(packet) + + +class StreamManager(object): + + def __init__(self, streams): + self.streams = list(streams) + self.current_idx = 0 + self.valid = [True]*len(self.streams) + self.tried = [False]*len(self.streams) + self.handling_error = False + + def invalid(self, stream): + idx = self.streams.index(stream) + self.valid[idx] = False + self.current_idx = 0 + + end_of_stream = invalid + + def have_valid(self): + return any(self.valid) + + def set_error(self, stream): + idx = self.streams.index(stream) + self.tried[idx] = True + + missing_seqid = set_error + invalid_group = set_error + + def next(self): + if self.handling_error: + if all(self.tried): + # tried every stream, can't recover, move on + self.tried = [False]*len(self.stream) + self.handling_error = False + else: + # there is a stream we haven't tried, use it + self.current_idx = self.tried.index(False) + self.tried[self.current_idx] = True + return self.streams[self.current_idx] + + self.current_idx = self.valid.index(True, self.current_idx) + return self.streams[self.current_idx] + + +def next_stamp_and_apid(stream, stamp, apid): + LOG.debug("seeking to %s apid %s", stamp, apid) + stream.seek_to(stamp, apid) + stream.seek_to_next_stamp() + + +def next_stamp(stream, stamp): + LOG.debug("seeking to %s", stamp) + stream.seek_to(stamp) + + +def merge(streams, output=sys.stdout): + last_packet = None + + streams = StreamManager(streams) + stream = streams.next() + while streams.have_valid(): + LOG.debug("stream %s", stream) + LOG.debug("streams valid: %s", streams.valid) + LOG.debug("streams handling_error:%s tried:%s", streams.handling_error, streams.tried) + LOG.debug("streams current_idx: %s", streams.current_idx) + try: + # Do until `next` causes StopIteration + while True: + packets_to_write = deque() + packet = stream.next() + if packet.is_standalone(): + packets_to_write.append(packet) + + elif packet.is_first(): # packet group + group = deque([packet]) + group.extend(_group_packets(stream)) + if not group[0].is_first() \ + and group[-1].is_last() \ + and group[0].apid == group[-1].apid: + raise InvalidPacketGroup() + packets_to_write.extend(group) + + else: + LOG.debug("skipping hanging packet: %s", packet) + + # First packet always has a stamp because it's either + # standalone or part of a valid group + last_packet = packets_to_write[0] + while packets_to_write: + pkt = packets_to_write.popleft() + output.write(pkt.bytes()) + + except NonConsecutiveSeqId: + LOG.debug('missing sequence id, next stream') + streams.missing_seqid(stream) + + LOG.debug("seeking to %s apid %s", last_packet.stamp, last_packet.apid) + stream = streams.next() + + except InvalidPacketGroup: + LOG.debug("invalid group, next stream") + streams.invalid_group(stream) + stream = streams.next() + next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid) + + except PacketTooShort as err: + LOG.error("corrupt stream, removing: %s", err) + streams.invalid(stream) + stream = streams.next() + next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid) + + except StopIteration: + LOG.debug("end-of-stream %s", stream) + streams.end_of_stream(stream) + if streams.have_valid(): + stream = streams.next() + next_stamp(stream, last_packet.stamp) + + +def merge_files(filepaths, destpath): + filepaths, streams = make_streams(filepaths, fail_on_missing=True) + for i in range(0, len(filepaths)): 
+ LOG.debug("stream %s filepath %s", streams[i], filepaths[i]) + merge(streams, output=io.open(destpath, 'wb')) + + +def _cmp_by_first_pkt(filea, fileb): + """ + Compare 2 L0 files by their first available timestamp. Used to sort L0 files + by time. + """ + stampa = stampb = None + for pkt in jpss_packet_stream(io.open(filea, 'rb')): + if pkt.stamp: + stampa = pkt.stamp + break + for pkt in jpss_packet_stream(io.open(fileb, 'rb')): + if pkt.stamp: + stampb = pkt.stamp + break + return cmp(stampa, stampb) + + +def make_streams(*filepaths): + return [ + jpss_packet_stream(io.open(f, 'rb')) + for f in sorted(filepaths, cmp=_cmp_by_first_pkt) + ] diff --git a/edosl0util/cli.py b/edosl0util/cli.py index 51059c4c270a52698dce30964fc5899fbea61474..2532f5015e57c01196fb8da15036b9ba892c446c 100644 --- a/edosl0util/cli.py +++ b/edosl0util/cli.py @@ -58,17 +58,19 @@ def cmd_split(): def cmd_info(): parser = _default_parser() + parser.add_argument('-a', '--aqua', action='store_true') parser.add_argument('filepath') args = parser.parse_args() _configure_logging(args) num_packets = 0 - packets = stream.PacketStream(io.open(args.filepath, 'rb')) - first = datetime(3000, 1, 1) - last = datetime(1970, 1, 1) + if not args.aqua: + packets = stream.jpss_packet_stream(io.open(args.filepath, 'rb')) + else: + packets = stream.aqua_packet_stream(io.open(args.filepath, 'rb')) while True: try: - packet = packets.next() + packets.next() num_packets += 1 except stream.PacketTooShort as err: LOG.warn("corrupt packet stream after %d packets: %s", @@ -76,9 +78,6 @@ def cmd_info(): break except StopIteration: break - if packet.stamp: - first = min(packet.stamp, first) - last = max(packet.stamp, last) total = 0 first, last, info = packets.info() LOG.info("First: %s", first) @@ -92,23 +91,42 @@ def cmd_info(): def cmd_merge(): parser = _default_parser() parser.add_argument('-o', '--output', default='out.pds') + def interval(v): + dt = lambda v: datetime.strptime(v, '%Y-%m-%d %H:%M:%S') + return [dt(x) for x in v.split(',')] + parser.add_argument( + '-t', '--trunc-to', type=interval, + help=('Truncate to the interval given as coma separated timestamps of ' + 'the format YYYY-MM-DD HH:MM:SS. The begin time is inclusive, the ' + 'end time is exclusive.')) parser.add_argument('pds', nargs='+') args = parser.parse_args() + _configure_logging(args) - merge.merge_files(args.pds, args.output) + streams = [stream.jpss_packet_stream(io.open(f, 'rb')) for f in args.pds] + merge.merge( + streams, output=io.open(args.output, 'wb'), trunc_to=args.trunc_to) def cmd_rdr2l0(): + """ + Extract CCSDS packets from a JPSS RDR. They are extracted in the order in + which they are listed in the APID list and in packet tracker order, i.e., + Not in time/apid order. Files will be named using the input h5 name with + .science.pds and .diary[0,8,11].pds appended. 
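A rough sketch, not part of this changeset, of the same two-step flow from Python using the functions added here; the RDR filename is a placeholder.
```
import io

from edosl0util import jpssrdr, merge
from edosl0util.stream import jpss_packet_stream

rdr = 'RNSCA-RVIRS_npp.h5'  # placeholder RDR filename

# Step 1: dump packets in packet tracker order to <rdr>.science.pds (and
# diary files), skipping packets marked as containing fill.
jpssrdr.write_rdr_datasets(rdr, diary=True, skipfill=True)

# Step 2: merge sorts by time/apid and drops duplicates, like edosl0merge.
streams = [jpss_packet_stream(io.open(rdr + '.science.pds', 'rb'))]
merge.merge(streams, output=io.open('out.pds', 'wb'))
```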
+    """
     parser = _default_parser()
-    parser.add_argument('-o', '--output')
-    parser.add_argument('-f', '--skipfill', action='store_true')
-    parser.add_argument('sensor', choices=('viirs', 'atms', 'cris'))
+    parser.description = cmd_rdr2l0.__doc__
+    parser.add_argument(
+        '-d', '--diary', action='store_true',
+        help='Write diary[0,8,11].pds if available')
+    parser.add_argument(
+        '-f', '--skipfill', action='store_true',
+        help=('Skip any packets that are marked in the packet tracker as '
+              'containing fill'))
     parser.add_argument('rdr')
     args = parser.parse_args()
     _configure_logging(args)
 
-    output = args.output or args.rdr + '.pds'
-    with io.open(output, 'wb') as fptr:
-        for packet in jpssrdr.convert_to_nasa_l0(args.sensor, args.rdr):
-            fptr.write(packet)
+    jpssrdr.write_rdr_datasets(args.rdr, diary=args.diary, skipfill=args.skipfill)
diff --git a/edosl0util/headers.py b/edosl0util/headers.py
index d4067ad266003995cf4398014ad22f2558eb3258..bb017f7fd12a7c66908805554852393e975e4551 100644
--- a/edosl0util/headers.py
+++ b/edosl0util/headers.py
@@ -53,6 +53,9 @@ class PrimaryHeader(BaseStruct):
     ]
 
 
+PRIMARY_HEADER_SIZE = c.sizeof(PrimaryHeader)
+
+
 class Timecode(BaseStruct):
     """
     Secondary header timecode baseclass.
@@ -169,12 +172,14 @@ class JpssFirstSecondaryHeader(BaseStruct):
     ]
 
 
+_jpss_headers = {
+    GROUP_FIRST: JpssFirstSecondaryHeader,
+    GROUP_CONTINUING: None,
+    GROUP_LAST: None,
+    GROUP_STANDALONE: JpssSecondaryHeader,
+}
 def jpss_header_lookup(primary_header):
-    grouping = primary_header.sequence_grouping
-    if grouping == GROUP_FIRST:
-        return JpssFirstSecondaryHeader
-    elif grouping == GROUP_STANDALONE:
-        return JpssSecondaryHeader
+    return _jpss_headers.get(primary_header.sequence_grouping)
 
 
 def amsu_headers():
diff --git a/edosl0util/jpssrdr.py b/edosl0util/jpssrdr.py
index 70b9607d7edceb769f518e37e8e14bc7a203cd5a..a217fae272350d01a365444e5c4998abe5ffcdda 100644
--- a/edosl0util/jpssrdr.py
+++ b/edosl0util/jpssrdr.py
@@ -9,8 +9,10 @@ Code for reading/writing/manipulating JPSS Common RDR files as documented in:
 """
 __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
 
+import os
 import logging
 import ctypes as c
+from datetime import datetime
 from collections import namedtuple
 
 import numpy as np
@@ -90,13 +92,17 @@ Packet = namedtuple('Packet', ('tracker', 'packet'))
 
 
 class CommonRdr(namedtuple('CommonRdr', ('buf', 'header', 'apids'))):
+    """
+    A single common rdr as found in a RawApplicationPackets_X dataset.
+    """
 
     def packets_for_apid(self, apid):
         """
         Return a generator that yields tuples of (tracker, packet) for the
         given `Apid`.
         """
-        return _packets_for_apid(self.buf, self.header, apid)
+        for tracker, packet in _packets_for_apid(self.buf, self.header, apid):
+            yield Packet(tracker, packet)
 
     def packets(self):
         """
@@ -104,8 +110,8 @@ class CommonRdr(namedtuple('CommonRdr', ('buf', 'header', 'apids'))):
         apid list.
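A small sketch, not part of this changeset, of walking these generators via `rdr_datasets` from this module; the RDR filename is a placeholder.
```
from edosl0util.jpssrdr import rdr_datasets

rdr = rdr_datasets('RNSCA-RVIRS_npp.h5')  # placeholder filename

# Each science granule is a CommonRdr; packets_for_apid() yields
# (tracker, packet) pairs in packet tracker order.
fill_counts = {}
for gran in rdr['science']:
    for apid in gran.apids:
        for pkt in gran.packets_for_apid(apid):
            if pkt.tracker.fill_percent != 0:
                fill_counts[apid.value] = fill_counts.get(apid.value, 0) + 1
```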
""" for apid in self.apids: - for tracker, packet in self.packets_for_apid(apid): - yield Packet(tracker, packet) + for packet in self.packets_for_apid(apid): + yield packet def _packets_for_apid(buf, header, apid): @@ -137,31 +143,54 @@ def _read_apid_list(header, buf): offset += c.sizeof(Apid) -def read_common_rdrs(sensor, filepath): - """ - Generate CommonRdr-s for each dataset(granule) in `filelpath` - """ - for buf in read_rdr_datasets(sensor, filepath): - header = StaticHeader.from_buffer(buf) - apids = _read_apid_list(header, buf) - yield CommonRdr(buf, header, list(apids)) +def _sorted_packet_dataset_names(names): + names = [k for k in names if k.startswith('RawApplicationPackets_')] + cmp_names = lambda *tup: cmp(*(int(x.split('_')[-1]) for x in tup)) # noqa + return sorted(names, cmp=cmp_names) -def read_rdr_datasets(sensor, filepath): - """ - Generate byte arrays of granule RawApplicationPackets in granule number - order. - """ - sensor = sensor.upper() +def _generate_packet_datasets(group): + dsnames = group.keys() + for name in _sorted_packet_dataset_names(dsnames): + ds = group[name] + yield np.array(ds) + + +def _find_science_group(fobj): + for sensor in ['viirs', 'cris', 'atms']: + group = fobj.get('/All_Data/{}-SCIENCE-RDR_All'.format(sensor.upper())) + if group: + return group + + +def _rdrs_for_packet_dataset(group): + if group: + for buf in _generate_packet_datasets(group): + header = StaticHeader.from_buffer(buf) + apids = _read_apid_list(header, buf) + yield CommonRdr(buf, header, list(apids)) + + +def rdr_datasets(filepath): fobj = H5File(filepath) - grp = fobj['/All_Data/{}-SCIENCE-RDR_All'.format(sensor)] + rdr = {} + group = _find_science_group(fobj) + rdr['science'] = _rdrs_for_packet_dataset(group) + group = fobj.get('/All_Data/SPACECRAFT-DIARY-RDR_All') + rdr['diary'] = _rdrs_for_packet_dataset(group) + return rdr - dsnames = [k for k in grp.keys() if k.startswith('RawApplicationPackets_')] - # compare ds names by index after '_' - cmp_names = lambda *tup: cmp(*(int(x.split('_')[-1]) for x in tup)) # noqa - for name in sorted(dsnames, cmp=cmp_names): - ds = grp[name] - yield np.array(ds) + +def rdr_ccsds_packets(rdr, skipfill=False): + """ + Get all CCSDS packets from an `CommonRdr` in packet tracker order. + """ + for packet in rdr.packets(): + if packet.tracker.fill_percent != 0: + LOG.debug('fill: %s', packet.tracker) + if skipfill: + continue + yield packet.packet def sort_packets_by_obs_time(packets): @@ -182,29 +211,25 @@ def sort_packets_by_apid(packets, order=None): return sorted(packets, key=lambda p: p.packet.apid) -def sort_packets_edos(rdr, packets): - """ - Sort packets by timestamp and order the apids the same as EDOS NASA L0 - files. - """ - order = None - if rdr.header.sensor.lower() == 'viirs': - order = [826, 821] + range(0, 826) - return sort_packets_by_obs_time(sort_packets_by_apid(packets, order=order)) - - -def convert_to_nasa_l0(sensor, filename, skipfill=False): - """ - Convert a JSPP RDR to a NASA L0 PDS file. 
- """ - for idx, rdr in enumerate(read_common_rdrs(sensor, filename)): - LOG.debug(rdr.header) - for apid in rdr.apids: - LOG.debug(apid) - packets = sort_packets_edos(rdr, rdr.packets()) - for packet in packets: - if packet.tracker.fill_percent != 0: - LOG.debug('fill: %s', packet.tracker) - if skipfill: +def write_rdr_datasets(filepath, diary=True, skipfill=False): + def write_packets(pkts, dest): + with open(dest, 'wb') as fptr: + for pkt in pkts: + if pkt.tracker.fill_percent != 0 and skipfill: continue - yield packet.packet + fptr.write(pkt.packet) + rdrname = os.path.basename(filepath) + rdrs = rdr_datasets(filepath) + for idx, rdr in enumerate(rdrs['science']): + fname = '{}.science.pds'.format(rdrname) + hdr = rdr.header + LOG.debug('writing gran %d %s-%s-%s', idx, hdr.satellite, hdr.sensor, hdr.type_id) + write_packets(rdr.packets(), fname) + + if diary: + for idx, rdr in enumerate(rdrs['diary']): + for apid in rdr.apids: + fname = '{}.diary{}.pds'.format(rdrname, apid.value) + hdr = rdr.header + LOG.debug('writing gran %d %s-%s-%s %d', idx, hdr.satellite, hdr.sensor, hdr.type_id, apid.value) + write_packets(rdr.packets_for_apid(apid), fname) diff --git a/edosl0util/merge.py b/edosl0util/merge.py index 384fe4c8e332c87b53ac61bc4463987a73eae883..ee68ee7d228b9e2180df4d5971ccb33d7693bc15 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -1,96 +1,124 @@ -# encoding: utf-8 -__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved." +""" -import io -import sys +1. Cat PDS files together +2. Index packets + - drop any leading hanging packets + - +3. Sort index +4. Write + +""" +import os import logging from collections import deque -from edosl0util.stream import ( - make_streams, - NonConsecutiveSeqId, - PacketTooShort -) - LOG = logging.getLogger(__name__) -def _group_packets(stream): +class _Ptr(object): """ - Returns a generator that yields all packets between timestamps. + Represents one or more packets that share the same time stamp and apid. 
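A usage sketch, not part of this changeset, of the index-and-sort merge described by the module docstring above (cat, index, sort, write), through the public entry points; file names are placeholders.
```
import io
from datetime import datetime

from edosl0util import merge
from edosl0util.stream import jpss_packet_stream

inputs = ['gran1.science.pds', 'gran2.science.pds']  # placeholder names
streams = [jpss_packet_stream(io.open(f, 'rb')) for f in inputs]

# Packets are indexed as _Ptr entries, de-duplicated, sorted by time and
# apid, and optionally truncated to the half-open interval [start, end).
merge.merge(
    streams,
    output=io.open('out.pds', 'wb'),
    trunc_to=(datetime(2015, 10, 18, 0, 0), datetime(2015, 10, 18, 0, 6)))
```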
""" - packet = stream.next() - while not packet.stamp: - yield packet + def __init__(self, fobj, stamp, apid, offset, size): + self.fobj = fobj + self.stamp = stamp + self.apid = apid + self.offset = offset + self.size = size + self.count = 1 + + def __repr__(self): + attrs = ' '.join( + '{}={}'.format(k, v) + for k, v in sorted(vars(self).items()) + if not k.startswith('_')) + + return '<{:s} {:s}>'.format(self.__class__.__name__, attrs) + + def __cmp__(self, that): + return cmp( + (self.stamp, self.apid), + (that.stamp, that.apid) + ) + + # instances with same stamp/apid will compare the same + def __hash__(self): + return hash((self.stamp, self.apid)) + + def bytes(self): + self.fobj.seek(self.offset, os.SEEK_SET) + return self.fobj.read(self.size) + + +def read_packet_index(stream): + index = deque() + try: + # drop any leading hanging packets + count = 0 packet = stream.next() - # put the next packet with a stamp so it will be next - stream.push_back(packet) - - -def _is_valid_group(packets): - return packets[0].is_first() \ - and packets[-1].is_last() \ - and packets[0].apid == packets[-1].apid - - -def merge(streams, output=sys.stdout): - last_packet = None - - # streams are removed as they are exhausted - while streams: - for stream in streams: - try: - if last_packet is not None: - LOG.debug("seeking to %s, %s", last_packet.stamp, last_packet.apid) - stream.seek_to(last_packet.stamp, last_packet.apid) - stream.seek_to_next_stamp() - - # Do until `next` causes StopIteration - while True: - packets = deque() - packet = stream.next() - - if packet.is_standalone(): - packets.append(packet) - - elif packet.is_first(): # packet group - group = deque([packet]) - group.extend(_group_packets(stream)) - if _is_valid_group(group): - packets.extend(group) - elif group[0].is_first(): - last_packet = group[0] - break - else: - LOG.debug("invalid group, switching streams:%s", group) - break - - else: - LOG.debug("skipping hanging packet: %s", packet) - - # First packet always has a stamp because it's either - # standalone or part of a valid group - last_packet = packets[0] - - while packets: - pkt = packets.popleft() - LOG.debug('writing %s: %s', stream, pkt) - output.write(pkt.bytes()) - - except PacketTooShort as err: - LOG.error("corrupt stream, removing: %s", err) - streams.remove(stream) - - except NonConsecutiveSeqId: - LOG.debug('missing sequence id, next stream') - - except StopIteration: - streams.remove(stream) - LOG.debug("end-of-stream %s", stream) - continue - - -def merge_files(filepaths, destpath): - filepaths, streams = make_streams(filepaths, fail_on_missing=True) - LOG.debug("merge order %s", filepaths) - merge(streams, output=io.open(destpath, 'wb')) + while not packet.stamp: + packet = stream.next() + count += 1 + if count: + LOG.info('dropped %d leading packets', count) + + while True: + ptr = _Ptr( + stream.file, + stamp=packet.stamp, + apid=packet.apid, + offset=packet.offset, + size=packet.size, + ) + index.append(ptr) + # collect all packets for this stamp/group + packet = stream.next() + while not packet.stamp: + ptr.size += packet.size + ptr.count += 1 + packet = stream.next() + + except StopIteration: + pass + + return index + + +def _sort_by_time_apid(index, order=None): + if order: + index = sorted(index, key=lambda p: order.index(p.apid)) + else: + index = sorted(index, key=lambda p: p.apid) + return sorted(index, key=lambda p: p.stamp) + + +def merge(streams, output, trunc_to=None, apid_order=None): + """ + Merge packets from multiple streams to an output file. 
Duplicate packets + will be filtered and packets will be sorted by time and the apid order + provided. + + :param streams: List of `PacketStream`s such as returned by + `jpss_packet_stream`. + :param output: File-like object to write output packets to. + :keyword trunc_to: (start, end) datetime to truncate packets to. Start is + inclusive and end is exclusive. + :keyword apid_order: List of apid is the order in which they should appear + in the output. If provided ALL apids must be present, otherwise an + IndexError will occur. + """ + index = set() # will remove duplicates + for stream in streams: + LOG.debug('indexing %s', stream) + index.update(read_packet_index(stream)) + + LOG.debug('sorting index with %d pointers', len(index)) + index = _sort_by_time_apid(index, order=apid_order) + + LOG.debug('writing index to %s', output) + for ptr in index: + if trunc_to: + if ptr.stamp >= trunc_to[0] and ptr.stamp < trunc_to[1]: + output.write(ptr.bytes()) + else: + output.write(ptr.bytes()) diff --git a/edosl0util/split.py b/edosl0util/split.py index 7b6a0bde6c8f8e3284316fdd0089b7833e2be297..aff0442b52b79cc60d48d6b3365bf5ddf971a353 100644 --- a/edosl0util/split.py +++ b/edosl0util/split.py @@ -6,10 +6,10 @@ import io from datetime import datetime from edosl0util.timecode import unixtime -from edosl0util.stream import PacketStream +from edosl0util.stream import jpss_packet_stream -def split_stream(fobj, minutes): +def split_stream(stream, minutes): """Split a VIIRS L0 PDS data stream into data blobs based on their scan time mod the number of minutes provided. @@ -20,8 +20,7 @@ def split_stream(fobj, minutes): cur_bucket = 0 # cur time bucket of size 'minutes' pkt_count = 0 - original_offset = fobj.tell() - for pkt in PacketStream(fobj): + for pkt in stream: # do the bucketing based on secondary header timestamps if pkt.stamp: hdrtime = unixtime(pkt.stamp) @@ -31,8 +30,7 @@ def split_stream(fobj, minutes): cur_bucket = pkt_bucket if pkt_bucket > cur_bucket: - offset = fobj.tell() - original_offset - yield cur_bucket, offset, pkt_count, buf + yield cur_bucket, pkt_count, buf pkt_count = 0 buf = bytearray() cur_bucket = pkt_bucket @@ -41,8 +39,7 @@ def split_stream(fobj, minutes): buf += pkt.bytes() pkt_count += 1 - offset = fobj.tell() - original_offset - yield cur_bucket, offset, pkt_count, buf + yield cur_bucket, pkt_count, buf def _replace_pdsname_stamp(filename, stamp): @@ -75,8 +72,7 @@ def split_file(filepath, minutes, destdir): """ Split a level0 PDS file into X minutes files by filename. - :param filepath: Path to a Level0 PDS file. It is assumed the file as a - standard level 0 PDS filename. + :param filepath: Path to a Level0 PDS file, with a standard L0 PDS filename. :param minutes: Number of minutes per bucket. Buckets always start at the top of the hour. For example, a bucket size of 6 will create 10 6-min buckets starting at minutes 0, 6, 12, etc ... @@ -87,8 +83,8 @@ def split_file(filepath, minutes, destdir): :raises RuntimeError: If a file exists with the same name of a bucket file. """ destdir = destdir or '.' 
- stream = split_stream(io.open(filepath, 'rb'), minutes) - for timestamp, offset, pkts, blob in stream: + stream = split_stream(jpss_packet_stream(io.open(filepath, 'rb')), minutes) + for timestamp, pkts, blob in stream: stamp = datetime.utcfromtimestamp(timestamp) dirname, filename = os.path.split(filepath) newname = _filename_for_splitfile(filename, stamp, minutes) diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 2c7c075ed6b6ea949e0ff0b9223a08991fc54312..746d82143c9afc6f3bfd5e0a115f9d6e7c6b5704 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -1,7 +1,8 @@ -import io +import os +import errno import logging import ctypes as c -from collections import deque, defaultdict +from collections import deque, defaultdict, namedtuple from edosl0util.headers import ( PrimaryHeader, @@ -31,9 +32,6 @@ class PacketTooShort(Error): After this error occurs the stream is no longer usable because data offsets are no longer reliable. """ - def __init__(self, header=None): - self.primary_header = header - self.args = (self.primary_header,) class NonConsecutiveSeqId(Error): @@ -42,12 +40,23 @@ class NonConsecutiveSeqId(Error): """ -class SimpleStream(object): +class BasicStream(object): """ - Generator that yields PrimaryHeaders and data. Files are read using mmap. + Basic packet stream iterator that reads the primary and secondary headers and + maintains offsets and read sizes. """ - def __init__(self, fobj): - self.fobj = fobj + Tracker = namedtuple('Tracker', ['h1', 'h2', 'size', 'offset', 'data']) + + def __init__(self, fobj, header_lookup=None, with_data=True): + self.file = fobj + self.header_lookup = header_lookup + self.with_data = with_data + try: + self._offset = self.file.tell() + except IOError as err: # handle illegal seek for pipes + if err.errno != errno.ESPIPE: + raise + self._offset = 0 def __iter__(self): return self @@ -55,91 +64,80 @@ class SimpleStream(object): def __next__(self): return self.next() - def next(self): - psize = c.sizeof(PrimaryHeader) - buf = self.fobj.read(psize) - if not buf: + def _read(self, size): + buf = self.file.read(size) + if not buf: # EOF raise StopIteration() - if len(buf) < psize: - raise PacketTooShort(header=None) + if len(buf) != size: + raise PacketTooShort( + 'expected to read {:d} bytes, got {:d}' + .format(size, len(buf))) + self._offset += size + return buf + + def read_primary_header(self): + buf = self._read(headers.PRIMARY_HEADER_SIZE) h1 = PrimaryHeader.from_buffer_copy(buf) - # read user data - size = h1.data_length_minus1 + 1 - buf = self.fobj.read(size) - if len(buf) < size: - raise PacketTooShort(header=h1) - return h1, buf + return h1, headers.PRIMARY_HEADER_SIZE - -class FullStream(SimpleStream): - """ - Iterable returning primary and secondary header and any left over bytes - of "user" data. - """ - def __init__(self, header_lookup, fobj): - """ - :param header_lookup: function that takes a primary header and returns - a secondary header struct to use, or None. 
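The convention described above carries over to the new ``BasicStream``, which takes the same kind of lookup; a minimal sketch, with a placeholder filename.
```
import io

from edosl0util.headers import jpss_header_lookup
from edosl0util.stream import BasicStream

# header_lookup maps a primary header to a secondary header struct, or
# None when the grouping has no secondary header.
stream = BasicStream(io.open('input.pds', 'rb'), jpss_header_lookup)
npackets = 0
for h1, h2, size, offset, data in stream:
    # h1/h2 are ctypes header structs; size/offset locate the packet on disk
    npackets += 1
```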
- """ - self.header_lookup = header_lookup - super(FullStream, self).__init__(fobj) + def read_secondary_header(self, ph): + H2Impl = self.header_lookup(ph) if self.header_lookup else None + if H2Impl is None: + return bytes(), 0 + h2size = c.sizeof(H2Impl) + buf = self._read(h2size) + return H2Impl.from_buffer_copy(buf), h2size def next(self): - h1, buf = super(FullStream, self).next() - h2 = '' - if h1.secondary_header_flag: - H2Impl = self.header_lookup(h1) - if H2Impl: - hsize = c.sizeof(H2Impl) - h2 = H2Impl.from_buffer_copy(buf) - return h1, h2, buf[hsize:] - return h1, h2, buf - - -def jpss_full_stream(fobj): - """ - `FullStream` impl for JPSS secondary headers. - """ - return FullStream(headers.jpss_header_lookup, fobj) - - -def aqua_full_stream(fobj): - """ - `FullStream` impl for Aqua secondary headers. - """ - return FullStream(headers.aqua_header_lookup, fobj) + offset = self._offset + h1, h1size = self.read_primary_header() + h2, h2size = self.read_secondary_header(h1) + # data length includes h2size + total_size = h1size + h1.data_length_minus1 + 1 + data_size = h1.data_length_minus1 + 1 - h2size + if self.with_data: + data = self._read(data_size) + else: + data = None + self.fobj.seek(data_size, os.SEEK_CUR) + return self.Tracker(h1, h2, total_size, offset, data) class Packet(object): - def __init__(self, h1, h2, data): - self.data = data + def __init__(self, h1, h2, data, data_size=None, offset=None): self.primary_header = h1 self.secondary_header = h2 + self.data = data + self.size = data_size + self.offset = offset + self.apid = h1.apid + self.seqid = self.primary_header.source_sequence_count def __str__(self): - return '<Packet apid=%d seqid=%d stamp=%s>' % \ - (self.apid, self.seqid, self.stamp) + return '<Packet apid=%d seqid=%d stamp=%s size=%s offset=%s>' % \ + (self.apid, self.seqid, self.stamp, self.size, self.offset) __repr__ = __str__ @property - def apid(self): - return self.primary_header.apid + def stamp(self): + return ( + self.secondary_header and + hasattr(self.secondary_header, 'timecode') and + self.secondary_header.timecode.asdatetime() or None) @property - def seqid(self): - return self.primary_header.source_sequence_count - - @property - def stamp(self): - h2 = self.secondary_header - return h2.timecode.asdatetime() if hasattr(h2, 'timecode') else None + def timestamp(self): + return ( + self.secondary_header and + hasattr(self.secondary_header, 'timecode') and + self.secondary_header.timecode.astimestamp() or None) def bytes(self): return buffer(self.primary_header) + \ buffer(self.secondary_header) + self.data def is_group(self): - return False + return self.is_first() or self.is_continuing() or self.is_last() def is_first(self): return self.primary_header.sequence_grouping == GROUP_FIRST @@ -155,26 +153,23 @@ class Packet(object): class PacketStream(object): - """ - Iterates over all CCSDS data producing `Packet` tuples. Only as much data is - necessary to generate a single packet is read into memory at a time. 
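That behaviour is unchanged by the refactor; a short sketch, with a placeholder filename, of iterating the stream and using the per-packet ``size`` now recorded on each ``Packet``.
```
import io
from collections import defaultdict

from edosl0util.stream import jpss_packet_stream

# Tally packet and byte counts per APID using the size metadata the
# refactored stream attaches to every Packet.
bytes_per_apid = defaultdict(int)
count_per_apid = defaultdict(int)
for packet in jpss_packet_stream(io.open('input.pds', 'rb')):
    bytes_per_apid[packet.apid] += packet.size
    count_per_apid[packet.apid] += 1
```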
- """ SEQID_NOTSET = -1 - def __init__(self, fobj, fail_on_missing=False): - """ - :param fobj: File-like object to read stream from - """ - self._stream = jpss_full_stream(fobj) + def __init__(self, data_stream, fail_on_missing=False): + self._stream = data_stream self._seek_cache = deque() self._first = None self._last = None self._apid_info = defaultdict( - lambda: {'count': 0, - 'last_seqid': self.SEQID_NOTSET, - 'num_missing': 0}) + lambda: {'count': 0, + 'last_seqid': self.SEQID_NOTSET, + 'num_missing': 0}) self._fail_on_missing = fail_on_missing + def __repr__(self): + filepath = getattr(self.file, 'name', None) + return '<{} file={}>'.format(self.__class__.__name__, filepath) + def __iter__(self): return self @@ -182,6 +177,10 @@ class PacketStream(object): def __next__(self): return self.next() + @property + def file(self): + return self._stream.file + def push_back(self, packet): self._seek_cache.append(packet) @@ -189,8 +188,8 @@ class PacketStream(object): # return any items on the seek cache before reading more if len(self._seek_cache): return self._seek_cache.popleft() - - packet = Packet(*self._stream.next()) + h1, h2, data_size, offset, data = self._stream.next() + packet = Packet(h1, h2, data, data_size=data_size, offset=offset) have_missing = self._update_info(packet) if have_missing and self._fail_on_missing: self.push_back(packet) @@ -227,9 +226,6 @@ class PacketStream(object): """ Seek forward such that the next packet provided will be the first packet having a timestamp available in the secondary header >= stamp and apid. - - :raises: StopIteration if the end of the stream is reached before the - criteria can be met. """ # seek past partial groups packet = self.next() @@ -251,36 +247,12 @@ class PacketStream(object): packet = self.next() self.push_back(packet) - def seek_to_next_stamp(self): - """ - Seek to the next packet with a timestamp available in the secondary - header. For standalone packets this is essentially the same as `next`. - For packet groups it is essentially a "next group" function. - """ - packet = self.next() # get past current packet - packet = self.next() - while not packet.stamp: - packet = self.next() - self.push_back(packet) +def jpss_packet_stream(fobj, **kwargs): + stream = BasicStream(fobj, headers.jpss_header_lookup) + return PacketStream(stream, **kwargs) -def _cmp_by_first_pkt(filea, fileb): - """ - Compare 2 L0 files by their first available timestamp. Used to sort L0 files - by time. - """ - stampa = stampb = None - for pkt in PacketStream(io.open(filea, 'rb')): - if pkt.stamp: - stampa = pkt.stamp - break - for pkt in PacketStream(io.open(fileb, 'rb')): - if pkt.stamp: - stampb = pkt.stamp - break - return cmp(stampa, stampb) - - -def make_streams(filepaths, **kwargs): - filepaths.sort(_cmp_by_first_pkt) - return filepaths, [PacketStream(io.open(f, 'rb'), **kwargs) for f in filepaths] + +def aqua_packet_stream(fobj, **kwargs): + stream = BasicStream(fobj, headers.aqua_header_lookup) + return PacketStream(stream, **kwargs) diff --git a/edosl0util/trunc.py b/edosl0util/trunc.py index 7d944b2e8e143b9463a89f8305a96de2e069a543..fdf7080ca05c274cffa53b51729650ee668d012f 100644 --- a/edosl0util/trunc.py +++ b/edosl0util/trunc.py @@ -2,12 +2,11 @@ __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved." 
import io -from edosl0util.stream import PacketStream +from edosl0util.stream import jpss_packet_stream def trunc_stream(stream, start, end): stream.seek_to(start) - stream.seek_to_next_stamp() pkt = stream.next() while pkt.stamp <= end: @@ -21,5 +20,5 @@ def trunc_stream(stream, start, end): def trunc_file(filename, start, end): - stream = PacketStream(io.open(filename, 'rb')) + stream = jpss_packet_stream(io.open(filename, 'rb')) return trunc_stream(stream, start, end)
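Finally, a sketch of driving the truncation above from Python. It assumes ``trunc_file`` yields raw packet bytes for packets inside the window, which this diff does not show; file names are placeholders.
```
import io
from datetime import datetime

from edosl0util import trunc

start = datetime(2015, 10, 18, 0, 0, 0)
end = datetime(2015, 10, 18, 0, 6, 0)
with io.open('truncated.pds', 'wb') as out:  # placeholder output name
    for blob in trunc.trunc_file('input.pds', start, end):  # placeholder input
        # assumption: each yielded item is a packet's raw bytes
        out.write(blob)
```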