Something went wrong on our end
-
Bruce Flynn authoredBruce Flynn authored
headers.py 6.17 KiB
# -*- coding: utf-8 -*-
"""
CCSDS BaseStruct implementations for AQUA.
See:
ICD_Space_Ground_Aqua.pdf
GSFC 422-11-19-03
Last known location:
http://directreadout.sci.gsfc.nasa.gov/links/rsd_eosdb/PDF/ICD_Space_Ground_Aqua.pdf
"""
import ctypes as c
from datetime import datetime, timedelta
from edos.ccsds import (
GROUP_FIRST,
GROUP_LAST,
GROUP_CONTINUING,
GROUP_STANDALONE
)
from edosl0util.timecode import cds_stamp, cds_to_timestamp
class BaseStruct(c.BigEndianStructure):
_pack_ = 1
def __str__(self):
return "<" + ' '.join('%s=%s' % (f[0], getattr(self, f[0])) for f in self._fields_) + " >"
def __repr__(self):
fields = ', '.join('%s=%s' % (f[0], repr(getattr(self, f[0]))) for f in self._fields_)
return '<%s (%s)>' % (self.__class__.__name__, fields)
class AquaCucTimecode(BaseStruct):
"""
EOS AQUA implementation of a CCSDS Unsegmented Timecode. An CUT is an
agency defined timecode which is in this case (I think) specific to Aqua.
"""
_fields_ = [
('extension_flag', c.c_uint8, 1), # 1
('timecode_epoch', c.c_uint8, 3), # 010 == Jan 1, 1958
('coarse_time_count', c.c_uint8, 2), # 11 == 4 octets
('fine_time_count', c.c_uint8, 2), # 10 == 2 octets
('no_continuation_flag', c.c_uint8, 1), # 0
('leap_seconds', c.c_uint8, 7),
('seconds', c.c_uint32),
('sub_seconds', c.c_uint16)
]
EPOCH = datetime(1958, 1, 1)
EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
SUB_SECOND_UNITS = 15.2
def astimestamp(self):
tc = self.timecode
return cds_to_timestamp(tc.days, tc.milliseconds, tc.microseconds, self.EPOCH_SECS)
def asdatetime(self):
"""
Return converted to UTC where leap seconds are as defined in `leap_seconds`.
FIXME: Verify this conversion is correct, specfically the use of
SUB_SECOND_UNIT.
"""
seconds = self.seconds + self.leap_seconds
micros = self.SUB_SECOND_UNITS * self.sub_seconds
return self.EPOCH + timedelta(seconds=seconds, microseconds=micros)
class DaySegmentedTimecode(BaseStruct):
_pack_ = 1
_fields_ = [
('days', c.c_uint16),
('milliseconds', c.c_uint32),
('microseconds', c.c_uint16)
]
EPOCH = datetime(1958, 1, 1)
EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
def astimestamp(self):
return cds_to_timestamp(self.days, self.milliseconds, self.microseconds, self.EPOCH_SECS)
def asdatetime(self):
return cds_stamp(self.days, self.milliseconds, self.microseconds)
class GirdSecondaryHeader(BaseStruct):
_pack_ = 1
_fields_ = [
('flags', c.c_uint8),
('timecode', AquaCucTimecode),
]
class GiisSecondaryHeader(BaseStruct):
_pack_ = 1
_fields_ = [
('timecode', DaySegmentedTimecode),
('quicklook_flag', c.c_uint8, 1),
('user_flags', c.c_uint8, 7)
]
class SpacecraftBusSecondaryHeader(BaseStruct):
_fields_ = [
('timecode', AquaCucTimecode)
]
def amsu_headers():
apids = [
# AMSU-A1
257, 259, 260, 261, 262,
# AMSU-A2
288, 289, 290
]
flags = [GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE]
return {(apid, flag): GirdSecondaryHeader
for apid in apids
for flag in flags}
def airs_headers():
return {
# AIRS
(404, GROUP_STANDALONE): GirdSecondaryHeader,
(405, GROUP_STANDALONE): GirdSecondaryHeader,
(406, GROUP_STANDALONE): GirdSecondaryHeader,
(407, GROUP_STANDALONE): GirdSecondaryHeader,
(414, GROUP_STANDALONE): GirdSecondaryHeader,
(415, GROUP_STANDALONE): GirdSecondaryHeader,
(416, GROUP_STANDALONE): GirdSecondaryHeader,
(417, GROUP_STANDALONE): GirdSecondaryHeader,
}
def hsb_headers():
return {
# HSB
(342, GROUP_STANDALONE): GirdSecondaryHeader,
}
def modis_headers():
return {
# MODIS
(64, GROUP_STANDALONE): GiisSecondaryHeader,
(64, GROUP_FIRST): GiisSecondaryHeader,
(64, GROUP_LAST): GiisSecondaryHeader,
(64, GROUP_CONTINUING): GiisSecondaryHeader,
(127, GROUP_STANDALONE): GiisSecondaryHeader,
}
def ceres_headers():
apids = (
# CERES+Y
141, 142, 143, 144,
# CERES-Y
157, 158, 159, 160
)
groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)
return {(a, g): GiisSecondaryHeader for a in apids for g in groupings}
def gbad_headers():
return {
# GBAD
(957, GROUP_STANDALONE): SpacecraftBusSecondaryHeader
}
def aqua_headers():
"""
Aqua headers are looked up via their APID and grouping.
"""
headers = {}
headers.update(amsu_headers())
headers.update(airs_headers())
headers.update(hsb_headers())
headers.update(modis_headers())
headers.update(ceres_headers())
headers.update(gbad_headers())
return headers
class JpssSecondaryHeader(BaseStruct):
"""Secondary Header for a JSPP CCSDS packet that is not part of a packet
sequence.
"""
_pack_ = 1
_fields_ = [
('timecode', DaySegmentedTimecode)
]
class JpssFirstSecondaryHeader(BaseStruct):
"""Secondary Header for a JSPP CCSDS packet that is the first packet in a
packet sequence. Following packets that are part of this sequence will not
have a secondary header.
"""
_pack_ = 1
_fields_ = [
('timecode', DaySegmentedTimecode),
('packet_count', c.c_uint8),
('_spare', c.c_uint8)
]
def jpss_header_lookup(primary_header):
grouping = primary_header.sequence_grouping
if grouping == GROUP_FIRST:
return JpssFirstSecondaryHeader
#elif grouping == GROUP_CONTINUING:
# return JpssSecondaryHeader
#elif grouping == GROUP_LAST:
# return JpssSecondaryHeader
elif grouping == GROUP_STANDALONE:
return JpssSecondaryHeader
_aqua_headers = aqua_headers()
def aqua_header_lookup(primary_header): # noqa
apid = primary_header.apid
grouping = primary_header.sequence_grouping
return _aqua_headers.get((apid, grouping))