Skip to content
Snippets Groups Projects
Commit baa7fbb7 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Fix few small bugs.

Fixes issue #1.
parent 0579b00b
No related branches found
No related tags found
No related merge requests found
...@@ -42,6 +42,7 @@ def merge(streams, output=sys.stdout): ...@@ -42,6 +42,7 @@ def merge(streams, output=sys.stdout):
LOG.debug("seeking to %s, %s", last_stamp, last_apid) LOG.debug("seeking to %s, %s", last_stamp, last_apid)
stream.seek(last_stamp, last_apid) stream.seek(last_stamp, last_apid)
# until `next` causes StopIteration
while True: while True:
packets = deque() packets = deque()
...@@ -69,9 +70,6 @@ def merge(streams, output=sys.stdout): ...@@ -69,9 +70,6 @@ def merge(streams, output=sys.stdout):
LOG.debug("invalid group, switching streams:%s", group) LOG.debug("invalid group, switching streams:%s", group)
break break
if not packets:
continue
# first packet always has a stamp because it's either standalone or # first packet always has a stamp because it's either standalone or
# part of a valid group # part of a valid group
last_stamp = packets[0].stamp last_stamp = packets[0].stamp
...@@ -90,5 +88,5 @@ def merge(streams, output=sys.stdout): ...@@ -90,5 +88,5 @@ def merge(streams, output=sys.stdout):
def merge_files(filepaths, destpath): def merge_files(filepaths, destpath):
streams = (PacketStream(open(f)) for f in filepaths) streams = [PacketStream(open(f)) for f in filepaths]
merge(streams, output=open(destpath, 'wb')) merge(streams, output=open(destpath, 'wb'))
...@@ -43,10 +43,13 @@ def split_stream(fobj, minutes): ...@@ -43,10 +43,13 @@ def split_stream(fobj, minutes):
def _replace_pdsname_stamp(filename, stamp): def _replace_pdsname_stamp(filename, stamp):
# P1570769AAAAAAAAAAAAAS15208032721000.PDS # P1570769AAAAAAAAAAAAAS15208032721001.PDS
pat = '{}{}{}'.format(filename[:22], #
'%y%j%H%M%S', # NOTE: It seems that EDOS uses the file_id column for fractional seconds.
filename[-7:]) # We just zero this out since the bucket should be on even seconds.
pat = '{}{}0{}'.format(filename[:22],
'%y%j%H%M%S',
filename[-6:])
return stamp.strftime(pat) return stamp.strftime(pat)
......
...@@ -4,8 +4,6 @@ from collections import deque ...@@ -4,8 +4,6 @@ from collections import deque
from edos.ccsds import ( from edos.ccsds import (
c, c,
PrimaryHeader, PrimaryHeader,
JpssFirstSecondaryHeader,
JpssSecondaryHeader,
GROUP_FIRST, GROUP_FIRST,
GROUP_CONTINUING, GROUP_CONTINUING,
GROUP_LAST, GROUP_LAST,
...@@ -99,7 +97,7 @@ class PacketStream(object): ...@@ -99,7 +97,7 @@ class PacketStream(object):
self._header_lookup = header_lookup self._header_lookup = header_lookup
def __str__(self): def __str__(self):
return '<PacketStream pos=%d %s>' % (self.tell(), self._input) return '<PacketStream pos=%d %s>' % (self._input.tell(), self._input)
def __iter__(self): def __iter__(self):
return self return self
...@@ -125,7 +123,7 @@ class PacketStream(object): ...@@ -125,7 +123,7 @@ class PacketStream(object):
""" """
# return any items on the seek cache before reading more # return any items on the seek cache before reading more
if len(self._seek_cache): if len(self._seek_cache):
return self._seek_cache.pop() return self._seek_cache.popleft()
buf = self._read(c.sizeof(PrimaryHeader)) buf = self._read(c.sizeof(PrimaryHeader))
data = buf data = buf
...@@ -164,16 +162,22 @@ class PacketStream(object): ...@@ -164,16 +162,22 @@ class PacketStream(object):
:raises: StopIteration if the end of the stream is reached before the :raises: StopIteration if the end of the stream is reached before the
criteria can be met. criteria can be met.
""" """
# seek past partial groups
packet = self.next() packet = self.next()
while not packet.stamp: while not packet.stamp:
packet = self.next() packet = self.next()
# seek to first packet > `stamp`
current_stamp = packet.stamp current_stamp = packet.stamp
while current_stamp < stamp: while current_stamp <= stamp:
packet = self.next() packet = self.next()
current_stamp = packet.stamp or current_stamp current_stamp = packet.stamp or current_stamp
# put back so next packet is the first packet > `stamp`
self.push_back(packet) self.push_back(packet)
if apid is not None: if apid is not None:
packet = self.next()
while packet.apid != apid: while packet.apid != apid:
packet = self.next() packet = self.next()
self.push_back(packet) self.push_back(packet)
...@@ -198,4 +202,4 @@ def _cmp_by_first_pkt(filea, fileb): ...@@ -198,4 +202,4 @@ def _cmp_by_first_pkt(filea, fileb):
def make_streams(filepaths): def make_streams(filepaths):
filepaths.sort(_cmp_by_first_pkt) filepaths.sort(_cmp_by_first_pkt)
return [PacketStream(io.open(f, 'rb')) for f in filepaths] return [PacketStream(open(f, 'rb')) for f in filepaths]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment