diff --git a/edosl0util/cli.py b/edosl0util/cli.py
index 021e0c923f581096506b75a559fb26a820bdfbc9..4cfb50c870ea7b1de7e974821199660c3080b63e 100644
--- a/edosl0util/cli.py
+++ b/edosl0util/cli.py
@@ -1,6 +1,7 @@
 """
 Console script entry points for CLI tools.
 """
+import io
 import os
 from datetime import datetime
 from collections import defaultdict
@@ -22,9 +23,9 @@ def cmd_trunc():
     args = parser.parse_args()
 
     output = args.output or os.path.basename(args.filename) + '.trunc'
-    with open(output, 'wb') as fptr:
+    with io.open(output, 'wb') as fptr:
         for pkt in trunc.trunc_file(args.filename, args.start, args.end):
-            fptr.write(pkt.blob)
+            fptr.write(pkt.bytes())
 
 
 def cmd_split():
@@ -44,7 +45,7 @@ def cmd_info():
     parser.add_argument('filepath')
     args = parser.parse_args()
 
-    packets = stream.PacketStream(open(args.filepath))
+    packets = stream.PacketStream(io.open(args.filepath, 'rb'))
 
     first = datetime(2016, 1, 1)
     last = datetime(2000, 1, 1)
diff --git a/edosl0util/dump.py b/edosl0util/dump.py
deleted file mode 100644
index 0fecf934183e298aedb44ba2bbc93a7681c032d0..0000000000000000000000000000000000000000
--- a/edosl0util/dump.py
+++ /dev/null
@@ -1,48 +0,0 @@
-from collections import defaultdict
-from datetime import datetime
-
-from edos.ccsds import GROUP_FIRST
-
-from edosl0util.stream import PacketStream
-
-
-MAX_SEQ_ID = 2**14
-
-
-def dump(filepath, handler=None):
-    """
-    Dump PDS file statistics. The entire file will be iterated, which for larger
-    PDS files may take a while.
-    """
-    first_pkt = datetime(3000, 1, 1)
-    last_pkt = datetime(1900, 1, 1)
-    packets = 0
-    groups = 0
-    # sequence nums are per APID
-    apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
-    for pkt in PacketStream(open(filepath)):
-        packets += 1
-        h1 = pkt._primary_header
-        if h1.sequence_grouping == GROUP_FIRST:
-            groups += 1
-        apids[h1.apid]['count'] += 1
-
-        seq = h1.source_sequence_count
-        apids[h1.apid]['prev_seq'] = seq
-
-        if pkt.stamp:
-            first_pkt = min(pkt.stamp, first_pkt)
-            last_pkt = max(pkt.stamp, last_pkt)
-
-        try:
-            handler and handler(pkt)
-        except Exception:
-            pass
-
-    return dict(
-        first_pkt=first_pkt,
-        last_pkt=last_pkt,
-        packet_count=packets,
-        group_count=groups,
-        apids=apids
-    )
\ No newline at end of file
diff --git a/edosl0util/jpssrdr.py b/edosl0util/jpssrdr.py
index 9ebd7e89f6d118ce10bbcdc1fa12b778c4200068..6b78f2287f0385bf237ea3b5b6e649829371e193 100644
--- a/edosl0util/jpssrdr.py
+++ b/edosl0util/jpssrdr.py
@@ -7,6 +7,7 @@ Code for reading/writing/manipulating JPSS Common RDR files as documented in:
 
     http://jointmission.gsfc.nasa.gov/sciencedocs/2015-06/474-00001-02_JPSS-CDFCB-X-Vol-II_0123B.pdf
 """
+import io
 import logging
 import ctypes as c
 from collections import namedtuple
@@ -68,9 +69,6 @@ def _make_packet_impl(size):
     """
     Create a struct for a CCSDS packet. The struct size is dynamic so we need
     a new class for each packet.
-
-    XXX: Many packets will have similar sizes so perhaps we should cache
-    packet impls based on their size.
""" # size minus the CCSDS primary header size data_size = size - 6 @@ -123,10 +121,7 @@ def _packets_for_apid(buf, header, apid): continue p_off = header.ap_storage_offset + tracker.offset - pkt_impl = _pkt_impl_cache.get(tracker.size) - if not pkt_impl: - pkt_impl = _make_packet_impl(tracker.size) - _pkt_impl_cache[tracker.size] = pkt_impl + pkt_impl = _make_packet_impl(tracker.size) pkt = pkt_impl.from_buffer(buf, p_off) assert c.sizeof(pkt) == tracker.size @@ -241,6 +236,6 @@ if __name__ == '__main__': logging.basicConfig(level=lvl, format='%(message)s') output = args.output or args.rdr + '.pds' - with open(output, 'wb') as fptr: + with io.open(output, 'wb') as fptr: for packet in convert_to_nasa_l0(args.sensor, args.rdr): fptr.write(packet) diff --git a/edosl0util/merge.py b/edosl0util/merge.py index 61c43622e796bae5f62db35ed871608692f7c817..dd2a448738c74987038e6e8f8bd25f3aac6b023a 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -1,3 +1,4 @@ +import io import sys import logging from datetime import datetime @@ -79,7 +80,10 @@ def merge(streams, output=sys.stdout): last_apid = packets[0].apid while packets: - output.write(packets.popleft().blob) + pkt = packets.popleft() + output.write(pkt.primary_header) + output.write(pkt.secondary_header) + output.write(pkt.data) except MissingPackets as err: LOG.debug("missing packets, switching streams: %s", err.args) @@ -92,4 +96,4 @@ def merge(streams, output=sys.stdout): def merge_files(filepaths, destpath): streams = make_streams(filepaths) - merge(streams, output=open(destpath, 'wb')) + merge(streams, output=io.open(destpath, 'wb')) diff --git a/edosl0util/split.py b/edosl0util/split.py index 46c2985db9a888cac326b2a31c1ad3051bb5bc69..b39454b13756c671757a5dbfd6b65394096e46c4 100644 --- a/edosl0util/split.py +++ b/edosl0util/split.py @@ -1,4 +1,5 @@ import os +import io import array from datetime import datetime @@ -35,7 +36,7 @@ def split_stream(fobj, minutes): cur_bucket = pkt_bucket # this is an append operation - buf.fromstring(pkt.blob) + buf.fromstring(pkt.data) pkt_count += 1 offset = fobj.tell() - original_offset @@ -84,7 +85,7 @@ def split_file(filepath, minutes, destdir): :raises RuntimeError: If a file exists with the same name of a bucket file. """ destdir = destdir or '.' - stream = split_stream(open(filepath), minutes) + stream = split_stream(io.open(filepath), minutes) for timestamp, offset, pkts, blob in stream: stamp = datetime.utcfromtimestamp(timestamp) dirname, filename = os.path.split(filepath) @@ -95,6 +96,6 @@ def split_file(filepath, minutes, destdir): ('File already exists. ' 'Bucket file possibly colliding with input file.'), dstpath) - with open(dstpath, 'wb') as fptr: + with io.open(dstpath, 'wb') as fptr: fptr.write(blob) yield stamp, fptr.name diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 1f90377b9bb8f742de31fe699af60cda8aa9436d..4e148ca0fe1a4fe2141e493849ac1ee923f24476 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -15,68 +15,103 @@ from edosl0util import headers LOG = logging.getLogger(__name__) -class MissingPackets(Exception): +def simple_stream(fobj=None, data=None): """ - Sequence ID for an APID are not sequential. It is expected that sequence ids - are sequential, rolling over at SequenceId.MAX. + Generator that yields PrimaryHeaders and data. Files are read using mmap. 
""" + if fobj is not None: + data = fobj + psize = c.sizeof(PrimaryHeader) + while True: + buf = data.read(psize) + if len(buf) < psize: + return + h1 = PrimaryHeader.from_buffer_copy(buf) + # read user data + size = h1.data_length_minus1 + 1 + buf = data.read(size) + if len(buf) < size: + return + yield h1, buf + + +def _secondary_stream(header_lookup, fobj=None, data=None): + """ + Generator that yields primary and secondary header and any left over bytes + of "user" data. + + :param header_lookup: function that takes a primary header and returns + a secondary header struct to use, or None. + """ + stream = simple_stream(fobj, data) + for h1, buf in stream: + h2 = '' + if h1.secondary_header_flag: + H2Impl = header_lookup(h1) + if H2Impl: + hsize = c.sizeof(H2Impl) + h2 = H2Impl.from_buffer_copy(buf) + yield h1, h2, buf[hsize:] + continue + yield h1, h2, buf + + +def jpss_secondary_stream(fobj=None, data=None): + """ + `_secondary_stream` impl for JPSS secondary headers. + """ + return _secondary_stream(headers.jpss_header_lookup, fobj, data) + + +def aqua_secondary_stream(fobj=None, data=None): + """ + `_secondary_stream` impl for Aqua secondary headers. + """ + return _secondary_stream(headers.aqua_header_lookup, fobj, data) class Packet(object): - def __init__(self, apid, seqid, stamp, blob, pheader, sheader): - self.apid = apid - self.seqid = seqid - self.stamp = stamp - self.blob = blob - self._primary_header = pheader - self._secondary_header = sheader + def __init__(self, h1, h2, data): + self.data = data + self.primary_header = h1 + self.secondary_header = h2 def __str__(self): return '<Packet apid=%d seqid=%d stamp=%s>' % \ (self.apid, self.seqid, self.stamp) __repr__ = __str__ + @property + def apid(self): + return self.primary_header.apid + + @property + def seqid(self): + return self.primary_header.sequence_grouping + + @property + def stamp(self): + h2 = self.secondary_header + return h2.timecode.asdatetime() if hasattr(h2, 'timecode') else None + + def bytes(self): + return buffer(self.primary_header) + \ + buffer(self.secondary_header) + self.data + def is_group(self): return False def is_first(self): - return self._primary_header.sequence_grouping == GROUP_FIRST + return self.primary_header.sequence_grouping == GROUP_FIRST def is_continuine(self): - return self._primary_header.sequence_grouping == GROUP_CONTINUING + return self.primary_header.sequence_grouping == GROUP_CONTINUING def is_last(self): - return self._primary_header.sequence_grouping == GROUP_LAST + return self.primary_header.sequence_grouping == GROUP_LAST def is_standalone(self): - return self._primary_header.sequence_grouping == GROUP_STANDALONE - - -class SequenceId(object): - MAX = 2**14 - - def __init__(self, initial=0): - self._val = initial - - def __iter__(self): - return self - - @property - def val(self): - return self._val - - def next(self): - self._val = (self._val + 1) % self.MAX - return self.val - - -def default_header_lookup(primary_header): - if not primary_header.secondary_header_flag: - return None - return ( - headers.aqua_header_lookup(primary_header) or - headers.jpss_header_lookup(primary_header) - ) + return self.primary_header.sequence_grouping == GROUP_STANDALONE class PacketStream(object): @@ -84,22 +119,15 @@ class PacketStream(object): Iterates over all CCSDS data producing `Packet` tuples. Only as much data is necessary to generate a single packet is read into memory at a time. 
""" - def __init__(self, fobj, fail_on_missing=False, header_lookup=default_header_lookup): + def __init__(self, fobj, fail_on_missing=False): """ :param fobj: File-like object to read stream from :keyword fail_on_missing: If True raise a MissingPackets error when packets are missing. - :keyword header_lookup: A function that takes a PrimaryHeader and returns - a secondary header implementation that will be used. """ - self._input = fobj + self._stream = jpss_secondary_stream(fobj) self._seek_cache = deque() - self._seqids = {} self._fail_on_missing = fail_on_missing - self._header_lookup = header_lookup - - def __str__(self): - return '<PacketStream pos=%d %s>' % (self._input.tell(), self._input) def __iter__(self): return self @@ -107,12 +135,6 @@ class PacketStream(object): def push_back(self, packet): self._seek_cache.append(packet) - def _read(self, size): - data = self._input.read(size) - if not data: - raise StopIteration - return data - def next(self): """ Return the next packet in the stream. @@ -127,33 +149,7 @@ class PacketStream(object): if len(self._seek_cache): return self._seek_cache.popleft() - buf = self._read(c.sizeof(PrimaryHeader)) - data = buf - - h1 = PrimaryHeader.from_buffer_copy(buf) - - buf = self._read(h1.data_length_minus1 + 1) - data += buf - - stamp = None - h2 = None # only first/standalone have h2 impls - H2Impl = self._header_lookup(h1) - if H2Impl: - h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)]) - if hasattr(h2, 'timecode') and hasattr(h2.timecode, 'asdatetime'): - stamp = h2.timecode.asdatetime() - - packet = Packet(h1.apid, h1.source_sequence_count, stamp, data, h1, h2) - - # initialize a new seqid counter - if packet.apid not in self._seqids: - self._seqids[packet.apid] = SequenceId(packet.seqid) - else: - next_seqid = self._seqids[packet.apid].next() - if self._fail_on_missing and packet.seqid != next_seqid: - raise MissingPackets("non-sequential sequence ids", - packet.seqid, next_seqid) - return packet + return Packet(*self._stream.next()) def seek(self, stamp, apid=None): """ @@ -191,11 +187,11 @@ def _cmp_by_first_pkt(filea, fileb): by time. """ stampa = stampb = None - for pkt in PacketStream(open(filea)): + for pkt in PacketStream(open(filea, 'rb')): if pkt.stamp: stampa = pkt.stamp break - for pkt in PacketStream(open(fileb)): + for pkt in PacketStream(open(fileb, 'rb')): if pkt.stamp: stampb = pkt.stamp break diff --git a/edosl0util/trunc.py b/edosl0util/trunc.py index 38cdf80dbe89a8cd9c459495eaa04bf846fb0faa..f6762410de7804000ab632af316e44cc085f3be2 100644 --- a/edosl0util/trunc.py +++ b/edosl0util/trunc.py @@ -1,4 +1,4 @@ - +import io from edosl0util.stream import PacketStream @@ -17,5 +17,5 @@ def trunc_stream(stream, start, end): def trunc_file(filename, start, end): - stream = PacketStream(open(filename)) + stream = PacketStream(io.open(filename, 'rb')) return trunc_stream(stream, start, end)