Commit dbd71de6 authored by Bruce Flynn

Add support for Aqua

parent 1c6eac48
edosl0util/cli.py
@@ -3,7 +3,7 @@ Console script entry points for CLI tools.
"""
from datetime import datetime
from edosl0util import split, dump
from edosl0util import split
def cmd_split():
@@ -14,33 +14,8 @@ def cmd_split():
parser.add_argument('filepath')
args = parser.parse_args()
for time, blob in split.split(args.filepath, args.minutes):
for time, offset, count, blob in split.split(open(args.filepath), args.minutes):
stamp = datetime.utcfromtimestamp(time)
filepath = stamp.strftime(args.output_format)
print("writing %s" % filepath)
print("writing %d packets from offset %d to %s" % (count, offset, filepath))
open(filepath, 'wb', buffering=0).write(blob)
def cmd_dump():
from argparse import ArgumentParser
parser = ArgumentParser(description=dump.__doc__)
parser.add_argument('-v', '--verbose', action='store_true')
parser.add_argument('pds')
args = parser.parse_args()
def handler(pkt):
print(pkt.primary_header)
if pkt.secondary_header:
print("\t%s : %s" % (pkt.stamp, pkt.secondary_header))
stats = dump.dump(args.pds, handler=(handler if args.verbose else None))
print("First Packet: {}".format(stats['first_pkt']))
print("Last Packet: {}".format(stats['last_pkt']))
print("Packet Count: {:d}".format(stats['packet_count']))
print("Group Count: {:d}".format(stats['group_count']))
print("Missing Count: {:d}".format(stats['missing_count']))
apids = sorted(stats['apids'])
for apid in apids:
count = stats['apids'][apid]['count']
missing = stats['apids'][apid]['missing']
print("APID {:d} Count:{:d} Missing:{:d}".format(apid, count, missing))
edosl0util/dump.py
@@ -18,7 +18,6 @@ def dump(filepath, handler=None):
last_pkt = datetime(1900, 1, 1)
packets = 0
groups = 0
missing = 0
# sequence nums are per APID
apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
for pkt in PacketStream(open(filepath)):
@@ -29,10 +28,6 @@
apids[h1.apid]['count'] += 1
seq = h1.source_sequence_count
expected_seq = (apids[h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID
if seq != expected_seq:
missing += 1
apids[h1.apid]['missing'] += 1
apids[h1.apid]['prev_seq'] = seq
if pkt.stamp:
@@ -49,6 +44,5 @@ def dump(filepath, handler=None):
last_pkt=last_pkt,
packet_count=packets,
group_count=groups,
missing_count=missing,
apids=apids
)
\ No newline at end of file
edosl0util/headers.py
# -*- coding: utf-8 -*-
"""
CCSDS BaseStruct implementations for AQUA.
See:
ICD_Space_Ground_Aqua.pdf
GSFC 422-11-19-03
Last known location:
http://directreadout.sci.gsfc.nasa.gov/links/rsd_eosdb/PDF/ICD_Space_Ground_Aqua.pdf
"""
import ctypes as c
from datetime import datetime, timedelta
from edos.ccsds import (
GROUP_FIRST,
GROUP_LAST,
GROUP_CONTINUING,
GROUP_STANDALONE
)
from edosl0util.timecode import cds_stamp
class BaseStruct(c.BigEndianStructure):
_pack_ = 1
def __str__(self):
return "<" + ' '.join('%s=%s' % (f[0], getattr(self, f[0])) for f in self._fields_) + " >"
def __repr__(self):
fields = ', '.join('%s=%s' % (f[0], repr(getattr(self, f[0]))) for f in self._fields_)
return '<%s (%s)>' % (self.__class__.__name__, fields)
class AquaCucTimecode(BaseStruct):
"""
EOS Aqua implementation of a CCSDS Unsegmented Timecode (CUC). A CUC is an
agency-defined timecode, in this case (I think) specific to Aqua.
"""
_fields_ = [
('extension_flag', c.c_uint8, 1), # 1
('timecode_epoch', c.c_uint8, 3), # 010 == Jan 1, 1958
('coarse_time_count', c.c_uint8, 2), # 11 == 4 octets
('fine_time_count', c.c_uint8, 2), # 10 == 2 octets
('no_continuation_flag', c.c_uint8, 1), # 0
('leap_seconds', c.c_uint8, 7),
('seconds', c.c_uint32),
('sub_seconds', c.c_uint16)
]
EPOCH = datetime(1958, 1, 1)
SUB_SECOND_UNITS = 15.2  # microseconds per sub_seconds count (likely ~1e6 / 2**16; see FIXME below)
def asdatetime(self):
"""
Return converted to UTC where leap seconds are as defined in `leap_seconds`.
FIXME: Verify this conversion is correct, specifically the use of
SUB_SECOND_UNITS.
"""
seconds = self.seconds + self.leap_seconds
micros = self.SUB_SECOND_UNITS * self.sub_seconds
return self.EPOCH + timedelta(seconds=seconds, microseconds=micros)
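For illustration, a CUC timecode can be decoded straight from packet bytes with ctypes; this is a minimal sketch and the byte values are invented, not taken from real Aqua data:

# Hypothetical example (invented bytes). First byte packs the bitfields
# MSB-first: extension=0, epoch=010, coarse_time_count=11, fine_time_count=10.
raw = bytearray([0x2E, 35, 0x01, 0x02, 0x03, 0x04, 0x00, 0x10])
assert c.sizeof(AquaCucTimecode) == 8
tc = AquaCucTimecode.from_buffer_copy(raw)
print(tc.leap_seconds, tc.seconds, tc.asdatetime())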
class DaySegmentedTimecode(BaseStruct):
_pack_ = 1
_fields_ = [
('days', c.c_uint16),
('milliseconds', c.c_uint32),
('microseconds', c.c_uint16)
]
EPOCH = datetime(1958, 1, 1)
def asdatetime(self):
return cds_stamp(self.days, self.milliseconds, self.microseconds)
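cds_stamp is imported from edosl0util.timecode; it presumably implements the standard CCSDS Day Segmented conversion, roughly equivalent to this sketch (cds_stamp_sketch is a hypothetical stand-in, not the real implementation; it uses the datetime/timedelta imports above):

def cds_stamp_sketch(days, milliseconds, microseconds):
    # days since the 1958-01-01 epoch plus the sub-day remainder
    return (datetime(1958, 1, 1)
            + timedelta(days=days,
                        milliseconds=milliseconds,
                        microseconds=microseconds))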
class GirdSecondaryHeader(BaseStruct):
_pack_ = 1
_fields_ = [
('flags', c.c_uint8),
('timecode', AquaCucTimecode),
]
class GiisSecondaryHeader(BaseStruct):
_pack_ = 1
_fields_ = [
('timecode', DaySegmentedTimecode),
('quicklook_flag', c.c_uint8, 1),
('user_flags', c.c_uint8, 7)
]
class SpacecraftBusSecondaryHeader(BaseStruct):
_fields_ = [
('timecode', AquaCucTimecode)
]
def amsu_headers():
return {
# AMSU-A1
(257, GROUP_CONTINUING): GirdSecondaryHeader,
(259, GROUP_CONTINUING): GirdSecondaryHeader,
(260, GROUP_CONTINUING): GirdSecondaryHeader,
(261, GROUP_CONTINUING): GirdSecondaryHeader,
(262, GROUP_CONTINUING): GirdSecondaryHeader,
# AMSU-A2
(288, GROUP_CONTINUING): GirdSecondaryHeader,
(289, GROUP_CONTINUING): GirdSecondaryHeader,
(290, GROUP_STANDALONE): GirdSecondaryHeader,
}
def airs_headers():
return {
# AIRS
(404, GROUP_STANDALONE): GirdSecondaryHeader,
(405, GROUP_STANDALONE): GirdSecondaryHeader,
(406, GROUP_STANDALONE): GirdSecondaryHeader,
(407, GROUP_STANDALONE): GirdSecondaryHeader,
(414, GROUP_STANDALONE): GirdSecondaryHeader,
(415, GROUP_STANDALONE): GirdSecondaryHeader,
(416, GROUP_STANDALONE): GirdSecondaryHeader,
(417, GROUP_STANDALONE): GirdSecondaryHeader,
}
def hsb_headers():
return {
# HSB
(342, GROUP_STANDALONE): GirdSecondaryHeader,
}
def modis_headers():
return {
# MODIS
(64, GROUP_STANDALONE): GiisSecondaryHeader,
(64, GROUP_FIRST): GiisSecondaryHeader,
(64, GROUP_LAST): GiisSecondaryHeader,
(64, GROUP_CONTINUING): GiisSecondaryHeader,
(127, GROUP_STANDALONE): GiisSecondaryHeader,
}
def ceres_headers():
apids = (
# CERES+Y
141, 142, 143, 144,
# CERES-Y
157, 158, 159, 160
)
groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)
return {(a, g): GiisSecondaryHeader for a in apids for g in groupings}
def gbad_headers():
return {
# GBAD
(957, GROUP_STANDALONE): SpacecraftBusSecondaryHeader
}
def aqua_headers():
"""
Aqua headers are looked up via their APID and grouping.
"""
headers = {}
headers.update(amsu_headers())
headers.update(airs_headers())
headers.update(hsb_headers())
headers.update(modis_headers())
headers.update(ceres_headers())
headers.update(gbad_headers())
return headers
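Lookup against the merged table is then a plain dict access keyed on (APID, grouping); for example:

# e.g., MODIS science packets (APID 64) that start a packet group:
table = aqua_headers()
assert table[(64, GROUP_FIRST)] is GiisSecondaryHeader
assert table.get((9999, GROUP_FIRST)) is None  # unknown APIDs simply miss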
class JpssSecondaryHeader(BaseStruct):
"""Secondary Header for a JSPP CCSDS packet that is not part of a packet
sequence.
"""
_pack_ = 1
_fields_ = [
('timecode', DaySegmentedTimecode)
]
class JpssFirstSecondaryHeader(BaseStruct):
"""Secondary Header for a JSPP CCSDS packet that is the first packet in a
packet sequence. Following packets that are part of this sequence will not
have a secondary header.
"""
_pack_ = 1
_fields_ = [
('timecode', DaySegmentedTimecode),
('packet_count', c.c_uint8),
('_spare', c.c_uint8)
]
def jpss_header_lookup(primary_header):
grouping = primary_header.sequence_grouping
if grouping == GROUP_FIRST:
return JpssFirstSecondaryHeader
elif grouping == GROUP_CONTINUING:
return JpssSecondaryHeader
elif grouping == GROUP_LAST:
return JpssSecondaryHeader
elif grouping == GROUP_STANDALONE:
return JpssSecondaryHeader
_aqua_headers = aqua_headers()
def aqua_header_lookup(primary_header):
apid = primary_header.apid
grouping = primary_header.sequence_grouping
return _aqua_headers.get((apid, grouping))
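A miss returns None, which is what lets default_header_lookup in the stream module (below) fall through to the JPSS lookup. A hypothetical primary-header stand-in, for illustration only:

class _FakePrimaryHeader(object):
    # hypothetical; real primary headers come from edos.ccsds
    apid = 957
    sequence_grouping = GROUP_STANDALONE

assert aqua_header_lookup(_FakePrimaryHeader()) is SpacecraftBusSecondaryHeader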
edosl0util/merge.py
@@ -31,28 +31,6 @@ def _is_valid_group(packets):
and packets[0].apid == packets[-1].apid
def _cmp_by_first_pkt(filea, fileb):
"""
Compare 2 L0 files by their first available timestamp. Used to sort L0 files
by time.
"""
stampa = stampb = None
for pkt in PacketStream(open(filea)):
if pkt.stamp:
stampa = pkt.stamp
break
for pkt in PacketStream(open(fileb)):
if pkt.stamp:
stampb = pkt.stamp
break
return cmp(stampa, stampb)
def make_streams(filepaths):
filepaths.sort(_cmp_by_first_pkt)
return [PacketStream(io.open(f, 'rb')) for f in filepaths]
def merge(streams, output=sys.stdout):
last_apid = None
last_stamp = datetime(1900, 1, 1)
......
edosl0util/split.py
@@ -4,16 +4,16 @@ from edosl0util.timecode import unixtime
from edosl0util.stream import PacketStream
def split(filepath, minutes):
def split(fobj, minutes):
"""Split a VIIRS L0 PDS file into files based on their scan time mod the
number of minutes provided.
A single output file is buffered in memory until it is written.
"""
buf = array.array('c') # buffer for a single data file until it is written
buf = array.array('B') # buffer for a single data file until it is written
cur_bucket = 0 # cur time bucket of size 'minutes'
for pkt in PacketStream(open(filepath)):
pkt_count = 0
original_offset = fobj.tell()
for pkt in PacketStream(fobj):
# do the bucketing based on secondary header timestamps
if pkt.stamp:
hdrtime = unixtime(pkt.stamp)
@@ -23,10 +23,15 @@ def split(filepath, minutes):
cur_bucket = pkt_bucket
if pkt_bucket > cur_bucket:
yield cur_bucket, buf.tostring()
offset = fobj.tell() - original_offset
yield cur_bucket, offset, pkt_count, buf.tostring()
pkt_count = 0
buf = array.array('B')  # reset; typecode must match the initial buffer above
cur_bucket = pkt_bucket
# this is an append operation
buf.fromstring(pkt.blob)
pkt_count += 1
yield cur_bucket, buf.tostring()
\ No newline at end of file
offset = fobj.tell() - original_offset
yield cur_bucket, offset, pkt_count, buf.tostring()
\ No newline at end of file
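A minimal sketch of driving the new generator directly, mirroring what cmd_split does above ('input.pds' is a placeholder path):

import io
from edosl0util import split

with io.open('input.pds', 'rb') as fobj:  # hypothetical input file
    for time, offset, count, blob in split.split(fobj, 6):
        print("bucket=%d offset=%d packets=%d bytes=%d"
              % (time, offset, count, len(blob)))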
edosl0util/stream.py
@@ -12,15 +12,10 @@ from edos.ccsds import (
GROUP_STANDALONE
)
from edosl0util.timecode import cds_stamp
from edosl0util import headers
LOG = logging.getLogger(__name__)
secondary_header_impls = {
GROUP_FIRST: JpssFirstSecondaryHeader,
GROUP_STANDALONE: JpssSecondaryHeader
}
class MissingPackets(Exception):
"""
@@ -77,16 +72,31 @@ class SequenceId(object):
return self.val
def default_header_lookup(primary_header):
return (
headers.aqua_header_lookup(primary_header) or
headers.jpss_header_lookup(primary_header)
)
class PacketStream(object):
"""
Iterates over all CCSDS data producing `Packet` tuples. Only as much data
as is necessary to generate a single packet is read into memory at a time.
"""
def __init__(self, fobj, fail_on_missing=False):
def __init__(self, fobj, fail_on_missing=False, header_lookup=default_header_lookup):
"""
:param fobj: File-like object to read stream from
:keyword fail_on_missing: If True raise a MissingPackets error when packets
are missing.
:keyword header_lookup: A function that takes a PrimaryHeader and returns
a secondary header implementation that will be used.
"""
self._input = fobj
self._seek_cache = deque()
self._seqids = {}
self._fail_on_missing = fail_on_missing
self._header_lookup = header_lookup
def __str__(self):
return '<PacketStream pos=%d %s>' % (self.tell(), self._input)
@@ -103,9 +113,6 @@
raise StopIteration
return data
def tell(self):
return self._input.tell()
def next(self):
"""
Return the next packet in the stream.
@@ -130,10 +137,11 @@
stamp = None
h2 = None # only first/standalone have h2 impls
H2Impl = secondary_header_impls.get(h1.sequence_grouping)
H2Impl = self._header_lookup(h1)
if H2Impl:
h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)])
stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds)
if hasattr(h2, 'timecode') and hasattr(h2.timecode, 'asdatetime'):
stamp = h2.timecode.asdatetime()
packet = Packet(h1.apid, h1.source_sequence_count, stamp, data, h1, h2)
@@ -169,3 +177,25 @@
while packet.apid != apid:
packet = self.next()
self.push_back(packet)
def _cmp_by_first_pkt(filea, fileb):
"""
Compare 2 L0 files by their first available timestamp. Used to sort L0 files
by time.
"""
stampa = stampb = None
for pkt in PacketStream(open(filea)):
if pkt.stamp:
stampa = pkt.stamp
break
for pkt in PacketStream(open(fileb)):
if pkt.stamp:
stampb = pkt.stamp
break
return cmp(stampa, stampb)
def make_streams(filepaths):
filepaths.sort(_cmp_by_first_pkt)
return [PacketStream(io.open(f, 'rb')) for f in filepaths]
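Putting it together, a stream over a PDS file now resolves each packet's secondary header through the Aqua-then-JPSS lookup chain; a short sketch with placeholder paths:

import io
from edosl0util.stream import PacketStream, make_streams

# hypothetical paths; make_streams sorts the files by first packet time
streams = make_streams(['a.pds', 'b.pds'])
for pkt in streams[0]:
    if pkt.stamp:  # set when the looked-up header has a timecode.asdatetime()
        print(pkt.apid, pkt.stamp)
        break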
setup.py
@@ -15,6 +15,5 @@ setup(
entry_points="""
[console_scripts]
edosl0split = edosl0util.cli:cmd_split
edosl0dump = edosl0util.cli:cmd_dump
"""
)
\ No newline at end of file