diff --git a/edosl0util/merge.py b/edosl0util/merge.py index 16ac3b3c29935517ebc9c7f555f273d9ed6e0679..740161d106fd4154847d885f65881529d799d3cf 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -42,6 +42,7 @@ def merge(streams, output=sys.stdout): LOG.debug("seeking to %s, %s", last_stamp, last_apid) stream.seek(last_stamp, last_apid) + # until `next` causes StopIteration while True: packets = deque() @@ -69,9 +70,6 @@ def merge(streams, output=sys.stdout): LOG.debug("invalid group, switching streams:%s", group) break - if not packets: - continue - # first packet always has a stamp because it's either standalone or # part of a valid group last_stamp = packets[0].stamp @@ -90,5 +88,5 @@ def merge(streams, output=sys.stdout): def merge_files(filepaths, destpath): - streams = (PacketStream(open(f)) for f in filepaths) + streams = [PacketStream(open(f)) for f in filepaths] merge(streams, output=open(destpath, 'wb')) diff --git a/edosl0util/split.py b/edosl0util/split.py index 6fb0cc6ed41d00440c1c55e9b104bea87b23d04d..018f67faa8d3f6079eea6ffdce8e76fcda175289 100644 --- a/edosl0util/split.py +++ b/edosl0util/split.py @@ -43,10 +43,13 @@ def split_stream(fobj, minutes): def _replace_pdsname_stamp(filename, stamp): - # P1570769AAAAAAAAAAAAAS15208032721000.PDS - pat = '{}{}{}'.format(filename[:22], - '%y%j%H%M%S', - filename[-7:]) + # P1570769AAAAAAAAAAAAAS15208032721001.PDS + # + # NOTE: It seems that EDOS uses the file_id column for fractional seconds. + # We just zero this out since the bucket should be on even seconds. + pat = '{}{}0{}'.format(filename[:22], + '%y%j%H%M%S', + filename[-6:]) return stamp.strftime(pat) diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 977b9443bab2691eccb79d052b26b5916cb188cb..e06237f2ca4574de97cffcee1e7d95fee82cea38 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -4,8 +4,6 @@ from collections import deque from edos.ccsds import ( c, PrimaryHeader, - JpssFirstSecondaryHeader, - JpssSecondaryHeader, GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, @@ -99,7 +97,7 @@ class PacketStream(object): self._header_lookup = header_lookup def __str__(self): - return '<PacketStream pos=%d %s>' % (self.tell(), self._input) + return '<PacketStream pos=%d %s>' % (self._input.tell(), self._input) def __iter__(self): return self @@ -125,7 +123,7 @@ class PacketStream(object): """ # return any items on the seek cache before reading more if len(self._seek_cache): - return self._seek_cache.pop() + return self._seek_cache.popleft() buf = self._read(c.sizeof(PrimaryHeader)) data = buf @@ -164,16 +162,22 @@ class PacketStream(object): :raises: StopIteration if the end of the stream is reached before the criteria can be met. """ + # seek past partial groups packet = self.next() while not packet.stamp: packet = self.next() + + # seek to first packet > `stamp` current_stamp = packet.stamp - while current_stamp < stamp: + while current_stamp <= stamp: packet = self.next() current_stamp = packet.stamp or current_stamp + + # put back so next packet is the first packet > `stamp` self.push_back(packet) if apid is not None: + packet = self.next() while packet.apid != apid: packet = self.next() self.push_back(packet) @@ -198,4 +202,4 @@ def _cmp_by_first_pkt(filea, fileb): def make_streams(filepaths): filepaths.sort(_cmp_by_first_pkt) - return [PacketStream(io.open(f, 'rb')) for f in filepaths] + return [PacketStream(open(f, 'rb')) for f in filepaths]