Skip to content
Snippets Groups Projects
Commit 69356fc1 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Add/use stream module.

parent 86405792
No related branches found
No related tags found
No related merge requests found
"""
Console script entry points for CLI tools.
"""
from datetime import datetime
from jpssl0util import split, dump
from jpssl0util.timecode import cds_stamp
def cmd_split():
from argparse import ArgumentParser
parser = ArgumentParser()
parser = ArgumentParser(description=split.__doc__)
parser.add_argument('--minutes', type=int, default=6)
parser.add_argument('--output-format', default='%y%j%H%M%S001.PDS')
parser.add_argument('filepath')
......@@ -21,19 +23,24 @@ def cmd_split():
def cmd_dump():
    """
    CLI entry point: print per-file and per-APID packet statistics for a
    PDS file, optionally dumping every packet header with ``--verbose``.
    """
    from argparse import ArgumentParser
    parser = ArgumentParser(description=dump.__doc__)
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('pds')
    args = parser.parse_args()

    def handler(pkt):
        # Per-packet dump: primary header always, secondary header (with
        # its decoded timestamp) when the packet carries one.
        print(pkt.h1)
        if pkt.h2:
            print("\t%s : %s" % (pkt.stamp, pkt.h2))

    # Only install the per-packet handler when verbose output was requested.
    stats = dump.dump(args.pds, handler=(handler if args.verbose else None))
    print("First Packet: {}".format(stats['first_pkt']))
    print("Last Packet: {}".format(stats['last_pkt']))
    print("Packet Count: {:d}".format(stats['packet_count']))
    print("Group Count: {:d}".format(stats['group_count']))
    print("Missing Count: {:d}".format(stats['missing_count']))
    for apid in sorted(stats['apids']):
        count = stats['apids'][apid]['count']
        missing = stats['apids'][apid]['missing']
        print("APID {:d} Count:{:d} Missing:{:d}".format(apid, count, missing))
#!/usr/bin/env python
from collections import defaultdict
from datetime import datetime
from edos import ccsds
from edos.ccsds import GROUP_FIRST
from jpssl0util.timecode import cds_stamp
from jpssl0util.stream import packetstream
MAX_SEQ_ID = 2**14  # source sequence counter is 14 bits; wraps at 16384
def dump(filepath, handler=None):
    """
    Dump PDS file statistics. The entire file will be iterated, which for larger
    PDS files may take a while.

    :param filepath: path to a PDS file readable by `packetstream`
    :param handler: optional callable invoked as ``handler(pkt)`` for every
        packet; exceptions it raises are swallowed (best-effort reporting)
    :returns: dict with keys ``first_pkt``, ``last_pkt``, ``packet_count``,
        ``group_count``, ``missing_count`` and ``apids`` (per-APID dicts
        with ``count``, ``missing`` and ``prev_seq``)
    """
    # Sentinels chosen so min()/max() against any real stamp replace them.
    first_pkt = datetime(3000, 1, 1)
    last_pkt = datetime(1900, 1, 1)
    packets = 0
    groups = 0
    missing = 0
    # sequence nums are per APID
    apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
    for pkt in packetstream(filepath):
        packets += 1
        if pkt.h1.sequence_grouping == GROUP_FIRST:
            groups += 1
        apid_stats = apids[pkt.h1.apid]
        apid_stats['count'] += 1
        seq = pkt.h1.source_sequence_count
        # successor of the previous sequence number, modulo the 14-bit wrap
        expected_seq = (apid_stats['prev_seq'] + 1) % MAX_SEQ_ID
        if seq != expected_seq:
            # NOTE(review): the first packet seen for an APID counts as
            # missing unless its sequence number happens to be 1 -- confirm
            # this is intended.
            missing += 1
            apid_stats['missing'] += 1
        apid_stats['prev_seq'] = seq
        if pkt.stamp:
            first_pkt = min(pkt.stamp, first_pkt)
            last_pkt = max(pkt.stamp, last_pkt)
        if handler is not None:
            try:
                handler(pkt)
            except Exception:
                # best-effort: a faulty handler must not abort the scan
                pass
    return dict(
        first_pkt=first_pkt,
        last_pkt=last_pkt,
        packet_count=packets,
        group_count=groups,
        missing_count=missing,
        apids=apids,
    )
\ No newline at end of file
#!/usr/bin/env python
"""
TODO:
- Deal with leapseconds, use grain?
"""
from io import open
import io
from cStringIO import StringIO
from datetime import datetime, timedelta
from edos import ccsds
from jpssl0util.timecode import cds_stamp, unixtime
def first_pkt_in_scan(h1):
    """True when the primary header flags this packet as first of a group."""
    grouping = h1.sequence_grouping
    return grouping == ccsds.GROUP_FIRST
def standalone_pkt(h1):
    """True when the primary header flags this packet as standalone."""
    grouping = h1.sequence_grouping
    return grouping == ccsds.GROUP_STANDALONE
from jpssl0util.stream import packetstream
def time_to_roll(time, minutes):
def split(filepath, minutes):
    """
    Split a PDS file into buckets of at most `minutes` minutes, bucketing
    on the secondary-header timestamps decoded by `packetstream`.

    Generates ``(bucket, data)`` tuples where ``bucket`` is the unix time
    of the start of the bucket and ``data`` is the raw packet bytes.
    A single output file is buffered in memory until it is written.
    """
    buf = io.BytesIO()  # packet data is bytes, so buffer with BytesIO
    cur_bucket = 0      # unix time of the current bucket's start; 0 = unset
    for pkt in packetstream(filepath):
        # Only packets carrying a secondary header have a stamp; packets
        # without one stay in whatever bucket is currently open.
        if pkt.stamp:
            hdrtime = unixtime(pkt.stamp)
            # floor the packet time to the start of its `minutes` bucket
            pkt_bucket = hdrtime - hdrtime % (minutes * 60)
            if cur_bucket == 0:
                cur_bucket = pkt_bucket
            if pkt_bucket > cur_bucket:
                yield cur_bucket, buf.getvalue()
                buf = io.BytesIO()
                cur_bucket = pkt_bucket
        buf.write(pkt.data)
    # flush the final bucket (NOTE(review): yields (0, b'') for an empty
    # input file, matching the original behavior -- confirm callers cope)
    yield cur_bucket, buf.getvalue()
import io
from collections import namedtuple
from edos.ccsds import (
c,
PrimaryHeader,
JpssFirstSecondaryHeader,
JpssSecondaryHeader,
GROUP_FIRST,
GROUP_STANDALONE
)
from jpssl0util.timecode import cds_stamp
# Parsed CCSDS packet: primary header struct (h1), optional secondary
# header struct (h2), optional datetime decoded from the secondary header
# (stamp), and the raw packet bytes including both headers (data).
Packet = namedtuple('Packet', ('h1', 'h2', 'stamp', 'data'))
# Only first-of-group and standalone packets carry a secondary header;
# map the primary header's grouping flag to the struct used to parse it.
secondary_header_impls = {
    GROUP_FIRST: JpssFirstSecondaryHeader,
    GROUP_STANDALONE: JpssSecondaryHeader
}
def packetstream(filepath):
    """
    Return a generator of `Packet` tuples that will contain the primary header
    struct and all the packet data, including the header data.
    """
    header_size = c.sizeof(PrimaryHeader)
    with io.open(filepath, mode='rb', buffering=0) as datain:
        while True:
            header_bytes = datain.read(header_size)
            if not header_bytes:
                return  # clean EOF before a new primary header
            h1 = PrimaryHeader.from_buffer_copy(header_bytes)
            # data_length_minus1 + 1 == len(secondary header) + len(payload)
            body = datain.read(h1.data_length_minus1 + 1)
            if not body:
                return  # truncated file: header with no body
            h2 = None     # only first/standalone packets have h2 impls
            stamp = None
            impl = secondary_header_impls.get(h1.sequence_grouping)
            if impl is not None:
                h2 = impl.from_buffer_copy(body[:c.sizeof(impl)])
                stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
            yield Packet(h1, h2, stamp, header_bytes + body)
"""
pdsfiles = sort_pdsfiles_by_pkttime(filenames)
apid = None
seqnum = None
for pdsfile in pdsfiles:
try:
pdsfile.seekto(apid, seqnum)
for packet in pdsfile.read():
output.write(packet)
except MissingPacket as err:
apid = err.apid
seqnum = err.seqnum
print("missing packet apid:{:d} seqnum:{:d} in {:s}".format(err.apid, err.seqnum, pdsfile))
except NoMorePackets:
print("{} has no more packets".format(pdsfile))
pdsfiles.pop(pdsfile)
"""
......@@ -6,10 +6,16 @@ UNIX_EPOCH = datetime(1970, 1, 1)
def unixtime(dt):
    """
    Datetime to Unix timestamp.

    Returns seconds since 1970-01-01 as a float, negative for datetimes
    before the epoch. The original branched on the sign and returned the
    absolute value for pre-epoch datetimes; ``timedelta.total_seconds()``
    is already signed, so no branch is needed.
    """
    return (dt - UNIX_EPOCH).total_seconds()
def cds_stamp(days, millis, micros, epoch=None):
    """
    CCSDS Day Segmented timecode to UTC datetime.

    :param days: days since `epoch`
    :param millis: milliseconds of day
    :param micros: microseconds of millisecond
    :param epoch: epoch the day count is relative to; defaults to the
        JPSS epoch (None sentinel so the default resolves at call time)
    """
    if epoch is None:
        epoch = JPSS_EPOCH
    # Bug fix: honor the `epoch` argument; the original accepted it but
    # always added the offset to JPSS_EPOCH.
    return epoch + timedelta(days=days, microseconds=1000 * millis + micros)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment