Skip to content
Snippets Groups Projects
headers.py 6.89 KiB
# encoding: utf-8
"""
Structures for CCSDS Primary and Secondary headers

See:

    * ICD_Space_Ground_Aqua.pdf - GSFC 422-11-19-03
        - http://directreadout.sci.gsfc.nasa.gov/links/rsd_eosdb/PDF/ICD_Space_Ground_Aqua.pdf
    * CCSDS Space Packet Protocol
        - http://public.ccsds.org/publications/archive/133x0b1c2.pdf
    * CCSDS Time Code Formats
        - http://public.ccsds.org/publications/archive/301x0b4e1.pdf
"""
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."

import ctypes as c
from datetime import datetime

from edosl0util.timecode import cds_to_dt

GROUP_FIRST = 0b01
GROUP_LAST = 0b10
GROUP_CONTINUING = 0b00
GROUP_STANDALONE = 0b11


class BaseStruct(c.BigEndianStructure):
    _pack_ = 1

    def __str__(self):
        return "<" + ' '.join('%s=%s' % (f[0], getattr(self, f[0])) for f in self._fields_) + " >"

    def __repr__(self):
        fields = ', '.join('%s=%s' % (f[0], repr(getattr(self, f[0]))) for f in self._fields_)
        return '<%s (%s)>' % (self.__class__.__name__, fields)

    def __eq__(self, other):
        return all(getattr(self, f[0]) == getattr(other, f[0]) for f in self._fields_)


class PrimaryHeader(BaseStruct):
    """
    CCSDS Primary Header.
    """
    _fields_ = [
        ('version_number', c.c_uint8, 3),  # == 0
        ('type_indicator', c.c_uint8, 1),
        ('secondary_header_flag', c.c_uint8, 1),
        ('apid', c.c_uint16, 11),
        ('sequence_grouping', c.c_uint16, 2),
        ('source_sequence_count', c.c_uint16, 14),
        ('data_length_minus1', c.c_uint16)  # octet count minus one
    ]


PRIMARY_HEADER_SIZE = c.sizeof(PrimaryHeader)


class Timecode(BaseStruct):
    """
    Secondary header timecode baseclass.
    """

    def __repr__(self):
        return str(self.asdatetime())

    def day_segmented_timecode(self):
        raise NotImplementedError()

    def asdatetime(self):
        """
        CDS Timecode a UTC datetime object.
        """
        return cds_to_dt(*self.day_segmented_timecode())


class AquaCucTimecode(Timecode):
    """
    EOS AQUA implementation of a CCSDS Unsegmented Timecode. An CUT is an
    agency defined timecode which is in this case (I think) specific to Aqua.

    NOTE: I'm betting this will work for Terra as well but I do not have any
          documentation for Terra.

    FIXME: Not tested or validated!!
    """
    _fields_ = [
        ('extension_flag', c.c_uint8, 1),
        ('timecode_epoch', c.c_uint8, 3),
        ('coarse_time_count', c.c_uint8, 2),
        ('fine_time_count', c.c_uint8, 2),
        ('no_continuation_flag', c.c_uint8, 1),
        ('leap_seconds', c.c_uint8, 7),
        ('seconds', c.c_uint32),
        ('sub_seconds', c.c_uint16)
    ]

    EPOCH = datetime(1958, 1, 1)
    EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
    SUB_SECOND_UNITS = 15.2

    def day_segmented_timecode(self):
        micros = self.SUB_SECOND_UNITS * self.sub_seconds
        seconds = self.seconds + self.leap_seconds
        return ((seconds - self.leap_seconds) // 86400,
                micros // 1e3,
                micros % 1e3)


class DaySegmentedTimecode(Timecode):
    """
    CCSDS Day Segmented Timecode
    """
    _pack_ = 1
    _fields_ = [
        ('days', c.c_uint16),
        ('milliseconds', c.c_uint32),
        ('microseconds', c.c_uint16)
    ]

    def day_segmented_timecode(self):
        return (self.days, self.milliseconds, self.microseconds)


class AquaGirdSecondaryHeader(BaseStruct):
    _fields_ = [
        ('flags', c.c_uint8),
        ('timecode', AquaCucTimecode),
    ]


class AquaGiisSecondaryHeader(BaseStruct):
    _fields_ = [
        ('timecode', DaySegmentedTimecode),
        ('quicklook_flag', c.c_uint8, 1),
        ('user_flags', c.c_uint8, 7)
    ]


class AquaSpacecraftBusSecondaryHeader(BaseStruct):
    _fields_ = [
        ('timecode', AquaCucTimecode)
    ]


class JpssSecondaryHeader(BaseStruct):
    """
    Secondary header using Day Segmented timecodes.
    """
    _pack_ = 1
    _fields_ = [
        ('timecode', DaySegmentedTimecode)
    ]


class JpssFirstSecondaryHeader(BaseStruct):
    """
    Secondary header using Day Segmented timecodes and with a packet count.
    """
    _pack_ = 1
    _fields_ = [
        ('timecode', DaySegmentedTimecode),
        ('packet_count', c.c_uint8),
        ('_spare', c.c_uint8)
    ]


class ViirsPacketId(BaseStruct):
    _pack_ = 1
    _fields_ = [
        ('scan_number', c.c_uint32),
        ('packet_time', DaySegmentedTimecode),
    ]


_jpss_headers = {
    GROUP_FIRST: JpssFirstSecondaryHeader,
    GROUP_CONTINUING: None,
    GROUP_LAST: None,
    GROUP_STANDALONE: JpssSecondaryHeader,
}


def jpss_header_lookup(primary_header):
    return _jpss_headers.get(primary_header.sequence_grouping)


def amsu_headers():
    apids = [
        # AMSU-A1
        257, 259, 260, 261, 262,
        # AMSU-A2
        288, 289, 290
    ]
    flags = [GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE]
    return {(apid, flag): AquaGirdSecondaryHeader
            for apid in apids
            for flag in flags}


def airs_headers():
    return {
        # AIRS
        (404, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (405, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (406, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (407, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (414, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (415, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (416, GROUP_STANDALONE): AquaGirdSecondaryHeader,
        (417, GROUP_STANDALONE): AquaGirdSecondaryHeader,
    }


def hsb_headers():
    return {
        # HSB
        (342, GROUP_STANDALONE): AquaGirdSecondaryHeader,
    }


def modis_headers():
    return {
        # MODIS
        (64, GROUP_STANDALONE): AquaGiisSecondaryHeader,
        (64, GROUP_FIRST): AquaGiisSecondaryHeader,
        (64, GROUP_LAST): AquaGiisSecondaryHeader,
        (64, GROUP_CONTINUING): AquaGiisSecondaryHeader,
        (127, GROUP_STANDALONE): AquaGiisSecondaryHeader,
    }


def ceres_headers():
    apids = (
        # CERES+Y
        141, 142, 143, 144,
        # CERES-Y
        157, 158, 159, 160
    )
    groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)
    return {(a, g): AquaGiisSecondaryHeader for a in apids for g in groupings}


def gbad_headers():
    return {
        # GBAD
        (957, GROUP_STANDALONE): AquaSpacecraftBusSecondaryHeader
    }


def aqua_headers():
    """
    Aqua headers are looked up via their APID and grouping.
    """
    headers = {}
    headers.update(amsu_headers())
    headers.update(airs_headers())
    headers.update(hsb_headers())
    headers.update(modis_headers())
    headers.update(ceres_headers())
    headers.update(gbad_headers())
    return headers


def aqua_header_lookup(primary_header):
    apid = primary_header.apid
    grouping = primary_header.sequence_grouping
    return _aqua_headers.get((apid, grouping))


_aqua_headers = aqua_headers()