Something went wrong on our end
-
Greg Quinn authoredGreg Quinn authored
headers.py 7.13 KiB
# encoding: utf-8
"""
Structures for CCSDS Primary and Secondary headers
See:
* ICD_Space_Ground_Aqua.pdf - GSFC 422-11-19-03
- http://directreadout.sci.gsfc.nasa.gov/links/rsd_eosdb/PDF/ICD_Space_Ground_Aqua.pdf
* CCSDS Space Packet Protocol
- http://public.ccsds.org/publications/archive/133x0b1c2.pdf
* CCSDS Time Code Formats
- http://public.ccsds.org/publications/archive/301x0b4e1.pdf
"""
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
import ctypes as c
from datetime import datetime, timedelta
from edosl0util.timecode import cds_stamp, cds_to_timestamp
GROUP_FIRST = 0b01
GROUP_LAST = 0b10
GROUP_CONTINUING = 0b00
GROUP_STANDALONE = 0b11
class BaseStruct(c.BigEndianStructure):
_pack_ = 1
def __str__(self):
return "<" + ' '.join('%s=%s' % (f[0], getattr(self, f[0])) for f in self._fields_) + " >"
def __repr__(self):
fields = ', '.join('%s=%s' % (f[0], repr(getattr(self, f[0]))) for f in self._fields_)
return '<%s (%s)>' % (self.__class__.__name__, fields)
def __eq__(self, other):
return all(getattr(self, f[0]) == getattr(other, f[0]) for f in self._fields_)
class PrimaryHeader(BaseStruct):
"""
CCSDS Primary Header.
"""
_fields_ = [
('version_number', c.c_uint8, 3), # == 0
('type_indicator', c.c_uint8, 1),
('secondary_header_flag', c.c_uint8, 1),
('apid', c.c_uint16, 11),
('sequence_grouping', c.c_uint16, 2),
('source_sequence_count', c.c_uint16, 14),
('data_length_minus1', c.c_uint16) # octet count minus one
]
PRIMARY_HEADER_SIZE = c.sizeof(PrimaryHeader)
class Timecode(BaseStruct):
"""
Secondary header timecode baseclass.
"""
def astimestamp(self):
raise NotImplementedError()
def asdatetime(self):
raise NotImplementedError()
class AquaCucTimecode(Timecode):
"""
EOS AQUA implementation of a CCSDS Unsegmented Timecode. An CUT is an
agency defined timecode which is in this case (I think) specific to Aqua.
NOTE: I'm betting this will work for Terra as well but I do not have any
documentation for Terra.
FIXME: Not tested or validated!!
"""
_fields_ = [
('extension_flag', c.c_uint8, 1),
('timecode_epoch', c.c_uint8, 3),
('coarse_time_count', c.c_uint8, 2),
('fine_time_count', c.c_uint8, 2),
('no_continuation_flag', c.c_uint8, 1),
('leap_seconds', c.c_uint8, 7),
('seconds', c.c_uint32),
('sub_seconds', c.c_uint16)
]
EPOCH = datetime(1958, 1, 1)
EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
SUB_SECOND_UNITS = 15.2
def astimestamp(self):
tc = self.timecode
return cds_to_timestamp(tc.days, tc.milliseconds, tc.microseconds, self.EPOCH_SECS)
def asdatetime(self):
"""
Return converted to UTC where leap seconds are as defined in `leap_seconds`.
FIXME: Verify this conversion is correct, specfically the use of
SUB_SECOND_UNIT.
"""
seconds = self.seconds + self.leap_seconds
micros = self.SUB_SECOND_UNITS * self.sub_seconds
return self.EPOCH + timedelta(seconds=seconds, microseconds=micros)
class DaySegmentedTimecode(Timecode):
"""
CCSDS Day Segmented Timecode
"""
_pack_ = 1
_fields_ = [
('days', c.c_uint16),
('milliseconds', c.c_uint32),
('microseconds', c.c_uint16)
]
EPOCH = datetime(1958, 1, 1)
EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
def astimestamp(self):
return cds_to_timestamp(self.days, self.milliseconds, self.microseconds, self.EPOCH_SECS)
def asdatetime(self):
return cds_stamp(self.days, self.milliseconds, self.microseconds)
class AquaGirdSecondaryHeader(BaseStruct):
_fields_ = [
('flags', c.c_uint8),
('timecode', AquaCucTimecode),
]
class AquaGiisSecondaryHeader(BaseStruct):
_fields_ = [
('timecode', DaySegmentedTimecode),
('quicklook_flag', c.c_uint8, 1),
('user_flags', c.c_uint8, 7)
]
class AquaSpacecraftBusSecondaryHeader(BaseStruct):
_fields_ = [
('timecode', AquaCucTimecode)
]
class JpssSecondaryHeader(BaseStruct):
"""
Secondary header using Day Segmented timecodes.
"""
_pack_ = 1
_fields_ = [
('timecode', DaySegmentedTimecode)
]
class JpssFirstSecondaryHeader(BaseStruct):
"""
Secondary header using Day Segmented timecodes and with a packet count.
"""
_pack_ = 1
_fields_ = [
('timecode', DaySegmentedTimecode),
('packet_count', c.c_uint8),
('_spare', c.c_uint8)
]
_jpss_headers = {
GROUP_FIRST: JpssFirstSecondaryHeader,
GROUP_CONTINUING: None,
GROUP_LAST: None,
GROUP_STANDALONE: JpssSecondaryHeader,
}
def jpss_header_lookup(primary_header):
return _jpss_headers.get(primary_header.sequence_grouping)
def amsu_headers():
apids = [
# AMSU-A1
257, 259, 260, 261, 262,
# AMSU-A2
288, 289, 290
]
flags = [GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE]
return {(apid, flag): AquaGirdSecondaryHeader
for apid in apids
for flag in flags}
def airs_headers():
return {
# AIRS
(404, GROUP_STANDALONE): AquaGirdSecondaryHeader,
(405, GROUP_STANDALONE): AquaGirdSecondaryHeader,
(406, GROUP_STANDALONE): AquaGirdSecondaryHeader,
(407, GROUP_STANDALONE): AquaGirdSecondaryHeader,
(414, GROUP_STANDALONE): AquaGirdSecondaryHeader,
(415, GROUP_STANDALONE): AquaGirdSecondaryHeader,
(416, GROUP_STANDALONE): AquaGirdSecondaryHeader,
(417, GROUP_STANDALONE): AquaGirdSecondaryHeader,
}
def hsb_headers():
return {
# HSB
(342, GROUP_STANDALONE): AquaGirdSecondaryHeader,
}
def modis_headers():
return {
# MODIS
(64, GROUP_STANDALONE): AquaGiisSecondaryHeader,
(64, GROUP_FIRST): AquaGiisSecondaryHeader,
(64, GROUP_LAST): AquaGiisSecondaryHeader,
(64, GROUP_CONTINUING): AquaGiisSecondaryHeader,
(127, GROUP_STANDALONE): AquaGiisSecondaryHeader,
}
def ceres_headers():
apids = (
# CERES+Y
141, 142, 143, 144,
# CERES-Y
157, 158, 159, 160
)
groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)
return {(a, g): AquaGiisSecondaryHeader for a in apids for g in groupings}
def gbad_headers():
return {
# GBAD
(957, GROUP_STANDALONE): AquaSpacecraftBusSecondaryHeader
}
def aqua_headers():
"""
Aqua headers are looked up via their APID and grouping.
"""
headers = {}
headers.update(amsu_headers())
headers.update(airs_headers())
headers.update(hsb_headers())
headers.update(modis_headers())
headers.update(ceres_headers())
headers.update(gbad_headers())
return headers
def aqua_header_lookup(primary_header):
apid = primary_header.apid
grouping = primary_header.sequence_grouping
return _aqua_headers.get((apid, grouping))
_aqua_headers = aqua_headers()