Commit 82883d61 authored by Bruce Flynn

Fix CLI merge utility.

parent a0bbba59
@@ -104,23 +104,9 @@ def cmd_merge():
     _configure_logging(args)
-    index = set()  # will remove duplicates
-    for filepath in args.pds:
-        LOG.debug('indexing %s', filepath)
-        fobj = io.open(filepath, 'rb')
-        index.update(merge.read_packet_index(fobj))
-
-    LOG.info('sorting index with %d pointers', len(index))
-    index = sorted(index)
-
-    LOG.info('writing index to %s', args.output)
-    with io.open(args.output, 'wb') as fobj:
-        for ptr in index:
-            if args.trunc_to:
-                interval = args.trunc_to
-                if ptr.stamp >= interval[0] and ptr.stamp < interval[1]:
-                    fobj.write(ptr.bytes())
-            else:
-                fobj.write(ptr.bytes())
+    streams = [stream.jpss_packet_stream(io.open(f, 'rb')) for f in args.pds]
+    merge.merge(
+        streams, output=io.open(args.output, 'wb'), trunc_to=args.trunc_to)
 
 
 def cmd_rdr2l0():
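With this change cmd_merge() becomes a thin wrapper around the library: it opens each input PDS as a jpss_packet_stream and hands the streams to merge.merge(). A minimal sketch of the equivalent standalone call, assuming the edosl0util package layout implied by the imports above; the file names and truncation window are illustrative, not taken from the commit:

import io
from datetime import datetime

from edosl0util import merge, stream

# Hypothetical input/output paths -- illustrative only.
pds_files = ['file1.pds', 'file2.pds']
streams = [stream.jpss_packet_stream(io.open(f, 'rb')) for f in pds_files]

# Half-open truncation window: start inclusive, end exclusive,
# matching the semantics documented on merge() below.
window = (datetime(2017, 1, 1, 0, 0), datetime(2017, 1, 1, 2, 0))

merge.merge(streams, output=io.open('out.pds', 'wb'), trunc_to=window)

The indexing, de-duplication, sorting, and truncation that previously lived in the CLI now happen inside the merge module, as the following hunks show.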
@@ -8,15 +8,10 @@
 4. Write
 """
-import io
 import os
 import logging
-from datetime import datetime
 from collections import deque
 
-from edosl0util import headers
-from edosl0util.stream import PacketStream, BasicStream
-
 LOG = logging.getLogger(__name__)
@@ -55,11 +50,8 @@ class _Ptr(object):
         return self.fobj.read(self.size)
 
 
-def read_packet_index(fobj):
-    lookup = headers.jpss_header_lookup
-    stream = PacketStream(BasicStream(fobj, lookup, with_data=False))
+def read_packet_index(stream):
     index = deque()
     try:
         # drop any leading hanging packets
         count = 0
@@ -72,7 +64,7 @@ def read_packet_index(fobj):
     while True:
         ptr = _Ptr(
-            fobj,
+            stream.file,
             stamp=packet.stamp,
             apid=packet.apid,
             offset=packet.offset,
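read_packet_index() now takes an already-constructed PacketStream rather than a raw file object, and builds each _Ptr against stream.file instead of a separately passed handle. A rough sketch of the new call pattern, assuming a single input file (the path is hypothetical):

import io

from edosl0util import merge, stream

pds = stream.jpss_packet_stream(io.open('file1.pds', 'rb'))  # hypothetical path
index = merge.read_packet_index(pds)

# Each entry is a _Ptr with enough metadata to locate the packet again
# later: timestamp, APID, and byte offset into the source file.
for ptr in list(index)[:5]:
    print(ptr.stamp, ptr.apid, ptr.offset)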
@@ -92,38 +84,41 @@ def read_packet_index(fobj):
     return index
 
 
-if __name__ == '__main__':
-    import argparse
-    parser = argparse.ArgumentParser()
-    parser.add_argument('-o', '--output', default='out.pds')
-    dtarg = lambda v: datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
-    def interval(v):
-        dt = lambda v: datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
-        return [dt(x) for x in v.split(',')]
-    parser.add_argument(
-        '-t', '--trunc-to', type=interval,
-        help=('Truncate to the interval given as coma separated timestamps of '
-              'the format YYYY-MM-DD HH:MM:SS. The begin time is inclusive, the '
-              'end time is exclusive.'))
-    parser.add_argument('pds', nargs='+')
-    args = parser.parse_args()
-    logging.basicConfig(level=logging.DEBUG, format='%(message)s')
-
-    index = set()  # will remove duplicates
-    for filepath in args.pds:
-        LOG.debug('indexing %s', filepath)
-        fobj = io.open(filepath, 'rb')
-        index.update(read_packet_index(fobj))
-
-    LOG.info('sorting index with %d pointers', len(index))
-    index = sorted(index)
-
-    LOG.info('writing index to %s', args.output)
-    with io.open(args.output, 'wb') as fobj:
-        for ptr in index:
-            if args.trunc_to:
-                interval = args.trunc_to
-                if ptr.stamp >= interval[0] and ptr.stamp < interval[1]:
-                    fobj.write(ptr.bytes())
-            else:
-                fobj.write(ptr.bytes())
+def _sort_by_time_apid(index, order=None):
+    if order:
+        index = sorted(index, key=lambda p: order.index(p.apid))
+    else:
+        index = sorted(index, key=lambda p: p.apid)
+    return sorted(index, key=lambda p: p.stamp)
+
+
+def merge(streams, output, trunc_to=None, apid_order=None):
+    """
+    Merge packets from multiple streams to an output file. Duplicate packets
+    will be filtered and packets will be sorted by time and the apid order
+    provided.
+
+    :param streams: List of `PacketStream`s such as returned by
+        `jpss_packet_stream`.
+    :param output: File-like object to write output packets to.
+    :keyword trunc_to: (start, end) datetime to truncate packets to. Start is
+        inclusive and end is exclusive.
+    :keyword apid_order: List of apid is the order in which they should appear
+        in the output. If provided ALL apids must be present, otherwise an
+        IndexError will occur.
+    """
+    index = set()  # will remove duplicates
+    for stream in streams:
+        LOG.debug('indexing %s', stream)
+        index.update(read_packet_index(stream))
+
+    LOG.debug('sorting index with %d pointers', len(index))
+    index = _sort_by_time_apid(index, order=apid_order)
+
+    LOG.debug('writing index to %s', output)
+    for ptr in index:
+        if trunc_to:
+            if ptr.stamp >= trunc_to[0] and ptr.stamp < trunc_to[1]:
+                output.write(ptr.bytes())
+        else:
+            output.write(ptr.bytes())
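The two-pass sort in _sort_by_time_apid() leans on Python's stable sort: pointers are ordered by APID first (or by their position in apid_order), then re-sorted by timestamp, giving a time-major ordering with the APID order acting as the tie-break within each timestamp. A small self-contained illustration with stand-in pointer objects; the Ptr namedtuple and the timestamps below are test doubles, not the module's _Ptr class:

from collections import namedtuple
from datetime import datetime

Ptr = namedtuple('Ptr', ['stamp', 'apid'])  # stand-in for merge._Ptr

t0 = datetime(2017, 1, 1, 0, 0, 0)
t1 = datetime(2017, 1, 1, 0, 0, 1)
index = {Ptr(t1, 800), Ptr(t0, 826), Ptr(t0, 800), Ptr(t1, 826)}

def sort_by_time_apid(index, order=None):
    # Same approach as merge._sort_by_time_apid: APID (or explicit order)
    # first, then a stable sort by timestamp.
    if order:
        index = sorted(index, key=lambda p: order.index(p.apid))
    else:
        index = sorted(index, key=lambda p: p.apid)
    return sorted(index, key=lambda p: p.stamp)

# Default: time-major, ascending APID within the same timestamp.
print(sort_by_time_apid(index))
# Explicit order: APID 826 precedes 800 at each timestamp.
print(sort_by_time_apid(index, order=[826, 800]))

One caveat: for an APID missing from apid_order, list.index() raises ValueError (the docstring says IndexError), so either way every APID present in the input must appear in the list. The remaining hunks update the stream module so that PacketStream exposes the underlying file object that read_packet_index() and merge() now rely on.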
-import io
 import os
 import logging
 import ctypes as c
@@ -44,7 +43,7 @@ class BasicStream(object):
     Tracker = namedtuple('Tracker', ['h1', 'h2', 'size', 'offset', 'data'])
 
     def __init__(self, fobj, header_lookup=None, with_data=True):
-        self.fobj = fobj
+        self.file = fobj
         self.header_lookup = header_lookup
         self.with_data = with_data
@@ -55,7 +54,7 @@ class BasicStream(object):
         return self.next()
 
     def _read(self, size):
-        buf = self.fobj.read(size)
+        buf = self.file.read(size)
         if not buf:  # EOF
             raise StopIteration()
         if len(buf) != size:
@@ -78,7 +77,7 @@ class BasicStream(object):
         return H2Impl.from_buffer_copy(buf), h2size
 
     def next(self):
-        offset = self.fobj.tell()
+        offset = self.file.tell()
         h1, h1size = self.read_primary_header()
         h2, h2size = self.read_secondary_header(h1)
         # data length includes h2size
@@ -155,6 +154,10 @@ class PacketStream(object):
                                              'num_missing': 0})
         self._fail_on_missing = fail_on_missing
 
+    def __repr__(self):
+        filepath = getattr(self.file, 'name', None)
+        return '<{} file={}>'.format(self.__class__.__name__, filepath)
+
     def __iter__(self):
         return self
@@ -162,6 +165,10 @@
     def __next__(self):
         return self.next()
 
+    @property
+    def file(self):
+        return self._stream.file
+
     def push_back(self, packet):
         self._seek_cache.append(packet)
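Exposing the backing file through BasicStream.file and the new PacketStream.file property is what lets read_packet_index() build _Ptr records, and lets merge() log a meaningful stream identity via the new __repr__. A rough usage sketch, with a hypothetical path:

import io

from edosl0util import stream

pds = stream.jpss_packet_stream(io.open('file1.pds', 'rb'))  # hypothetical path

# __repr__ now reports the backing file's name, so log lines such as
# LOG.debug('indexing %s', stream) in merge() identify the input file.
print(repr(pds))      # e.g. <PacketStream file=file1.pds>

# The file property delegates to the wrapped BasicStream's file object,
# so callers can record byte offsets or re-read packet data later.
print(pds.file.tell())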