# encoding: utf-8
"""
Structures for CCSDS Primary and Secondary headers

See:

    * ICD_Space_Ground_Aqua.pdf
      - GSFC 422-11-19-03
      - http://directreadout.sci.gsfc.nasa.gov/links/rsd_eosdb/PDF/ICD_Space_Ground_Aqua.pdf
    * CCSDS Space Packet Protocol
      - http://public.ccsds.org/publications/archive/133x0b1c2.pdf
    * CCSDS Time Code Formats
      - http://public.ccsds.org/publications/archive/301x0b4e1.pdf
"""
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."

import ctypes as c
from datetime import datetime

from edosl0util.timecode import cds_to_dt

# Values of the 2-bit ``sequence_grouping`` field of the primary header.
GROUP_FIRST = 0b01
GROUP_LAST = 0b10
GROUP_CONTINUING = 0b00
GROUP_STANDALONE = 0b11


class BaseStruct(c.BigEndianStructure):
    """
    Byte-packed, big-endian base class for all header structures.

    Provides field-based ``str``/``repr`` output and field-wise equality for
    every subclass that declares ``_fields_``.
    """

    _pack_ = 1

    def __str__(self):
        return (
            "<"
            + " ".join("%s=%s" % (f[0], getattr(self, f[0])) for f in self._fields_)
            + " >"
        )

    def __repr__(self):
        fields = ", ".join(
            "%s=%s" % (f[0], repr(getattr(self, f[0]))) for f in self._fields_
        )
        return "<%s (%s)>" % (self.__class__.__name__, fields)

    def __eq__(self, other):
        """
        Compare field by field against ``other``.

        Any object exposing attributes named like this structure's fields can
        be compared. When ``other`` lacks one of them (``None``, an unrelated
        type, ...) ``NotImplemented`` is returned so that ``==`` evaluates to
        ``False`` instead of raising ``AttributeError``.
        """
        try:
            return all(
                getattr(self, f[0]) == getattr(other, f[0]) for f in self._fields_
            )
        except AttributeError:
            return NotImplemented


class PrimaryHeader(BaseStruct):
    """
    CCSDS Primary Header.
    """

    _fields_ = [
        ("version_number", c.c_uint8, 3),  # == 0
        ("type_indicator", c.c_uint8, 1),
        ("secondary_header_flag", c.c_uint8, 1),
        ("apid", c.c_uint16, 11),
        ("sequence_grouping", c.c_uint16, 2),
        ("source_sequence_count", c.c_uint16, 14),
        ("data_length_minus1", c.c_uint16),  # octet count minus one
    ]


PRIMARY_HEADER_SIZE = c.sizeof(PrimaryHeader)


class Timecode(BaseStruct):
    """
    Secondary header timecode baseclass.

    Subclasses implement `day_segmented_timecode`; `asdatetime` and
    ``repr`` are built on top of it.
    """

    def __repr__(self):
        return str(self.asdatetime())

    def day_segmented_timecode(self):
        """
        Return the values passed positionally to ``cds_to_dt``.

        The concrete subclasses in this module return a
        ``(days, milliseconds, microseconds)`` tuple.
        """
        raise NotImplementedError()

    def asdatetime(self):
        """
        Return this CDS timecode as a UTC datetime object.
        """
        return cds_to_dt(*self.day_segmented_timecode())


class AquaCucTimecode(Timecode):
    """
    EOS AQUA implementation of a CCSDS Unsegmented Timecode. A CUC is an
    agency defined timecode which is in this case (I think) specific to Aqua.

    NOTE: I'm betting this will work for Terra as well but I do not have any
          documentation for Terra.

    FIXME: Not tested or validated!!
    """

    _fields_ = [
        ("extension_flag", c.c_uint8, 1),
        ("timecode_epoch", c.c_uint8, 3),
        ("coarse_time_count", c.c_uint8, 2),
        ("fine_time_count", c.c_uint8, 2),
        ("no_continuation_flag", c.c_uint8, 1),
        ("leap_seconds", c.c_uint8, 7),
        ("seconds", c.c_uint32),
        ("sub_seconds", c.c_uint16),
    ]

    EPOCH = datetime(1958, 1, 1)
    EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
    # Multiplier applied to ``sub_seconds`` to obtain microseconds.
    SUB_SECOND_UNITS = 15.2

    def day_segmented_timecode(self):
        """
        Return ``(days, milliseconds, microseconds)`` as a tuple of ints.

        ``seconds`` is split into whole days and the remaining seconds of the
        day; ``leap_seconds`` is subtracted from that remainder, so the
        millisecond value may be negative. ``sub_seconds`` is scaled by
        `SUB_SECOND_UNITS` and truncated to whole microseconds.
        """
        # Exact integer split. Deriving the remainder from a float quotient
        # (seconds / 86400) loses precision and, once truncated by int(), can
        # come out one millisecond short.
        days, secs_of_day = divmod(self.seconds, 86400)
        millis = (secs_of_day - self.leap_seconds) * 1000
        micros = self.SUB_SECOND_UNITS * self.sub_seconds
        return int(days), int(millis), int(micros)


class DaySegmentedTimecode(Timecode):
    """
    CCSDS Day Segmented Timecode
    """

    _pack_ = 1
    _fields_ = [
        ("days", c.c_uint16),
        ("milliseconds", c.c_uint32),
        ("microseconds", c.c_uint16),
    ]

    def day_segmented_timecode(self):
        """
        Return the stored ``(days, milliseconds, microseconds)`` fields.
        """
        return (self.days, self.milliseconds, self.microseconds)


class AquaGirdSecondaryHeader(BaseStruct):
    """
    Secondary header made of a flags byte followed by an `AquaCucTimecode`.
    """

    _fields_ = [("flags", c.c_uint8), ("timecode", AquaCucTimecode)]


class AquaGiisSecondaryHeader(BaseStruct):
    """
    Secondary header made of a `DaySegmentedTimecode` followed by a 1-bit
    quicklook flag and 7 bits of user flags.
    """

    _fields_ = [
        ("timecode", DaySegmentedTimecode),
        ("quicklook_flag", c.c_uint8, 1),
        ("user_flags", c.c_uint8, 7),
    ]


class AquaSpacecraftBusSecondaryHeader(BaseStruct):
    """
    Secondary header consisting solely of an `AquaCucTimecode`.
    """

    _fields_ = [("timecode", AquaCucTimecode)]


class JpssSecondaryHeader(BaseStruct):
    """
    Secondary header using Day Segmented timecodes.
    """

    _pack_ = 1
    _fields_ = [("timecode", DaySegmentedTimecode)]


class JpssFirstSecondaryHeader(BaseStruct):
    """
    Secondary header using Day Segmented timecodes and with a packet count.
    """

    _pack_ = 1
    _fields_ = [
        ("timecode", DaySegmentedTimecode),
        ("packet_count", c.c_uint8),
        ("_spare", c.c_uint8),
    ]


class ViirsPacketId(BaseStruct):
    """
    A 32-bit scan number followed by a `DaySegmentedTimecode` packet time.
    """

    _pack_ = 1
    _fields_ = [("scan_number", c.c_uint32), ("packet_time", DaySegmentedTimecode)]


# Secondary header class per sequence grouping; ``None`` means no secondary
# header structure is mapped for that grouping.
_jpss_headers = {
    GROUP_FIRST: JpssFirstSecondaryHeader,
    GROUP_CONTINUING: None,
    GROUP_LAST: None,
    GROUP_STANDALONE: JpssSecondaryHeader,
}


def jpss_header_lookup(primary_header):
    """
    Return the JPSS secondary header class for ``primary_header``.

    The lookup is keyed on ``primary_header.sequence_grouping``; the result is
    ``None`` when no class is mapped for that grouping.
    """
    return _jpss_headers.get(primary_header.sequence_grouping)


def amsu_headers():
    """
    Return the ``{(apid, grouping): header class}`` table for AMSU.
    """
    apids = [
        # AMSU-A1
        257,
        259,
        260,
        261,
        262,
        # AMSU-A2
        288,
        289,
        290,
    ]
    flags = [GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE]
    return {(apid, flag): AquaGirdSecondaryHeader for apid in apids for flag in flags}


def airs_headers():
    """
    Return the ``{(apid, grouping): header class}`` table for AIRS.
    """
    return {
        # AIRS
        (404, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (405, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (406, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (407, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (414, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (415, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (416, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (417, GROUP_STANDALONE): AquaGirdSecondaryHeader,
    }


def hsb_headers():
    """
    Return the ``{(apid, grouping): header class}`` table for HSB.
    """
    return {
        # HSB
        (342, GROUP_STANDALONE): AquaGirdSecondaryHeader
    }


def modis_headers():
    """
    Return the ``{(apid, grouping): header class}`` table for MODIS.
    """
    return {
        # MODIS
        (64, GROUP_STANDALONE): AquaGiisSecondaryHeader,
        (64, GROUP_FIRST): AquaGiisSecondaryHeader,
        (64, GROUP_LAST): AquaGiisSecondaryHeader,
        (64, GROUP_CONTINUING): AquaGiisSecondaryHeader,
        (127, GROUP_STANDALONE): AquaGiisSecondaryHeader,
    }


def ceres_headers():
    """
    Return the ``{(apid, grouping): header class}`` table for CERES.
    """
    apids = (
        # CERES+Y
        141,
        142,
        143,
        144,
        # CERES-Y
        157,
        158,
        159,
        160,
    )
    groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)
    return {(a, g): AquaGiisSecondaryHeader for a in apids for g in groupings}


def gbad_headers():
    """
    Return the ``{(apid, grouping): header class}`` table for GBAD.
    """
    return {
        # GBAD
        (957, GROUP_STANDALONE): AquaSpacecraftBusSecondaryHeader
    }


def aqua_headers():
    """
    Aqua headers are looked up via their APID and grouping.

    Returns a single dict merging the per-instrument tables.
    """
    headers = {}
    # Merge order is kept fixed so that, should keys ever overlap, later
    # tables continue to win exactly as before.
    for build_table in (
        amsu_headers,
        airs_headers,
        hsb_headers,
        modis_headers,
        ceres_headers,
        gbad_headers,
    ):
        headers.update(build_table())
    return headers


def aqua_header_lookup(primary_header):
    """
    Return the Aqua secondary header class for ``primary_header``.

    The lookup is keyed on ``(apid, sequence_grouping)``; the result is
    ``None`` when that pair is not in the table.
    """
    apid = primary_header.apid
    grouping = primary_header.sequence_grouping
    return _aqua_headers.get((apid, grouping))


# Built once at import time; consulted by aqua_header_lookup().
_aqua_headers = aqua_headers()