# encoding: utf-8
"""
Structures for CCSDS Primary and Secondary headers

See:

    * ICD_Space_Ground_Aqua.pdf
      - GSFC 422-11-19-03
      - http://directreadout.sci.gsfc.nasa.gov/links/rsd_eosdb/PDF/ICD_Space_Ground_Aqua.pdf
    * CCSDS Space Packet Protocol
      - http://public.ccsds.org/publications/archive/133x0b1c2.pdf
    * CCSDS Time Code Formats
      - http://public.ccsds.org/publications/archive/301x0b4e1.pdf
"""
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."

import ctypes as c
from datetime import datetime

from edosl0util.timecode import cds_to_dt

# Values of the 2-bit ``sequence_grouping`` field of the primary header.
GROUP_FIRST = 0b01
GROUP_LAST = 0b10
GROUP_CONTINUING = 0b00
GROUP_STANDALONE = 0b11


class BaseStruct(c.BigEndianStructure):
    """
    Big-endian, byte-packed ctypes structure with field-wise string
    representations and field-wise equality.
    """
    _pack_ = 1

    def __str__(self):
        return "<" + ' '.join('%s=%s' % (f[0], getattr(self, f[0]))
                              for f in self._fields_) + " >"

    def __repr__(self):
        fields = ', '.join('%s=%s' % (f[0], repr(getattr(self, f[0])))
                           for f in self._fields_)
        return '<%s (%s)>' % (self.__class__.__name__, fields)

    def __eq__(self, other):
        """
        Compare field by field using this structure's ``_fields_``.

        Returns ``NotImplemented`` when ``other`` lacks one of the fields
        (e.g. ``None``), so such comparisons evaluate to ``False`` instead
        of raising ``AttributeError``.
        """
        try:
            return all(getattr(self, f[0]) == getattr(other, f[0])
                       for f in self._fields_)
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; define it so the
        # two operators always agree.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result


class PrimaryHeader(BaseStruct):
    """
    CCSDS Primary Header.
    """
    _fields_ = [
        ('version_number', c.c_uint8, 3),  # == 0
        ('type_indicator', c.c_uint8, 1),
        ('secondary_header_flag', c.c_uint8, 1),
        ('apid', c.c_uint16, 11),
        ('sequence_grouping', c.c_uint16, 2),
        ('source_sequence_count', c.c_uint16, 14),
        ('data_length_minus1', c.c_uint16)  # octet count minus one
    ]


PRIMARY_HEADER_SIZE = c.sizeof(PrimaryHeader)


class Timecode(BaseStruct):
    """
    Secondary header timecode baseclass.

    Subclasses implement `day_segmented_timecode`; `asdatetime` and
    `__repr__` are derived from it.
    """

    def __repr__(self):
        return str(self.asdatetime())

    def day_segmented_timecode(self):
        """
        Return the arguments that `asdatetime` unpacks into `cds_to_dt`.
        """
        raise NotImplementedError()

    def asdatetime(self):
        """
        CDS Timecode as a UTC datetime object.
        """
        return cds_to_dt(*self.day_segmented_timecode())


class AquaCucTimecode(Timecode):
    """
    EOS AQUA implementation of a CCSDS Unsegmented Timecode. A CUC is an
    agency defined timecode which is in this case (I think) specific to Aqua.

    NOTE: I'm betting this will work for Terra as well but I do not have any
          documentation for Terra.

    FIXME: Not tested or validated!!
    """
    _fields_ = [
        ('extension_flag', c.c_uint8, 1),
        ('timecode_epoch', c.c_uint8, 3),
        ('coarse_time_count', c.c_uint8, 2),
        ('fine_time_count', c.c_uint8, 2),
        ('no_continuation_flag', c.c_uint8, 1),
        ('leap_seconds', c.c_uint8, 7),
        ('seconds', c.c_uint32),
        ('sub_seconds', c.c_uint16)
    ]

    EPOCH = datetime(1958, 1, 1)
    EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
    # Multiplier applied to ``sub_seconds`` to obtain microseconds.
    SUB_SECOND_UNITS = 15.2

    def day_segmented_timecode(self):
        """
        Convert the unsegmented count to ``(days, milliseconds of day,
        microseconds of millisecond)``.

        The seconds-within-day part of ``seconds`` is folded into the
        millisecond value; previously it was dropped, so the result carried
        only the sub-second fraction of the time of day.
        """
        micros = self.SUB_SECOND_UNITS * self.sub_seconds
        # NOTE(review): the original added ``leap_seconds`` and then
        # subtracted it again, i.e. leap seconds were never applied. That
        # net behavior is preserved here -- confirm against the ICD whether
        # they should be removed to obtain UTC.
        days, secs_of_day = divmod(self.seconds, 86400)
        millis = secs_of_day * 1e3 + micros // 1e3
        return (days, millis, micros % 1e3)


class DaySegmentedTimecode(Timecode):
    """
    CCSDS Day Segmented Timecode
    """
    _pack_ = 1
    _fields_ = [
        ('days', c.c_uint16),
        ('milliseconds', c.c_uint32),
        ('microseconds', c.c_uint16)
    ]

    def day_segmented_timecode(self):
        """
        Return the stored ``(days, milliseconds, microseconds)`` unchanged.
        """
        return (self.days, self.milliseconds, self.microseconds)


class AquaGirdSecondaryHeader(BaseStruct):
    """
    Secondary header made of a flags octet followed by an `AquaCucTimecode`.
    """
    _fields_ = [
        ('flags', c.c_uint8),
        ('timecode', AquaCucTimecode),
    ]


class AquaGiisSecondaryHeader(BaseStruct):
    """
    Secondary header made of a `DaySegmentedTimecode` followed by a 1-bit
    quicklook flag and 7 bits of user flags.
    """
    _fields_ = [
        ('timecode', DaySegmentedTimecode),
        ('quicklook_flag', c.c_uint8, 1),
        ('user_flags', c.c_uint8, 7)
    ]


class AquaSpacecraftBusSecondaryHeader(BaseStruct):
    """
    Secondary header consisting solely of an `AquaCucTimecode`.
    """
    _fields_ = [
        ('timecode', AquaCucTimecode)
    ]


class JpssSecondaryHeader(BaseStruct):
    """
    Secondary header using Day Segmented timecodes.
    """
    _pack_ = 1
    _fields_ = [
        ('timecode', DaySegmentedTimecode)
    ]


class JpssFirstSecondaryHeader(BaseStruct):
    """
    Secondary header using Day Segmented timecodes and with a packet count.
    """
    _pack_ = 1
    _fields_ = [
        ('timecode', DaySegmentedTimecode),
        ('packet_count', c.c_uint8),
        ('_spare', c.c_uint8)
    ]


class ViirsPacketId(BaseStruct):
    """
    A 32-bit scan number followed by a `DaySegmentedTimecode` packet time.
    """
    _pack_ = 1
    _fields_ = [
        ('scan_number', c.c_uint32),
        ('packet_time', DaySegmentedTimecode),
    ]


# Secondary header class by sequence grouping; ``None`` means no secondary
# header class is registered for that grouping.
_jpss_headers = {
    GROUP_FIRST: JpssFirstSecondaryHeader,
    GROUP_CONTINUING: None,
    GROUP_LAST: None,
    GROUP_STANDALONE: JpssSecondaryHeader,
}


def jpss_header_lookup(primary_header):
    """
    Return the secondary header class registered for the sequence grouping
    of ``primary_header``, or ``None`` when there is none.
    """
    return _jpss_headers.get(primary_header.sequence_grouping)


def amsu_headers():
    """
    Map every ``(apid, grouping)`` pair of the AMSU APIDs to
    `AquaGirdSecondaryHeader`.
    """
    apids = [
        # AMSU-A1
        257, 259, 260, 261, 262,
        # AMSU-A2
        288, 289, 290
    ]
    flags = [GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE]
    return {(apid, flag): AquaGirdSecondaryHeader
            for apid in apids
            for flag in flags}


def airs_headers():
    """
    Map the standalone-grouped AIRS APIDs to `AquaGirdSecondaryHeader`.
    """
    return {
        # AIRS
        (404, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (405, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (406, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (407, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (414, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (415, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (416, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (417, GROUP_STANDALONE): AquaGirdSecondaryHeader,
    }


def hsb_headers():
    """
    Map the standalone-grouped HSB APID to `AquaGirdSecondaryHeader`.
    """
    return {
        # HSB
        (342, GROUP_STANDALONE): AquaGirdSecondaryHeader,
    }


def modis_headers():
    """
    Map the MODIS ``(apid, grouping)`` pairs to `AquaGiisSecondaryHeader`.
    """
    return {
        # MODIS
        (64, GROUP_STANDALONE): AquaGiisSecondaryHeader,
        (64, GROUP_FIRST): AquaGiisSecondaryHeader,
        (64, GROUP_LAST): AquaGiisSecondaryHeader,
        (64, GROUP_CONTINUING): AquaGiisSecondaryHeader,
        (127, GROUP_STANDALONE): AquaGiisSecondaryHeader,
    }


def ceres_headers():
    """
    Map every ``(apid, grouping)`` pair of the CERES APIDs to
    `AquaGiisSecondaryHeader`.
    """
    apids = (
        # CERES+Y
        141, 142, 143, 144,
        # CERES-Y
        157, 158, 159, 160
    )
    groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)
    return {(a, g): AquaGiisSecondaryHeader
            for a in apids
            for g in groupings}


def gbad_headers():
    """
    Map the standalone-grouped GBAD APID to
    `AquaSpacecraftBusSecondaryHeader`.
    """
    return {
        # GBAD
        (957, GROUP_STANDALONE): AquaSpacecraftBusSecondaryHeader
    }


def aqua_headers():
    """
    Aqua headers are looked up via their APID and grouping.

    Returns a new dict merging the per-instrument tables, keyed by
    ``(apid, grouping)``.
    """
    headers = {}
    headers.update(amsu_headers())
    headers.update(airs_headers())
    headers.update(hsb_headers())
    headers.update(modis_headers())
    headers.update(ceres_headers())
    headers.update(gbad_headers())
    return headers


def aqua_header_lookup(primary_header):
    """
    Return the secondary header class registered for the APID and sequence
    grouping of ``primary_header``, or ``None`` when the pair is unknown.
    """
    apid = primary_header.apid
    grouping = primary_header.sequence_grouping
    return _aqua_headers.get((apid, grouping))


# Built once at import time; consulted by ``aqua_header_lookup``.
_aqua_headers = aqua_headers()