Something went wrong on our end
-
Bruce Flynn authored
Using stamp could possibly cause issues due to UTC datetimes not being aware of leap seconds.
Bruce Flynn authored: Using stamp could possibly cause issues due to UTC datetimes not being aware of leap seconds.
headers.py 6.89 KiB
# encoding: utf-8
"""
Structures for CCSDS Primary and Secondary headers
See:
* ICD_Space_Ground_Aqua.pdf - GSFC 422-11-19-03
- http://directreadout.sci.gsfc.nasa.gov/links/rsd_eosdb/PDF/ICD_Space_Ground_Aqua.pdf
* CCSDS Space Packet Protocol
- http://public.ccsds.org/publications/archive/133x0b1c2.pdf
* CCSDS Time Code Formats
- http://public.ccsds.org/publications/archive/301x0b4e1.pdf
"""
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
import ctypes as c
from datetime import datetime
from edosl0util.timecode import cds_to_dt
# Values of the 2-bit ``sequence_grouping`` field of the primary header.
# They are used as lookup keys when selecting a secondary header type.
GROUP_FIRST = 0b01
GROUP_LAST = 0b10
GROUP_CONTINUING = 0b00
GROUP_STANDALONE = 0b11
class BaseStruct(c.BigEndianStructure):
    """
    Base class for the byte-packed, big-endian structures in this module.

    Subclasses supply ``_fields_``; this class adds readable ``str``/``repr``
    output and field-by-field comparison on top of the ctypes structure.
    """
    _pack_ = 1

    def __str__(self):
        pairs = ['%s=%s' % (f[0], getattr(self, f[0])) for f in self._fields_]
        return "<" + ' '.join(pairs) + " >"

    def __repr__(self):
        fields = ', '.join('%s=%r' % (f[0], getattr(self, f[0]))
                           for f in self._fields_)
        return '<%s (%s)>' % (self.__class__.__name__, fields)

    def __eq__(self, other):
        """
        Compare every field declared by ``self`` with the attribute of the
        same name on ``other``.

        Returns ``NotImplemented`` when ``other`` lacks one of the fields, so
        comparing against ``None`` or an unrelated object evaluates to
        ``False`` instead of raising ``AttributeError``.
        """
        try:
            return all(getattr(self, f[0]) == getattr(other, f[0])
                       for f in self._fields_)
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; define it explicitly
        # so both operators always agree.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
class PrimaryHeader(BaseStruct):
    """
    CCSDS Primary Header.

    Declared as big-endian bit fields; the third element of each field tuple
    is the width of that field in bits. ``apid`` and ``sequence_grouping``
    are the values used by the header lookup functions in this module, and
    ``sequence_grouping`` takes one of the ``GROUP_*`` constants.
    """
    _fields_ = [
        ('version_number', c.c_uint8, 3),  # == 0
        ('type_indicator', c.c_uint8, 1),
        ('secondary_header_flag', c.c_uint8, 1),
        ('apid', c.c_uint16, 11),
        ('sequence_grouping', c.c_uint16, 2),
        ('source_sequence_count', c.c_uint16, 14),
        ('data_length_minus1', c.c_uint16)  # octet count minus one
    ]


# Size of the primary header structure in bytes, as reported by ctypes.
PRIMARY_HEADER_SIZE = c.sizeof(PrimaryHeader)
class Timecode(BaseStruct):
    """
    Common base for secondary header timecode structures.

    Concrete timecodes implement `day_segmented_timecode`; conversion to a
    datetime and the ``repr`` text are derived from it here.
    """

    def __repr__(self):
        moment = self.asdatetime()
        return str(moment)

    def day_segmented_timecode(self):
        """
        Return the arguments handed to ``cds_to_dt``. Subclasses must
        override this method.
        """
        raise NotImplementedError()

    def asdatetime(self):
        """
        Convert this timecode to a datetime object by way of ``cds_to_dt``.
        """
        segments = self.day_segmented_timecode()
        return cds_to_dt(*segments)
class AquaCucTimecode(Timecode):
    """
    EOS AQUA implementation of a CCSDS Unsegmented Timecode (CUC). A CUC is
    an agency defined timecode which is in this case (I think) specific to
    Aqua.

    NOTE: I'm betting this will work for Terra as well but I do not have any
    documentation for Terra.

    FIXME: Not tested or validated!!
    """
    _fields_ = [
        ('extension_flag', c.c_uint8, 1),
        ('timecode_epoch', c.c_uint8, 3),
        ('coarse_time_count', c.c_uint8, 2),
        ('fine_time_count', c.c_uint8, 2),
        ('no_continuation_flag', c.c_uint8, 1),
        ('leap_seconds', c.c_uint8, 7),
        ('seconds', c.c_uint32),
        ('sub_seconds', c.c_uint16)
    ]

    # NOTE(review): presumably the epoch that ``seconds`` counts from --
    # confirm against the ICD.
    EPOCH = datetime(1958, 1, 1)
    # Offset of EPOCH from 1970-01-01 in seconds (negative, as EPOCH is earlier).
    EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
    # Microseconds represented by one count of ``sub_seconds``.
    SUB_SECOND_UNITS = 15.2

    def day_segmented_timecode(self):
        """
        Express this timecode as a ``(days, milliseconds, microseconds)``
        tuple: whole days in ``seconds``, milliseconds elapsed within that
        day, and the remaining microseconds within the millisecond.

        The last two values are floats because ``SUB_SECOND_UNITS`` is a
        float.
        """
        micros = self.SUB_SECOND_UNITS * self.sub_seconds
        # The seconds left over after removing whole days must be folded into
        # the milliseconds-of-day value; previously they were dropped and only
        # the sub-second part was reported.
        #
        # NOTE(review): ``leap_seconds`` is deliberately not applied here. The
        # earlier code added and then subtracted it, which had no net effect;
        # confirm whether a leap second correction is required.
        days, secs_of_day = divmod(self.seconds, 86400)
        return (days,
                secs_of_day * 1000 + micros // 1e3,
                micros % 1e3)
class DaySegmentedTimecode(Timecode):
    """
    CCSDS Day Segmented Timecode: a 16-bit day count, a 32-bit millisecond
    count and a 16-bit microsecond count.
    """
    _pack_ = 1
    _fields_ = [
        ('days', c.c_uint16),
        ('milliseconds', c.c_uint32),
        ('microseconds', c.c_uint16),
    ]

    def day_segmented_timecode(self):
        """
        Return the stored fields as a ``(days, milliseconds, microseconds)``
        tuple.
        """
        days = self.days
        millis = self.milliseconds
        micros = self.microseconds
        return days, millis, micros
class AquaGirdSecondaryHeader(BaseStruct):
    """
    Secondary header made of a one-byte ``flags`` field followed by an
    AquaCucTimecode.

    Registered for the AMSU, AIRS and HSB APIDs by the ``*_headers``
    functions in this module.
    """
    _fields_ = [
        ('flags', c.c_uint8),
        ('timecode', AquaCucTimecode),
    ]
class AquaGiisSecondaryHeader(BaseStruct):
    """
    Secondary header made of a DaySegmentedTimecode followed by a 1-bit
    ``quicklook_flag`` and 7 bits of ``user_flags``.

    Registered for the MODIS and CERES APIDs by the ``*_headers`` functions
    in this module.
    """
    _fields_ = [
        ('timecode', DaySegmentedTimecode),
        ('quicklook_flag', c.c_uint8, 1),
        ('user_flags', c.c_uint8, 7)
    ]
class AquaSpacecraftBusSecondaryHeader(BaseStruct):
    """
    Secondary header containing only an AquaCucTimecode.

    Registered for the GBAD APID by ``gbad_headers``.
    """
    _fields_ = [
        ('timecode', AquaCucTimecode)
    ]
class JpssSecondaryHeader(BaseStruct):
    """
    Secondary header using Day Segmented timecodes.

    Selected by ``jpss_header_lookup`` for packets whose sequence grouping is
    ``GROUP_STANDALONE``.
    """
    _pack_ = 1
    _fields_ = [
        ('timecode', DaySegmentedTimecode)
    ]
class JpssFirstSecondaryHeader(BaseStruct):
    """
    Secondary header using Day Segmented timecodes and with a packet count.

    Selected by ``jpss_header_lookup`` for packets whose sequence grouping is
    ``GROUP_FIRST``. The timecode is followed by a one-byte ``packet_count``
    and one spare byte.
    """
    _pack_ = 1
    _fields_ = [
        ('timecode', DaySegmentedTimecode),
        ('packet_count', c.c_uint8),
        ('_spare', c.c_uint8)
    ]
class ViirsPacketId(BaseStruct):
    """
    Structure holding a 32-bit ``scan_number`` followed by a
    DaySegmentedTimecode ``packet_time``.
    """
    _pack_ = 1
    _fields_ = [
        ('scan_number', c.c_uint32),
        ('packet_time', DaySegmentedTimecode),
    ]
# Maps a primary header sequence grouping to the secondary header structure
# used for that grouping; None means no secondary header structure is
# registered for it.
_jpss_headers = {
    GROUP_FIRST: JpssFirstSecondaryHeader,
    GROUP_CONTINUING: None,
    GROUP_LAST: None,
    GROUP_STANDALONE: JpssSecondaryHeader,
}
def jpss_header_lookup(primary_header):
    """
    Return the secondary header class registered for the sequence grouping
    of ``primary_header``, or None when nothing is registered for it.
    """
    grouping = primary_header.sequence_grouping
    return _jpss_headers.get(grouping)
def amsu_headers():
    """
    Build the ``(apid, grouping) -> header class`` table for the AMSU APIDs.

    Every AMSU-A1 and AMSU-A2 APID is paired with each sequence grouping and
    mapped to AquaGirdSecondaryHeader.
    """
    amsu_a1 = (257, 259, 260, 261, 262)
    amsu_a2 = (288, 289, 290)
    groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)

    table = {}
    for apid in amsu_a1 + amsu_a2:
        for grouping in groupings:
            table[(apid, grouping)] = AquaGirdSecondaryHeader
    return table
def airs_headers():
    """
    Build the ``(apid, grouping) -> header class`` table for the AIRS APIDs.

    Only standalone packets are registered; each maps to
    AquaGirdSecondaryHeader.
    """
    # AIRS
    apids = (404, 405, 406, 407, 414, 415, 416, 417)
    return {(apid, GROUP_STANDALONE): AquaGirdSecondaryHeader
            for apid in apids}
def hsb_headers():
    """
    Build the ``(apid, grouping) -> header class`` table for the single HSB
    APID, registered for standalone packets only.
    """
    # HSB
    key = (342, GROUP_STANDALONE)
    return {key: AquaGirdSecondaryHeader}
def modis_headers():
    """
    Build the ``(apid, grouping) -> header class`` table for the MODIS APIDs.

    APID 64 is registered for every sequence grouping and APID 127 for
    standalone packets only; all map to AquaGiisSecondaryHeader.
    """
    # MODIS
    groupings = (GROUP_STANDALONE, GROUP_FIRST, GROUP_LAST, GROUP_CONTINUING)
    table = {(64, grouping): AquaGiisSecondaryHeader for grouping in groupings}
    table[(127, GROUP_STANDALONE)] = AquaGiisSecondaryHeader
    return table
def ceres_headers():
    """
    Build the ``(apid, grouping) -> header class`` table for the CERES APIDs.

    Every CERES+Y and CERES-Y APID is paired with each sequence grouping and
    mapped to AquaGiisSecondaryHeader.
    """
    ceres_plus_y = (141, 142, 143, 144)
    ceres_minus_y = (157, 158, 159, 160)
    all_groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)

    table = {}
    for apid in ceres_plus_y + ceres_minus_y:
        for grouping in all_groupings:
            table[(apid, grouping)] = AquaGiisSecondaryHeader
    return table
def gbad_headers():
    """
    Build the ``(apid, grouping) -> header class`` table for the single GBAD
    APID, registered for standalone packets only.
    """
    # GBAD
    key = (957, GROUP_STANDALONE)
    return {key: AquaSpacecraftBusSecondaryHeader}
def aqua_headers():
    """
    Merge the per-instrument tables into one Aqua lookup table.

    Aqua headers are looked up via their APID and grouping, so the keys of
    the returned dict are ``(apid, grouping)`` tuples.
    """
    builders = (
        amsu_headers,
        airs_headers,
        hsb_headers,
        modis_headers,
        ceres_headers,
        gbad_headers,
    )
    combined = {}
    for build in builders:
        combined.update(build())
    return combined
def aqua_header_lookup(primary_header):
    """
    Return the secondary header class registered for the APID and sequence
    grouping of ``primary_header``, or None when the pair is not registered.
    """
    key = (primary_header.apid, primary_header.sequence_grouping)
    return _aqua_headers.get(key)


# Lookup table built once at import time and consulted by aqua_header_lookup.
_aqua_headers = aqua_headers()