Commit 0563b238 authored by Bruce Flynn

Add merge and RDR-to-L0 command line tools

Adds edosl0merge and rdr2l0 console scripts, shared argparse/logging
helpers for all CLI entry points, and per-APID missing-packet counters
in PacketStream in place of the MissingPackets exception. The jpssrdr
__main__ block moves into cli.cmd_rdr2l0, and setup.py drops the
PyGitTools machinery in favor of an explicit version.

parent 34ce9136
MANIFEST.in
-include .version
+include edosl0util/.repostate
edosl0util/cli.py
@@ -3,24 +3,38 @@ Console script entry points for CLI tools.
 """
 import io
 import os
+import logging
 from datetime import datetime
-from collections import defaultdict

-from edosl0util import split, trunc, stream
+from edosl0util import split, trunc, stream, merge, jpssrdr
+
+LOG = logging


 def _timestamp(v):
     return datetime.strptime(v, '%Y-%m-%d %H:%M:%S')


+def _default_parser():
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    return parser
+
+
+def _configure_logging(args):
+    level = logging.DEBUG if getattr(args, 'verbose', False) else logging.WARN
+    logging.basicConfig(level=level, format='%(message)s')
+
+
 def cmd_trunc():
-    from argparse import ArgumentParser
-    parser = ArgumentParser()
+    parser = _default_parser()
     parser.add_argument('-o', '--output')
     parser.add_argument('filename')
-    parser.add_argument('start', type=_timestamp)
-    parser.add_argument('end', type=_timestamp)
+    parser.add_argument('start', type=_timestamp, help='YYYY-MM-DD HH:MM:SS')
+    parser.add_argument('end', type=_timestamp, help='YYYY-MM-DD HH:MM:SS')
     args = parser.parse_args()
+    _configure_logging(args)

     output = args.output or os.path.basename(args.filename) + '.trunc'
     with io.open(output, 'wb') as fptr:
@@ -29,38 +43,58 @@ def cmd_trunc():

 def cmd_split():
-    from argparse import ArgumentParser
-    parser = ArgumentParser()
+    parser = _default_parser()
     parser.add_argument('--minutes', type=int, default=6)
     parser.add_argument('filepath')
     args = parser.parse_args()
+    _configure_logging(args)
     for stamp, fpath in split.split_file(args.filepath, args.minutes, os.getcwd()):
-        print('wrote bucket {} to {}'.format(stamp.isoformat(), fpath))
+        LOG.info('wrote bucket {} to {}'.format(stamp.isoformat(), fpath))


 def cmd_info():
-    from argparse import ArgumentParser
-    parser = ArgumentParser(description=split.__doc__)
+    parser = _default_parser()
     parser.add_argument('filepath')
     args = parser.parse_args()
+    _configure_logging(args)
     packets = stream.PacketStream(io.open(args.filepath, 'rb'))
-    first = datetime(2016, 1, 1)
-    last = datetime(2000, 1, 1)
-    apids = defaultdict(lambda: {'count': 0})
-    for p in packets:
-        if p.stamp:
-            first = min(first, p.stamp)
-            last = max(last, p.stamp)
-            apids[p.apid]['count'] += 1
-    print "First:", first
-    print "Last:", last
+    first = datetime(3000, 1, 1)
+    last = datetime(1970, 1, 1)
+    for packet in packets:
+        if packet.stamp:
+            first = min(packet.stamp, first)
+            last = max(packet.stamp, last)
     total = 0
-    for apid in sorted(apids):
-        count = apids[apid]['count']
-        total += count
-        print "{}: {}".format(apid, count)
-    print "{} total packets".format(total)
+    LOG.info("First: %s", first)
+    LOG.info("Last: %s", last)
+    for apid, dat in packets.info().items():
+        total += dat['count']
+        LOG.info("%d: count=%d missing=%d", apid, dat['count'], dat['num_missing'])
+    LOG.info("{} total packets".format(total))
+
+
+def cmd_merge():
+    parser = _default_parser()
+    parser.add_argument('-o', '--output', default='out.pds')
+    parser.add_argument('pds', nargs='+')
+    args = parser.parse_args()
+    _configure_logging(args)
+    merge.merge_files(args.pds, args.output)
+
+
+def cmd_rdr2l0():
+    parser = _default_parser()
+    parser.add_argument('-o', '--output')
+    parser.add_argument('-f', '--skipfill', action='store_true')
+    parser.add_argument('sensor', choices=('viirs', 'atms', 'cris'))
+    parser.add_argument('rdr')
+    args = parser.parse_args()
+    _configure_logging(args)
+    output = args.output or args.rdr + '.pds'
+    with io.open(output, 'wb') as fptr:
+        for packet in jpssrdr.convert_to_nasa_l0(args.sensor, args.rdr, args.skipfill):
+            fptr.write(packet)
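
For reference, the conversion wrapped by the new cmd_rdr2l0 entry point can
also be driven programmatically. A minimal sketch using only names visible in
this diff; the input and output paths are placeholders:

    import io
    from edosl0util import jpssrdr

    # Stream CCSDS packets out of a JPSS RDR and write them to a
    # NASA L0 PDS file, skipping fill packets.
    with io.open('out.pds', 'wb') as fptr:
        for packet in jpssrdr.convert_to_nasa_l0('viirs', 'input.rdr', skipfill=True):
            fptr.write(packet)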
edosl0util/jpssrdr.py
@@ -207,25 +207,3 @@ def convert_to_nasa_l0(sensor, filename, skipfill=False):
             if skipfill:
                 continue
         yield packet.packet
-
-
-if __name__ == '__main__':
-    import argparse
-    parser = argparse.ArgumentParser()
-    parser.add_argument('-v', action='store_true')
-    parser.add_argument('-o', '--output')
-    parser.add_argument('-f', '--skipfill', action='store_true')
-    parser.add_argument('sensor', choices=('viirs', 'atms', 'cris'))
-    parser.add_argument('rdr')
-    args = parser.parse_args()
-
-    if args.v:
-        lvl = logging.DEBUG
-    else:
-        lvl = logging.WARN
-    logging.basicConfig(level=lvl, format='%(message)s')
-
-    output = args.output or args.rdr + '.pds'
-    with io.open(output, 'wb') as fptr:
-        for packet in convert_to_nasa_l0(args.sensor, args.rdr):
-            fptr.write(packet)
edosl0util/merge.py
@@ -2,12 +2,9 @@ import io
 import sys
 import logging
 from datetime import datetime
-from collections import deque
+from collections import deque, defaultdict

-from edosl0util.stream import (
-    MissingPackets,
-    make_streams,
-)
+from edosl0util.stream import make_streams

 LOG = logging.getLogger(__name__)
@@ -33,28 +30,38 @@ def _is_valid_group(packets):

 def merge(streams, output=sys.stdout):
     last_apid = None
     last_stamp = datetime(1900, 1, 1)
-    apid_cache = {}
+    # num pkts missing per apid
+    missing = defaultdict(lambda: 0)

     # streams are removed as they are exhausted
     while streams:
         for stream in streams:
+            LOG.debug(stream)
             try:
                 LOG.debug("seeking to %s, %s", last_stamp, last_apid)
                 stream.seek(last_stamp, last_apid)
-                # until `next` causes StopIteration
+                # Do until `next` causes StopIteration
                 while True:
                     packets = deque()
                     packet = stream.next()
+                    # If num_missing was incremented there is a missing
+                    # packet. If so, put the current packet back for
+                    # fetching again and defer to the next stream.
+                    new_missing = stream.info()[packet.apid]['num_missing']
+                    if new_missing > missing[packet.apid]:
+                        missing[packet.apid] = new_missing
+                        stream.push_back(packet)
+                        LOG.debug("missing packets, switching streams: %s", packet)
+                        break  # defer to next stream
                     if packet.is_standalone():
                         packets.append(packet)
+                    # Packet Group: collect all packets between packets with
+                    # timestamps. Missing packets and packets that form an
+                    # invalid group will be skipped.
                     else:
-                        # Collect all packets between packets with timestamps.
-                        # Missing packets and packets that form an invalid group
-                        # will be skipped.
                         group = deque([packet])
                         group.extend(_group_packets(stream))
                         if _is_valid_group(group):
@@ -63,13 +70,13 @@ def merge(streams, output=sys.stdout):
                         elif group[0].is_first():
                             last_stamp = group[0].stamp
                             last_apid = group[0].apid
-                            break
+                            break  # defer to next stream
                         else:
                             # yield to next stream, perhaps they have a valid
                             # group.
                             LOG.debug("invalid group, switching streams:%s", group)
-                            break
+                            break  # defer to next stream

                     # first packet always has a stamp because it's either standalone or
                     # part of a valid group
@@ -78,13 +85,11 @@ def merge(streams, output=sys.stdout):
                     while packets:
                         pkt = packets.popleft()
+                        LOG.debug('writing %s: %s', stream, pkt)
                         output.write(pkt.primary_header)
                         output.write(pkt.secondary_header)
                         output.write(pkt.data)

-            except MissingPackets as err:
-                LOG.debug("missing packets, switching streams: %s", err.args)
             except StopIteration:
                 streams.remove(stream)
                 LOG.debug("end-of-stream %s", stream)
edosl0util/stream.py
@@ -121,15 +121,16 @@ class PacketStream(object):
     Iterates over all CCSDS data producing `Packet` tuples. Only as much data as is
     necessary to generate a single packet is read into memory at a time.
     """
+    SEQID_NOTSET = -1
+
     def __init__(self, fobj):
         """
         :param fobj: File-like object to read stream from
-        :keyword fail_on_missing: If True raise a MissingPackets error when packets
-            are missing.
         """
         self._stream = jpss_full_stream(fobj)
         self._seek_cache = deque()
-        self._apid_info = defaultdict(lambda: {'count': 0, 'last_seqid': -1, 'missing': 0})
+        self._apid_info = defaultdict(
+            lambda: {'count': 0, 'last_seqid': self.SEQID_NOTSET, 'num_missing': 0})

     def __iter__(self):
         return self
@@ -160,15 +161,18 @@ class PacketStream(object):
         return packet

     def _update_info(self, packet):
+        have_missing = False
         apid = self._apid_info[packet.apid]
         apid['count'] += 1
-        if apid['last_seqid'] != -1:
+        if apid['last_seqid'] != self.SEQID_NOTSET:
             if packet.seqid > apid['last_seqid']:
-                missing = packet.seqid - apid['last_seqid'] - 1
+                num_missing = packet.seqid - apid['last_seqid'] - 1
             else:
-                missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID
-            apid['missing'] += missing
+                num_missing = packet.seqid - apid['last_seqid'] + MAX_SEQ_ID
+            have_missing = num_missing > 0
+            apid['num_missing'] += num_missing
         apid['last_seqid'] = packet.seqid
+        return have_missing

     def info(self):
         return self._apid_info
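
The num_missing arithmetic accounts for wrap-around of the CCSDS sequence
counter. A worked sketch of the same computation; MAX_SEQ_ID is defined
elsewhere in stream.py, and its value here (the 14-bit maximum, 16383) is an
assumption:

    MAX_SEQ_ID = 2 ** 14 - 1  # assumed value; the real constant lives in stream.py

    def count_missing(seqid, last_seqid):
        # gap between consecutive packets seen on one APID
        if seqid > last_seqid:
            return seqid - last_seqid - 1
        # the counter wrapped past MAX_SEQ_ID back to zero
        return seqid - last_seqid + MAX_SEQ_ID

    assert count_missing(6, 5) == 0           # consecutive, nothing missing
    assert count_missing(8, 5) == 2           # packets 6 and 7 were missed
    assert count_missing(0, MAX_SEQ_ID) == 0  # clean wrap-around
    assert count_missing(1, MAX_SEQ_ID) == 1  # packet 0 was missed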
setup.py
@@ -3,14 +3,9 @@ from setuptools import setup, find_packages
 setup(
     name='EdosL0Util',
     description='Utilities for working with EDOS L0 PDS files',
-    version=':pygittools:',
-    pygitmeta=True,
+    version='0.4.dev8',
     packages=find_packages(),
-    setup_requires=[
-        'PyGitTools>=0.1.3'
-    ],
     install_requires=[
-        'edos',
         'h5py',
     ],
     entry_points="""
@@ -18,5 +13,7 @@ setup(
     edosl0split = edosl0util.cli:cmd_split
     edosl0trunc = edosl0util.cli:cmd_trunc
     edosl0info = edosl0util.cli:cmd_info
+    edosl0merge = edosl0util.cli:cmd_merge
+    rdr2l0 = edosl0util.cli:cmd_rdr2l0
     """
 )
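
The entry_points block uses the ini-style string form. For comparison, the
same registrations in setuptools' declarative form, assuming the elided lines
above declare a [console_scripts] section:

    entry_points={
        'console_scripts': [
            'edosl0split = edosl0util.cli:cmd_split',
            'edosl0trunc = edosl0util.cli:cmd_trunc',
            'edosl0info = edosl0util.cli:cmd_info',
            'edosl0merge = edosl0util.cli:cmd_merge',
            'rdr2l0 = edosl0util.cli:cmd_rdr2l0',
        ],
    }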