Commit ca5fe382 authored by Bruce Flynn

.

parent ce419001
@@ -29,9 +29,9 @@ def cmd_dump():
     args = parser.parse_args()
 
     def handler(pkt):
-        print(pkt.h1)
-        if pkt.h2:
-            print("\t%s : %s" % (pkt.stamp, pkt.h2))
+        print(pkt.primary_header)
+        if pkt.secondary_header:
+            print("\t%s : %s" % (pkt.stamp, pkt.secondary_header))
 
     stats = dump.dump(args.pds, handler=(handler if args.verbose else None))
     print("First Packet: {}".format(stats['first_pkt']))
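For context, the handler callback is the only place cmd_dump touches the renamed Packet fields. A minimal sketch of driving dump.dump() directly, assuming a hypothetical input file 'sample.pds' (only the 'first_pkt' stats key appears in this diff; no other keys are assumed):

from edosl0util import dump

def handler(pkt):
    # primary_header replaces h1, secondary_header replaces h2
    print(pkt.primary_header)
    if pkt.secondary_header:
        print("\t%s : %s" % (pkt.stamp, pkt.secondary_header))

# pass handler=None (as cmd_dump does without --verbose) to skip per-packet output
stats = dump.dump('sample.pds', handler=handler)
print("First Packet: {}".format(stats['first_pkt']))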
@@ -3,7 +3,7 @@ from datetime import datetime
 
 from edos.ccsds import GROUP_FIRST
 
-from edosl0util.stream import packetstream
+from edosl0util.stream import PacketStream
 
 MAX_SEQ_ID = 2**14
 
@@ -21,18 +21,19 @@ def dump(filepath, handler=None):
     missing = 0
     # sequence nums are per APID
     apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
-    for pkt in packetstream(filepath):
+    for pkt in PacketStream(filepath):
         packets += 1
-        if pkt.h1.sequence_grouping == GROUP_FIRST:
+        h1 = pkt.primary_header
+        if h1.sequence_grouping == GROUP_FIRST:
             groups += 1
-        apids[pkt.h1.apid]['count'] += 1
-        seq = pkt.h1.source_sequence_count
-        expected_seq = (apids[pkt.h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID
+        apids[h1.apid]['count'] += 1
+        seq = h1.source_sequence_count
+        expected_seq = (apids[h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID
         if seq != expected_seq:
             missing += 1
-            apids[pkt.h1.apid]['missing'] += 1
-        apids[pkt.h1.apid]['prev_seq'] = seq
+            apids[h1.apid]['missing'] += 1
+        apids[h1.apid]['prev_seq'] = seq
         if pkt.stamp:
             first_pkt = min(pkt.stamp, first_pkt)
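The gap detection above leans on the CCSDS source sequence count being a 14-bit counter. A small worked example (not from the commit) of the wraparound that the modulo by MAX_SEQ_ID handles:

MAX_SEQ_ID = 2**14  # source sequence counts are 14 bits wide, so they wrap at 16384

prev_seq = 16383                            # highest possible sequence number
expected_seq = (prev_seq + 1) % MAX_SEQ_ID  # 0, not 16384
assert expected_seq == 0                    # 16383 -> 0 is a valid succession, not a gap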
@@ -1,12 +1,7 @@
-import io
-from cStringIO import StringIO
+import array
 
-from edosl0util.timecode import cds_stamp, unixtime
-from edosl0util.stream import packetstream
+from edosl0util.timecode import unixtime
+from edosl0util.stream import PacketStream
 
 
-def time_to_roll(time, minutes):
-    return time - time % (minutes * 60)
-
-
 def split(filepath, minutes):
@@ -15,24 +10,23 @@ def split(filepath, minutes):
     A single output file is buffered in memory until it is written.
     """
-    buf = StringIO()  # buffer for a single data file until it is written
+    buf = array.array('c')  # buffer for a single data file until it is written
     cur_bucket = 0  # cur time bucket of size 'minutes'
-    for pkt in packetstream(filepath):
+    for pkt in PacketStream(filepath):
         # do the bucketing based on secondary header timestamps
-        if pkt.h2:
-            stamp = cds_stamp(pkt.h2.day, pkt.h2.milliseconds, pkt.h2.microseconds)
-            hdrtime = unixtime(stamp)
+        if pkt.stamp:
+            hdrtime = unixtime(pkt.stamp)
             pkt_bucket = hdrtime - hdrtime % (minutes * 60)
             if cur_bucket == 0:
                 cur_bucket = pkt_bucket
             if pkt_bucket > cur_bucket:
-                yield cur_bucket, buf.getvalue()
-                buf = StringIO()
+                yield cur_bucket, buf.tostring()
+                buf = array.array('c')
                 cur_bucket = pkt_bucket
-        buf.write(pkt.data)
-    yield cur_bucket, buf.getvalue()
\ No newline at end of file
+        buf.fromstring(pkt.blob)
+    yield cur_bucket, buf.tostring()
\ No newline at end of file
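Replacing the removed time_to_roll() helper with the inline expression keeps the same flooring arithmetic. A sketch with hypothetical values showing how a packet time lands in a bucket:

minutes = 5
hdrtime = 1429717510.0  # hypothetical unix time from unixtime(pkt.stamp)
pkt_bucket = hdrtime - hdrtime % (minutes * 60)
assert pkt_bucket == 1429717500.0  # floored to the start of its 5-minute bucket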
@@ -1,5 +1,5 @@
 import io
-from collections import namedtuple
+from collections import namedtuple, deque, defaultdict
 
 from edos.ccsds import (
     c,
@@ -8,14 +8,13 @@ from edos.ccsds import (
     JpssFirstSecondaryHeader,
     JpssSecondaryHeader,
     GROUP_FIRST,
+    GROUP_LAST,
     GROUP_STANDALONE
 )
 
 from edosl0util.timecode import cds_stamp
 
-
-Packet = namedtuple('Packet', ('h1', 'h2', 'stamp', 'data'))
-
+MAX_SEQ_ID = 2**14
 
 secondary_header_impls = {
     GROUP_FIRST: JpssFirstSecondaryHeader,
@@ -23,55 +22,202 @@ secondary_header_impls = {
 }
 
 
-def packetstream(filepath):
+class Packet(namedtuple('Packet', ('primary_header', 'secondary_header',
+                                   'apid', 'seqid', 'stamp', 'blob'))):
+    def __str__(self):
+        return '<Packet apid=%s stamp=%s>' % (self.apid, self.stamp)
+    __repr__ = __str__
+
+
+class PacketGroup(namedtuple('PacketGroup', ('apid', 'stamp', 'packets', 'blob'))):
+    def __str__(self):
+        return '<Group apid=%s stamp=%s packets=%d>' % (
+            self.apid, self.packets[0].stamp if self.packets else '', len(self.packets)
+        )
+    __repr__ = __str__
+
+
+class PacketStream(object):
     """
-    Return a generator of `Packet` tuples that will contain the primary header
-    struct and all the packet data, including the header data.
+    Iterates over all CCSDS data producing `Packet` tuples. Only as much data
+    as is necessary to generate a single packet is read into memory at a time.
     """
-    with io.open(filepath, mode='rb', buffering=0) as datain:
-        buf = datain.read(c.sizeof(PrimaryHeader))
-        while buf:
-            data = buf
-            h1 = PrimaryHeader.from_buffer_copy(buf)
-
-            buf = datain.read(h1.data_length_minus1 + 1)
-            if not buf:
-                break
-            data += buf
-
-            stamp = None
-            h2 = None  # only first/standalone have h2 impls
-            H2Impl = secondary_header_impls.get(h1.sequence_grouping)
-            if H2Impl:
-                h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
-                stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
-
-            yield Packet(h1, h2, stamp, data)
-
-            buf = datain.read(c.sizeof(PrimaryHeader))
-
-
-"""
-pdsfiles = sort_pdsfiles_by_pkttime(filenames)
-
-apid = None
-seqnum = None
-for pdsfile in pdsfiles:
-    try:
-        pdsfile.seekto(apid, seqnum)
-        for packet in pdsfile.read():
-            output.write(packet)
-    except MissingPacket as err:
-        apid = err.apid
-        seqnum = err.seqnum
-        print("missing packet apid:{:d} seqnum:{:d} in {:s}".format(err.apid, err.seqnum, pdsfile))
-    except NoMorePackets:
-        print("{} has no more packets".format(pdsfile))
-        pdsfiles.pop(pdsfile)
-"""
+    def __init__(self, filepath):
+        self._filepath = filepath
+        self._input = io.open(filepath, mode='rb')
+
+    def __iter__(self):
+        return self
+
+    def _read(self, size):
+        data = self._input.read(size)
+        if not data:
+            raise StopIteration
+        return data
+
+    def next(self):
+        buf = self._read(c.sizeof(PrimaryHeader))
+        data = buf
+
+        h1 = PrimaryHeader.from_buffer_copy(buf)
+
+        buf = self._read(h1.data_length_minus1 + 1)
+        data += buf
+
+        stamp = None
+        h2 = None  # only first/standalone have h2 impls
+        H2Impl = secondary_header_impls.get(h1.sequence_grouping)
+        if H2Impl:
+            h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
+            stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
+
+        return Packet(h1, h2, h1.apid, h1.source_sequence_count, stamp, data)
+
+
+# FUTURE: Use an iter with a peek function so we don't have to keep items that
+# were pushed back in memory.
+class _IndecisiveIter(object):
+    """
+    Iterator which allows you to put an item back. Warning: items that are
+    pushed back are kept in memory.
+    """
+    def __init__(self, iterable):
+        self.iterable = iterable
+        self._cache = deque()
+
+    def push_back(self, item):
+        self._cache.append(item)
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if len(self._cache):
+            return self._cache.pop()
+        return self.iterable.next()
+
+
+class PacketCollector(object):
+    """
+    Collects packets from a PacketStream into PacketGroup tuples, where
+    applicable. Grouping uses the PrimaryHeader.sequence_grouping flag such
+    that any packets between GROUP_FIRST and GROUP_LAST are considered part
+    of the same group.
+    """
+    def __init__(self, stream):
+        self._stream = stream
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        packet = self._stream.next()
+        grouping = packet.primary_header.sequence_grouping
+        if grouping == GROUP_FIRST:
+            blob = bytearray(packet.blob)
+            grppkts = deque([packet])
+            while grouping != GROUP_LAST:
+                packet = self._stream.next()
+                assert not packet.stamp, "Expected only first packet in group to have stamp"
+                blob.extend(packet.blob)
+                grppkts.append(packet)
+                grouping = packet.primary_header.sequence_grouping
+            return PacketGroup(packet[0].apid, grppkts[0].stamp, grppkts, str(blob))
+        return packet
+
+
+class PacketWalker(object):
+    """
+    Provides additional features for a PacketStream, such as seeking and
+    reading packets until a missing sequence id is encountered.
+    """
+    def __init__(self, stream):
+        self._stream = _IndecisiveIter(stream)
+        # seqnums for each apid for locating missing pkts
+        self._seqcache = {}
+        # stamps for each apid so we can seek
+        self._stampcache = defaultdict(lambda: datetime(1900, 1, 1))
+        self._finished = False
+
+    def _next_packet(self):
+        try:
+            packet = self._stream.next()
+            if packet.stamp:
+                self._stampcache[packet.apid] = packet.stamp
+            return packet
+        except StopIteration:
+            self._finished = True
+            raise
+
+    def end_of_stream(self):
+        return self._finished
+
+    def seek(self, stamp=None, apid=None):
+        """
+        Seek forward to a position in the packet stream where stamp is >= the
+        secondary header time. If apid is provided, additionally seek to that
+        apid.
+        """
+        if self.end_of_stream():
+            return
+        try:
+            if stamp:
+                self._seek_to_stamp(stamp)
+            if apid is not None:
+                self._seek_to_apid(apid)
+        except StopIteration:
+            pass
+
+    def _seek_to_stamp(self, stamp):
+        while True:
+            packet = self._next_packet()
+            hdr = packet.primary_header
+            if packet.stamp and packet.stamp > self._stampcache[hdr.apid]:
+                self._stream.push_back(packet)
+                return
+
+    def _seek_to_apid(self, apid):
+        while True:
+            packet = self._next_packet()
+            hdr = packet.primary_header
+            if hdr.apid == apid:
+                self._stream.push_back(packet)
+                return
+
+    def _check_if_missing(self, packet):
+        """
+        Return True if this packet does not follow the current packet in
+        sequence number, where sequence number is per APID.
+        """
+        prev_seqid = self._seqcache.get(packet.apid)
+        seqid = packet.seqid
+        missing = False
+        if prev_seqid:
+            missing = seqid != ((prev_seqid + 1) % MAX_SEQ_ID)
+        self._seqcache[packet.apid] = seqid
+        return missing
+
+    def read(self):
+        """
+        Read all remaining packets from the current position.
+        """
+        while not self._finished:
+            yield self._next_packet()
+
+    def read_until_missing(self):
+        """
+        Read packets until the first missing sequence number is encountered.
+        The next packet read will be the first packet after the missing one.
+        """
+        while not self.end_of_stream():
+            packet = self._next_packet()
+            # checking for a seqid gap requires getting the packet after the
+            # gap, therefore we have to put the packet back to be retrieved
+            # next time if a missing packet was detected
+            if self._check_if_missing(packet):
+                self._stream.push_back(packet)
+                break
+            yield packet
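The new classes are designed to compose: PacketStream yields Packet tuples, PacketCollector optionally merges grouped packets into PacketGroup tuples, and PacketWalker layers seeking and gap-aware reads on top. A hedged usage sketch (the 'sample.pds' path is hypothetical):

from edosl0util.stream import PacketStream, PacketCollector, PacketWalker

# raw packet iteration
for pkt in PacketStream('sample.pds'):
    print(pkt)  # <Packet apid=... stamp=...>

# collapse GROUP_FIRST..GROUP_LAST sequences into PacketGroup tuples
for item in PacketCollector(PacketStream('sample.pds')):
    print(item)

# read contiguous runs of packets, breaking at sequence-count gaps
walker = PacketWalker(PacketStream('sample.pds'))
while not walker.end_of_stream():
    for pkt in walker.read_until_missing():
        pass  # process one gap-free run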
@@ -14,7 +14,7 @@ setup(
     ],
     entry_points="""
     [console_scripts]
-    l0split = edosl0util.cli:cmd_split
+    edosl0split = edosl0util.cli:cmd_split
     edosl0dump = edosl0util.cli:cmd_dump
     """
 )
\ No newline at end of file