Skip to content
Snippets Groups Projects
rdrgen.py 32.7 KiB
Newer Older
Greg Quinn's avatar
Greg Quinn committed
import ctypes
Bruce Flynn's avatar
Bruce Flynn committed
import shutil
import tempfile
Greg Quinn's avatar
Greg Quinn committed
from collections import OrderedDict
from contextlib import contextmanager
Greg Quinn's avatar
Greg Quinn committed
from datetime import datetime
Greg Quinn's avatar
Greg Quinn committed

import attr
Greg Quinn's avatar
Greg Quinn committed
import h5py
Greg Quinn's avatar
Greg Quinn committed
import numpy as np

Greg Quinn's avatar
Greg Quinn committed
import edosl0util
Bruce Flynn's avatar
Bruce Flynn committed
from edosl0util.jpssrdr import Apid as ApidListItem
from edosl0util.jpssrdr import PacketTracker, StaticHeader, decode_rdr_blob
from edosl0util.stream import jpss_packet_stream, collect_groups
from edosl0util.timecode import cds_to_iet, iet_to_dt
LOG = logging.getLogger(__name__)


class Error(Exception):
    """RDR generation base error.
    """


class InvalidStream(Error):
    """ Packet stream is not valid for RDR
    """


class InvalidPacket(Error):
    """ Invalid packet for the current RDR.
    """

    def __init__(self, msg, packet):
        self.msg = msg
        self.packet = packet

Bruce Flynn's avatar
Bruce Flynn committed
def iter_pkts(l0_files):
    for l0_file in l0_files:
        with open(l0_file, "rb") as l0_file_obj:
            for pkt in jpss_packet_stream(l0_file_obj):
                yield pkt

def filter_group_orphans(s):
    """
    Filter out any packets in a group that do not fall withing the expected
Bruce Flynn's avatar
Bruce Flynn committed
    sequence id series and expected packets per group. This really only
    applies to VIIRS data since it's the only sensor that utilizes groups.

    This is useful for input RDR packet streams received from direct braodcast
    that may have sub-ideal packet sequences that RDR generation does not play
    well with.
    """
    viirs_apids = set(ViirsScienceApidInfo.apids)
Bruce Flynn's avatar
Bruce Flynn committed
    viirs_tracker = ViirsGroupedPacketTimeTracker()
    for p in iter(s):
Bruce Flynn's avatar
Bruce Flynn committed
        # If it's a VIIRS grouped packet, check for orphans
        if (
            p.apid in viirs_apids
            and ViirsScienceApidInfo.get_packets_per_scan(p.apid) > 1
        ):
            try:
                viirs_tracker.get_iet(p)
            except OrphanedViirsPacket:
                LOG.debug("dropping orphan %s", p)
Bruce Flynn's avatar
Bruce Flynn committed
        yield p
def filter_before(s, before):
    """
    Filter all packets before the provided time. This handles groupped packets
    by dropping the entire group if its first packet has a timestamp before
    the provided time.
    for grp in collect_groups(s):
        if grp[0].stamp < before:
            LOG.debug("dropping before[%s] %s", before, grp[0])
            continue
        yield from grp
Bruce Flynn's avatar
Bruce Flynn committed
def packets_to_rdrs(sat, l0_files, **kwargs):
Bruce Flynn's avatar
Bruce Flynn committed
    return build_rdr(sat, iter_pkts(l0_files), **kwargs)
Bruce Flynn's avatar
Bruce Flynn committed
def build_rdr(
    sat,
    pkt_iter,
    output_dir=".",
    aggr_type="idps",
    aggr_level=None,
    diary_cushion=10000000,
    attr_overrides={},
Bruce Flynn's avatar
Bruce Flynn committed
):
Greg Quinn's avatar
Greg Quinn committed
    """Construct RDR file(s) from L0 packets

    Default aggregation behavior uses file boundaries computed in the same way
    as for IDPS assuming the default aggregation level depending on the instrument
Greg Quinn's avatar
Greg Quinn committed
    (e.g. 85 sec for VIIRS, 8 min for CrIS). A different aggregation level can
    be given as an integer number of granule (e.g. 4 to get roughly 5.5 min files
    for VIIRS). Finally, aggr_type can be set to 'full' to request that a single
    file gets produced (thus aggregating all granules containing the given
    packets).
    """

    # divy packets up into temp files organized by granule
    file_mgr = BinnedTemporaryFileManager()
Bruce Flynn's avatar
Bruce Flynn committed
    try:
        get_jpss_packet_time = GetJpssPacketTime()
        gran_infos = set()  # (rdr_type, gran_iet) pairs
        for pkt in pkt_iter:
            rdr_type = get_rdr_type(pkt.apid)
            pkt_iet = get_jpss_packet_time(pkt)
            gran_iet = get_granule_start(sat, rdr_type.gran_len, pkt_iet)
            gran_info = (rdr_type, gran_iet)
            gran_infos.add(gran_info)
            file_mgr.add_data(gran_info, pkt.bytes())

        # determine what RDR files we'll be producing based on the packets we've seen
        rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos)
        primary_type, packaged_type = process_rdr_types(
            rdr_types, force_packaging=False
        )
        rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2))
        if aggr_type == "idps":
            aggr_level = aggr_level or primary_type.default_aggregation
            primary_aggr_iets = sorted(
                set(
                    get_aggregate_start(
                        sat, primary_type.gran_len, aggr_level, gran_iet
                    )
                    for (rdr_type, gran_iet) in gran_infos
                    if rdr_type is primary_type
                )
            )
        elif aggr_type == "full":
            # produce a single output file, ignoring IDPS-style aggregation boundaries
            assert aggr_level is None
            first_gran_iet = min(
                gran_iet
Bruce Flynn's avatar
Bruce Flynn committed
                for (rdr_type, gran_iet) in gran_infos
                if rdr_type is primary_type
            )
Bruce Flynn's avatar
Bruce Flynn committed
            last_gran_iet = max(
                gran_iet
                for (rdr_type, gran_iet) in gran_infos
                if rdr_type is primary_type
Bruce Flynn's avatar
Bruce Flynn committed
            )
Bruce Flynn's avatar
Bruce Flynn committed
            aggr_level = (last_gran_iet - first_gran_iet) // primary_type.gran_len + 1
            primary_aggr_iets = [first_gran_iet]
        else:
            raise ValueError("aggr_type must be idps or input")

        # now generate the RDRs
        rdr_files = []
        for aggr_iet in primary_aggr_iets:
            rdr_writer = RdrWriter(
                sat, rdr_types, aggr_iet, aggr_level, output_dir, **attr_overrides
Bruce Flynn's avatar
Bruce Flynn committed
            )
Bruce Flynn's avatar
Bruce Flynn committed
            rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level)
            gran_iets = [
                aggr_iet + i * primary_type.gran_len for i in range(aggr_level)
            ]
            for gran_iet in gran_iets:
                with file_mgr.process_file((primary_type, gran_iet)) as pkt_file:
                    pkts = list(jpss_packet_stream(pkt_file))
                blob = build_rdr_blob(sat, pkts, primary_type, gran_iet, strict=strict)
Bruce Flynn's avatar
Bruce Flynn committed
                rdr_writer.write_granule(primary_type, gran_iet, blob)
            if packaged_type:
                packaged_gran_iets = get_overlapping_granules(
                    sat,
                    packaged_type.gran_len,
                    aggr_iet - diary_cushion,
                    aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1,
                )
                rdr_writer.write_aggregate(
                    packaged_type, packaged_gran_iets[0], len(packaged_gran_iets)
                )
                for gran_iet in packaged_gran_iets:
                    with file_mgr.process_file((packaged_type, gran_iet)) as pkt_file:
                        pkts = list(jpss_packet_stream(pkt_file))
                    blob = build_rdr_blob(
                        sat, pkts, packaged_type, gran_iet, strict=strict
                    )
Bruce Flynn's avatar
Bruce Flynn committed
                    rdr_writer.write_granule(packaged_type, gran_iet, blob)
            rdr_writer.close()
            rdr_files.append(rdr_writer.file_name)
    finally:
        file_mgr.clean_up()
Greg Quinn's avatar
Greg Quinn committed

    return rdr_files


def process_rdr_types(given_rdr_types, force_packaging):
    """Determine the RDR type we'll be making based on the packets we've been given

    Return both a primary_type and packaged_type. The primary type could indicate
    either a science or spacecraft RDR. The packaged type is used when spacecraft
    packets are to be packaged along with a science RDR; it will be None if that's
    not the case. forceA_packaging is a boolean that if set will force spacecraft
    RDR structures to be written out even if no spacecraft packets are present.
    """
    rdr_types = set(given_rdr_types)
    if not rdr_types:
Bruce Flynn's avatar
Bruce Flynn committed
        raise RdrgenError("No RDR types to generate!")
    sci_type = {t for t in rdr_types if t.type_id == "SCIENCE"}
    rdr_types -= sci_type
    if len(sci_type) > 1:
        raise RdrgenError(
Bruce Flynn's avatar
Bruce Flynn committed
            "Cannot process more than 1 science RDR type at once "
            + "(have {})".format(", ".join(t.short_name for t in sci_type))
        )
    diary_type = {t for t in rdr_types if t is SpacecraftDiaryRdrType}
    rdr_types -= diary_type
    if rdr_types:
        raise RdrgenError(
Bruce Flynn's avatar
Bruce Flynn committed
            "Unsupported RDR type(s): " + ", ".join(t.short_name for t in rdr_types)
        )
    if sci_type:
        primary_type, = sci_type
        packaged_type = (
Bruce Flynn's avatar
Bruce Flynn committed
            SpacecraftDiaryRdrType if (diary_type or force_packaging) else None
        )
    else:
        primary_type = SpacecraftDiaryRdrType
        packaged_type = None
    return primary_type, packaged_type
class BinnedTemporaryFileManager(object):
    """Manage a set of append-mode temporary files each labeled with a 'bin key'

    Limits the number of file handles kept open at a time.
    """
Bruce Flynn's avatar
Bruce Flynn committed
    def __init__(self, parent_dir=".", max_open_files=32):
        self.max_open_files = max_open_files
        self.dir = tempfile.mkdtemp(dir=parent_dir)
        self._file_paths = {}
        self._file_objs = OrderedDict()

    def add_data(self, bin_key, data):
        """Append some data to the temp file associated with bin_key

        Creates the file if needed.
        """
        file_obj = self._file_objs.pop(bin_key, None)
        if not file_obj:
            file_path = self._file_paths.get(bin_key)
            if file_path:
Bruce Flynn's avatar
Bruce Flynn committed
                file_obj = open(file_path, "a+b")
Bruce Flynn's avatar
Bruce Flynn committed
                file_obj = tempfile.NamedTemporaryFile(
                    dir=self.dir, delete=False, mode="wb"
                )
                file_path = file_obj.name
                self._file_paths[bin_key] = file_path
            if len(self._file_objs) == self.max_open_files:
                _, old_file_obj = self._file_objs.popitem(last=False)
                old_file_obj.close()
        self._file_objs[bin_key] = file_obj
        file_obj.write(data)

    @contextmanager
    def process_file(self, bin_key):
        """'Check out' a file for processing

        Returns a context manager that provides a read-mode file handle and
        removes the file after processing. Can be called with a bin_key that
        has no data yet which just results in an empty file.
        """
        file_obj = self._file_objs.pop(bin_key, None)
        if file_obj:
            file_obj.close()
Bruce Flynn's avatar
Bruce Flynn committed
        file_path = self._file_paths.pop(bin_key, "/dev/null")
        file_obj = open(file_path, "rb")
        try:
            yield file_obj
        finally:
            file_obj.close()
Bruce Flynn's avatar
Bruce Flynn committed
            if file_path != "/dev/null":
                os.remove(file_path)

    def clean_up(self):
        """Call after all files are processed to avoid leaving a dir sitting around"""
Bruce Flynn's avatar
Bruce Flynn committed
        shutil.rmtree(self.dir)
Bruce Flynn's avatar
Bruce Flynn committed
missions = {
    "npp": "S-NPP/JPSS",
    "snpp": "S-NPP/JPSS",
    "j01": "NOAA-20/JPSS",
    "jpss1": "NOAA-20/JPSS",
    "noaa20": "NOAA-20/JPSS",
}

class RdrWriter(object):
Bruce Flynn's avatar
Bruce Flynn committed
    def __init__(
        self,
        sat,
        rdr_types,
        aggr_iet,
        aggr_level,
        dir_path=".",
        distributor=None,
        origin=None,
        domain=None,
        compressed=False,
        orbit_num=0,
        creation_time=None,
        software_ver=None,
    ):
        self._sat = sat
        origin = origin or self.default_origin
        distributor = distributor or origin
        self._domain = domain or self.default_domain
        self._orbit_num = orbit_num
        self._creation_time = creation_time or datetime.now()
        self._software_ver = (
Bruce Flynn's avatar
Bruce Flynn committed
            software_ver or edosl0util.__name__ + "-" + edosl0util.__version__
        )

        aggr_end_iet = aggr_iet + aggr_level * rdr_types[0].gran_len
        self.file_name = make_rdr_filename(
Bruce Flynn's avatar
Bruce Flynn committed
            rdr_types,
            self._sat,
            aggr_iet,
            aggr_end_iet,
            self._orbit_num,
            self._creation_time,
            origin,
            self._domain,
            compressed,
        )
        self._h5_file = h5py.File(os.path.join(dir_path, self.file_name), "w")
        self._write_skeleton(rdr_types, distributor, origin)
        self._aggr_starts = {}

    def _write_skeleton(self, rdr_types, distributor, origin):
Bruce Flynn's avatar
Bruce Flynn committed
        self._set_h5_attrs(
            self._h5_file,
            {
                "Distributor": distributor,
Bruce Flynn's avatar
Bruce Flynn committed
                "Mission_Name": missions[self._sat],
Bruce Flynn's avatar
Bruce Flynn committed
                "N_Dataset_Source": origin,
                "N_HDF_Creation_Date": self._format_date_attr(self._creation_time),
                "N_HDF_Creation_Time": self._format_time_attr(self._creation_time),
                "Platform_Short_Name": platform_short_names[self._sat],
            },
        )
        all_grp = self._h5_file.create_group("All_Data")
        prod_grp = self._h5_file.create_group("Data_Products")
        for rdr_type in rdr_types:
Bruce Flynn's avatar
Bruce Flynn committed
            all_grp.create_group(rdr_type.short_name + "_All")
            gran_grp = prod_grp.create_group(rdr_type.short_name)
Bruce Flynn's avatar
Bruce Flynn committed
            self._set_h5_attrs(
                gran_grp,
                {
                    "Instrument_Short_Name": instrument_short_names[rdr_type.sensor],
                    "N_Collection_Short_Name": rdr_type.short_name,
                    "N_Dataset_Type_Tag": "RDR",
                    "N_Processing_Domain": self._domain,
                },
            )

    def write_aggregate(self, rdr_type, aggr_iet, num_grans):
        self._aggr_starts[rdr_type] = aggr_iet
Bruce Flynn's avatar
Bruce Flynn committed
        grp = self._h5_file["Data_Products"][rdr_type.short_name]
        ds = grp.create_dataset(
Bruce Flynn's avatar
Bruce Flynn committed
            rdr_type.short_name + "_Aggr", [1], self._h5_ref_dtype, maxshape=[None]
        )
        ds[0] = self._h5_file["All_Data/{}_All".format(rdr_type.short_name)].ref
        aggr_end_iet = aggr_iet + num_grans * rdr_type.gran_len
        last_gran_iet = aggr_end_iet - rdr_type.gran_len
Bruce Flynn's avatar
Bruce Flynn committed
        self._set_h5_attrs(
            ds,
            {
                "AggregateBeginningDate": self._format_date_attr(aggr_iet),
                "AggregateBeginningGranuleID": make_granule_id(self._sat, aggr_iet),
                "AggregateBeginningOrbitNumber": np.uint64(self._orbit_num),
                "AggregateBeginningTime": self._format_time_attr(aggr_iet),
                "AggregateEndingDate": self._format_date_attr(aggr_end_iet),
                "AggregateEndingGranuleID": make_granule_id(self._sat, last_gran_iet),
                "AggregateEndingOrbitNumber": np.uint64(self._orbit_num),
                "AggregateEndingTime": self._format_time_attr(aggr_end_iet),
                "AggregateNumberGranules": np.uint64(num_grans),
            },
        )

    def write_granule(self, rdr_type, gran_iet, blob, creation_time=None):
Bruce Flynn's avatar
Bruce Flynn committed
        raw_grp = self._h5_file["All_Data/{}_All".format(rdr_type.short_name)]
        gran_idx = (gran_iet - self._aggr_starts[rdr_type]) // rdr_type.gran_len
Greg Quinn's avatar
Greg Quinn committed
        raw_ds = raw_grp.create_dataset(
Bruce Flynn's avatar
Bruce Flynn committed
            "RawApplicationPackets_{}".format(gran_idx), data=blob, maxshape=[None]
        )
        gran_grp = self._h5_file["Data_Products"][rdr_type.short_name]
Greg Quinn's avatar
Greg Quinn committed
        gran_ds = gran_grp.create_dataset(
Bruce Flynn's avatar
Bruce Flynn committed
            rdr_type.short_name + "_Gran_{}".format(gran_idx),
            [1],
            self._h5_regionref_dtype,
            maxshape=[None],
        )
Greg Quinn's avatar
Greg Quinn committed
        gran_ds[0] = raw_ds.regionref[:]
        gran_end_iet = gran_iet + rdr_type.gran_len
        creation_time = creation_time or self._creation_time or datetime.now()
        gran_id = make_granule_id(self._sat, gran_iet)
Bruce Flynn's avatar
Bruce Flynn committed
        gran_ver = "A1"
        blob_info = decode_rdr_blob(blob)
Bruce Flynn's avatar
Bruce Flynn committed
        self._set_h5_attrs(
            gran_ds,
            {
                "Beginning_Date": self._format_date_attr(gran_iet),
                "Beginning_Time": self._format_time_attr(gran_iet),
                "Ending_Date": self._format_date_attr(gran_end_iet),
                "Ending_Time": self._format_time_attr(gran_end_iet),
                "N_Beginning_Orbit_Number": np.uint64(self._orbit_num),
                "N_Beginning_Time_IET": np.uint64(gran_iet),
                "N_Creation_Date": self._format_date_attr(creation_time),
                "N_Creation_Time": self._format_time_attr(creation_time),
                "N_Ending_Time_IET": np.uint64(gran_end_iet),
                "N_Granule_ID": gran_id,
                "N_Granule_Status": "N/A",
                "N_Granule_Version": gran_ver,
                "N_IDPS_Mode": self._domain,
                "N_JPSS_Document_Ref": rdr_type.document,
                "N_LEOA_Flag": "Off",
                "N_Packet_Type": [a.name for a in blob_info.apids],
                "N_Packet_Type_Count": [
                    np.uint64(a.pkts_received) for a in blob_info.apids
                ],
                "N_Percent_Missing_Data": np.float32(
                    self._calc_percent_missing(blob_info)
                ),
                "N_Primary_Label": "Primary",  # TODO: find out what this is
                "N_Reference_ID": ":".join([rdr_type.short_name, gran_id, gran_ver]),
                "N_Software_Version": self._software_ver,
            },
        )
    def close(self):
        self._h5_file.close()
    @staticmethod
    def _set_h5_attrs(h5_obj, attrs):
Bruce Flynn's avatar
Bruce Flynn committed
        def tpcnv(v):
            if isinstance(v, list):
                return [tpcnv(x) for x in v]
            elif isinstance(v, str):
                return np.array(v.encode('ascii'), dtype="S")
            elif isinstance(v, bytes):
                return np.array(v, dtype="S")
            return v
        # for some reason all the H5 attributes are in 2-D arrays in IDPS files
        for name, value in attrs.items():
            if isinstance(value, list):
                value = [[x] for x in value]
            else:
                value = [[value]]
Bruce Flynn's avatar
Bruce Flynn committed
            h5_obj.attrs[name] = tpcnv(value)
    @staticmethod
    def _format_date_attr(t):
Bruce Flynn's avatar
Bruce Flynn committed
        return iet_to_datetime(t).strftime("%Y%m%d")

    @staticmethod
    def _format_time_attr(t):
Bruce Flynn's avatar
Bruce Flynn committed
        return iet_to_datetime(t).strftime("%H%M%S.%fZ")

    @staticmethod
    def _calc_percent_missing(common_rdr):
        total_received = sum(a.pkts_received for a in common_rdr.apids)
        total_reserved = sum(a.pkts_reserved for a in common_rdr.apids)
Bruce Flynn's avatar
Bruce Flynn committed
        return 100.0 * (total_reserved - total_received) / total_reserved
Bruce Flynn's avatar
Bruce Flynn committed
    default_origin = "ssec"
    default_domain = "dev"
    _h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference)
    _h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference)
def _encodeall(v):
    """
    Recurse v encoding stringy values. dict keys are not encoded.
    """
    if isinstance(v, dict):
        dtmp = {}
        for k, v in v.items():
            dtmp[k] = _encodeall(v)
        return dtmp
    if isinstance(v, list):
        return [_encodeall(x) for x in v]
    if isinstance(v, compat.str_types):
        return v.encode()
    return v


def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet, strict=True):
    """ Build an RDR blob of data for a single RDR native resolution granule.

    :param str sat: snpp or noaa20
    :param .stream.PacketStream pkt_stream:
        Stream of packets for a single RDR native resolution granule.
    :param RdrType rdr_type:
        One of the *RDRType classes that indicates the type of packets contained
        in the stream.
    :param int granule_iet: Granule start time in IET.
    :param bool strict:
        When True any packet stream issues, duplicates, unexpected APIDs, etc...,
        will result in an error subclass of Error.

    :raise InvalidStream: If strict and the stream looks hokey
    :raise InvalidPacket: If strict and got a bunk packet
    """
    get_jpss_packet_time = GetJpssPacketTime()
Greg Quinn's avatar
Greg Quinn committed
    granule_iet_end = granule_iet + rdr_type.gran_len
Greg Quinn's avatar
Greg Quinn committed

    total_pkt_size = 0
    apid_info = OrderedDict()
    total_trackers = 0
    all_pkts = []
    for apid in rdr_type.apids:
        apid_info[apid.num] = {
Bruce Flynn's avatar
Bruce Flynn committed
            "name": apid.name.encode(),
            "pkts_reserved": apid.max_expected,
            "pkts_received": 0,
            "first_tracker_index": total_trackers,
            "pkt_info": [{} for _ in range(apid.max_expected)],
        }
Greg Quinn's avatar
Greg Quinn committed
        total_trackers += apid.max_expected

    for pkt in pkt_stream:
Greg Quinn's avatar
Greg Quinn committed
        if pkt.apid not in apid_info:
            if strict:
                raise InvalidPacket(
                    "APID {} not expected for {}".format(pkt.apid, rdr_type.short_name),
                    pkt,
                )
            else:
                LOG.warning("dropping unexpected APID: %s", pkt)
                continue
        pkt_iet = get_jpss_packet_time(pkt)
Greg Quinn's avatar
Greg Quinn committed
        if not granule_iet <= pkt_iet < granule_iet_end:
            if strict:
                raise InvalidStream("packet stream crosses granule boundary")
            else:
                LOG.warning("dropping packet that crosses granule boundry: %s", pkt)
                continue
Greg Quinn's avatar
Greg Quinn committed
        info = apid_info[pkt.apid]
        if info["pkts_received"] > len(info["pkt_info"]) - 1:
            if strict:
                raise InvalidPacket("possible duplicate packet", pkt)
            else:
                LOG.warning("dropping possible duplicate packet: %s", pkt)
                continue
Bruce Flynn's avatar
Bruce Flynn committed
        pkt_info = info["pkt_info"][info["pkts_received"]]
        pkt_info["obs_time"] = pkt_iet
        pkt_info["seq_num"] = pkt.seqid
        pkt_info["size"] = pkt.size
        pkt_info["offset"] = total_pkt_size
        info["pkts_received"] += 1
Greg Quinn's avatar
Greg Quinn committed
        total_pkt_size += pkt.size
        all_pkts.append(pkt)

    apid_list_offset = ctypes.sizeof(StaticHeader)
    pkt_tracker_offset = apid_list_offset + len(apid_info) * ctypes.sizeof(ApidListItem)
Bruce Flynn's avatar
Bruce Flynn committed
    ap_storage_offset = pkt_tracker_offset + total_trackers * ctypes.sizeof(
        PacketTracker
    )
Greg Quinn's avatar
Greg Quinn committed
    buf_size = ap_storage_offset + total_pkt_size
    buf = np.zeros([buf_size], np.uint8)  # zeros needed to null-pad strings

    header = StaticHeader.from_buffer(buf)
    header.satellite = platform_short_names[sat].encode()
    header.sensor = instrument_short_names[rdr_type.sensor].encode()
    header.type_id = rdr_type.type_id.encode()
Greg Quinn's avatar
Greg Quinn committed
    header.num_apids = len(apid_info)
    header.apid_list_offset = apid_list_offset
    header.pkt_tracker_offset = pkt_tracker_offset
    header.ap_storage_offset = ap_storage_offset
    header.next_pkt_pos = total_pkt_size
    header.start_boundary = granule_iet
    header.end_boundary = granule_iet_end

    for i, (apid, info) in enumerate(apid_info.items()):
        offset = header.apid_list_offset + i * ctypes.sizeof(ApidListItem)
        item = ApidListItem.from_buffer(buf, offset)
Bruce Flynn's avatar
Bruce Flynn committed
        item.name = info["name"]
Greg Quinn's avatar
Greg Quinn committed
        item.value = apid
Bruce Flynn's avatar
Bruce Flynn committed
        item.pkt_tracker_start_idx = info["first_tracker_index"]
        item.pkts_reserved = info["pkts_reserved"]
        item.pkts_received = info["pkts_received"]

        for j, pkt_info in enumerate(info["pkt_info"]):
            offset = header.pkt_tracker_offset + (
                info["first_tracker_index"] + j
            ) * ctypes.sizeof(PacketTracker)
Greg Quinn's avatar
Greg Quinn committed
            tracker = PacketTracker.from_buffer(buf, offset)
            if pkt_info:
Bruce Flynn's avatar
Bruce Flynn committed
                tracker.obs_time = pkt_info["obs_time"]
                tracker.sequence_number = pkt_info["seq_num"]
                tracker.size = pkt_info["size"]
                tracker.offset = pkt_info["offset"]
Greg Quinn's avatar
Greg Quinn committed
                tracker.fill_percent = 0
            else:
                tracker.offset = -1

    buf[ap_storage_offset:] = bytearray().join(pkt.bytes() for pkt in all_pkts)

    return buf


class ViirsScienceApidInfo(object):
Bruce Flynn's avatar
Bruce Flynn committed
    apids = list(x for x in range(800, 829) if x != 824)
Bruce Flynn's avatar
Bruce Flynn committed
    names = [
        "M04",
        "M05",
        "M03",
        "M02",
        "M01",
        "M06",
        "M07",
        "M09",
        "M10",
        "M08",
        "M11",
        "M13",
        "M12",
        "I04",
        "M16",
        "M15",
        "M14",
        "I05",
        "I01",
        "I02",
        "I03",
        "DNB",
        "DNB_MGS",
        "DNB_LGS",
        "CAL",
        "ENG",
        "DNB_HGA",
        "DNB_HGB",
    ]

    @classmethod
    def get_specs(cls):
Bruce Flynn's avatar
Bruce Flynn committed
        return [
            ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid))
            for apid in cls.apids
        ]

    @classmethod
    def get_name(cls, apid):
        return cls.names[cls.apids.index(apid)]

    @classmethod
    def get_max_expected(cls, apid):
        max_scans = 48
        return max_scans * cls.get_packets_per_scan(apid)

    @classmethod
    def get_packets_per_scan(cls, apid):
        name = cls.get_name(apid)
Bruce Flynn's avatar
Bruce Flynn committed
        if name == "ENG":
            return 1
Bruce Flynn's avatar
Bruce Flynn committed
        elif name == "CAL":
            return 24
Bruce Flynn's avatar
Bruce Flynn committed
        elif name.startswith("M") or name.startswith("DNB"):
            return 17
        else:
            return 33


class CrisScienceApidInfo(object):
    apids = [1289, 1290] + list(range(1315, 1396))

    @classmethod
    def get_specs(cls):
Bruce Flynn's avatar
Bruce Flynn committed
        return [
            ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid))
            for apid in cls.apids
        ]

    @classmethod
    def get_name(cls, apid):
        if apid == 1289:
Bruce Flynn's avatar
Bruce Flynn committed
            return "EIGHT_S_SCI"
        elif apid == 1290:
Bruce Flynn's avatar
Bruce Flynn committed
            return "ENG"
Greg Quinn's avatar
Greg Quinn committed
        else:
            offset = apid - 1315
Bruce Flynn's avatar
Bruce Flynn committed
            view_types = ["N", "S", "C"]
            bands = ["LW", "MW", "SW"]
            num_fovs = 9
            view_type = view_types[offset // (num_fovs * len(bands))]
            band = bands[offset // num_fovs % len(bands)]
            fov = str(offset % num_fovs + 1)
            return view_type + band + fov

    @classmethod
    def get_max_expected(cls, apid):
        name = cls.get_name(apid)
Bruce Flynn's avatar
Bruce Flynn committed
        if name == "EIGHT_S_SCI":
Bruce Flynn's avatar
Bruce Flynn committed
        elif name == "ENG":
            return 1
        else:
            view_type = name[0]
Bruce Flynn's avatar
Bruce Flynn committed
            if view_type == "N":
Greg Quinn's avatar
Greg Quinn committed


@attr.s
class ApidSpec(object):
    num = attr.ib()
    name = attr.ib()
    max_expected = attr.ib()


class RdrTypeManager(object):
    def __init__(self):
        self._types_by_apid = {}

    def register_type(self, cls):
        for apid_spec in cls.apids:
            if apid_spec.num in self._types_by_apid:
Bruce Flynn's avatar
Bruce Flynn committed
                raise ValueError("each APID can only be handled by one RDR type")
            self._types_by_apid[apid_spec.num] = cls
        return cls

    def get_type_for_apid(self, apid):
        try:
            return self._types_by_apid[apid]
        except KeyError:
Bruce Flynn's avatar
Bruce Flynn committed
            raise RdrgenError("Unsupported APID: {}".format(apid))


rdr_type_mgr = RdrTypeManager()
rdr_type_spec = rdr_type_mgr.register_type
get_rdr_type = rdr_type_mgr.get_type_for_apid


@rdr_type_spec
class ViirsScienceRdrType(object):
Bruce Flynn's avatar
Bruce Flynn committed
    product_id = "RVIRS"
    short_name = "VIIRS-SCIENCE-RDR"
    gran_len = 85350000
Bruce Flynn's avatar
Bruce Flynn committed
    sensor = "viirs"
    type_id = "SCIENCE"
    document = "474-00448-02-06_JPSS-DD-Vol-II-Part-6_0200G.pdf"
    apids = ViirsScienceApidInfo.get_specs()
    default_aggregation = 1
@rdr_type_spec
Greg Quinn's avatar
Greg Quinn committed
class CrisScienceRdrType(object):
Bruce Flynn's avatar
Bruce Flynn committed
    product_id = "RCRIS"
    short_name = "CRIS-SCIENCE-RDR"
Greg Quinn's avatar
Greg Quinn committed
    gran_len = 31997000
Bruce Flynn's avatar
Bruce Flynn committed
    sensor = "cris"
    type_id = "SCIENCE"
    document = "474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf"
    apids = CrisScienceApidInfo.get_specs()
    default_aggregation = 15
Greg Quinn's avatar
Greg Quinn committed
@rdr_type_spec
class AtmsScienceRdrType(object):
Bruce Flynn's avatar
Bruce Flynn committed
    product_id = "RATMS"
    short_name = "ATMS-SCIENCE-RDR"
Greg Quinn's avatar
Greg Quinn committed
    gran_len = 31997000
Bruce Flynn's avatar
Bruce Flynn committed
    sensor = "atms"
    type_id = "SCIENCE"
    document = "474-00448-02-02_JPSS-DD-Vol-II-Part-2_0200B.pdf"
    apids = [
        ApidSpec(515, "CAL", max_expected=5),
        ApidSpec(528, "SCI", max_expected=1249),
        ApidSpec(530, "ENG_TEMP", max_expected=13),
        ApidSpec(531, "ENG_HS", max_expected=5),
    ]
Greg Quinn's avatar
Greg Quinn committed
    default_aggregation = 15


@rdr_type_spec
Greg Quinn's avatar
Greg Quinn committed
class SpacecraftDiaryRdrType(object):
Bruce Flynn's avatar
Bruce Flynn committed
    product_id = "RNSCA"
    short_name = "SPACECRAFT-DIARY-RDR"
Greg Quinn's avatar
Greg Quinn committed
    gran_len = 20000000
    sensor = None
Bruce Flynn's avatar
Bruce Flynn committed
    type_id = "DIARY"
    document = "474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200F.pdf"
    apids = [
        ApidSpec(0, "CRITICAL", max_expected=21),
        ApidSpec(8, "ADCS_HKH", max_expected=21),
        ApidSpec(11, "DIARY", max_expected=21),
    ]
    default_aggregation = 303


class GetJpssPacketTime(object):
    def __init__(self):
        self._viirs_tracker = ViirsGroupedPacketTimeTracker()

    def __call__(self, pkt):
        if self._viirs_tracker.tracks_apid(pkt.apid):
            return self._viirs_tracker.get_iet(pkt)
        else:
            return get_packet_iet(pkt)
class ViirsGroupedPacketTimeTracker(object):
    grouped_apids = list(range(800, 824)) + [825, 827, 828]

    @classmethod
    def tracks_apid(cls, apid):
        return apid in cls.grouped_apids

    def __init__(self):
        self._db = {}

    def get_iet(self, pkt):
        if not self.tracks_apid(pkt.apid):
Bruce Flynn's avatar
Bruce Flynn committed
            raise ValueError("APID {} not a VIIRS grouped packet type".format(pkt.apid))
        if pkt.is_first():
            obs_iet = get_packet_iet(pkt)
            self._db[pkt.apid] = (obs_iet, pkt.seqid)
            return obs_iet
        else:
Bruce Flynn's avatar
Bruce Flynn committed
            if pkt.apid not in self._db:
                # have not seen a first packet yet
                raise OrphanedViirsPacket(pkt)
            last_obs_iet, last_seq = self._db[pkt.apid]
            group_size = ViirsScienceApidInfo.get_packets_per_scan(pkt.apid)
            if not self.check_sequence_number(pkt.seqid, last_seq, group_size):
                raise OrphanedViirsPacket(pkt)
            if not self.check_packet_iet(self.get_viirs_iet(pkt), last_obs_iet):
                raise OrphanedViirsPacket(pkt)
            return last_obs_iet

    @staticmethod
    def get_viirs_iet(pkt):
        if pkt.is_standalone():
            idx = 18
        elif pkt.is_first():
            idx = 20
        else:
            idx = 10
        arr = np.frombuffer(pkt.bytes()[idx : idx + 8], "B")  # noqa
Bruce Flynn's avatar
Bruce Flynn committed
        days = arr[0:2].view(">u2")[0]
        ms = arr[2:6].view(">u4")[0]
        us = arr[6:8].view(">u2")[0]
        return cds_to_iet(days, ms, us)

    @staticmethod
    def check_sequence_number(nonfirst_seq_num, first_seq_num, group_size):
        seq_limit = 2 ** 14
        group_end = first_seq_num + group_size
        # the 2nd check below is needed to handle wrap-around
Bruce Flynn's avatar
Bruce Flynn committed
        return (
            first_seq_num < nonfirst_seq_num < group_end
            or first_seq_num < nonfirst_seq_num + seq_limit < group_end
        )

    @staticmethod
    def check_packet_iet(pkt_iet, obs_iet):
        # this can be a pretty loose check since it is only needed to prevent
        # the very rare case where the sequence number check yields a false
        # positive
        permitted_lag = 5  # seconds
        lag = (pkt_iet - obs_iet) * 1e-6
        return 0 <= lag <= permitted_lag


class RdrgenError(Exception):
    pass


class OrphanedViirsPacket(RdrgenError):
    def __init__(self, pkt):
        self.packet = pkt

    def __str__(self):
        return repr(self.packet)


Bruce Flynn's avatar
Bruce Flynn committed
def make_rdr_filename(
    rdr_types,
    sat,
    aggr_begin,
    aggr_end,
    orbit_num,
    creation_time,
    origin,
    domain,
    compressed,
):
    aggr_begin = iet_to_datetime(aggr_begin)
    aggr_end = iet_to_datetime(aggr_end)
Bruce Flynn's avatar
Bruce Flynn committed
    prod_ids = "-".join(sorted(t.product_id for t in rdr_types))
    sat = {"snpp": "npp", "j01": "j01"}[sat]
    if origin.endswith("-"):
        origin = origin[:-1] + ("c" if compressed else "u")
    def format_time(t):
Bruce Flynn's avatar
Bruce Flynn committed
        return t.strftime("%H%M%S") + str(t.microsecond // 100000)
Bruce Flynn's avatar
Bruce Flynn committed
    return "{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5".format(  # noqa
Bruce Flynn's avatar
Bruce Flynn committed
        p=prod_ids,
        s=sat,
        d=aggr_begin,
        b=format_time(aggr_begin),
        e=format_time(aggr_end),
        n=orbit_num,
        c=creation_time,
        o=origin,
        m=domain,
    )


def make_granule_id(sat, gran_iet):
    tenths = (gran_iet - satellite_base_times[sat]) // 100000
Bruce Flynn's avatar
Bruce Flynn committed
    return "{}{:012d}".format(platform_short_names[sat], tenths)
def get_packet_iet(pkt):
    tc = pkt.secondary_header.timecode
    return cds_to_iet(tc.days, tc.milliseconds, tc.microseconds)
Greg Quinn's avatar
Greg Quinn committed


def iet_to_datetime(iet):
    if isinstance(iet, datetime):
        return iet
    return iet_to_dt(iet)
def get_granule_start(sat, gran_len, iet):
    base_time = satellite_base_times[sat]
Greg Quinn's avatar
Greg Quinn committed
    return (iet - base_time) // gran_len * gran_len + base_time


def get_aggregate_start(sat, gran_len, grans_per_aggr, iet):
Greg Quinn's avatar
Greg Quinn committed
    # see "DDS Aggregation Methodology" in CDFCB vol I
    Y = gran_len
    N = grans_per_aggr
    G_s = get_granule_start(sat, Y, iet)
Greg Quinn's avatar
Greg Quinn committed
    A_n = G_s // (N * Y)
Bruce Flynn's avatar
Bruce Flynn committed
    O = G_s % Y  # noqa
Greg Quinn's avatar
Greg Quinn committed
    A_s = A_n * (Y * N) + O
    return A_s


def get_aggregate_granule_times(sat, gran_len, aggr_level, aggr_iet):
    aggr_start = get_aggregate_start(sat, gran_len, aggr_level, aggr_iet)
    aggr_end = aggr_start + aggr_level * gran_len
    return get_overlapping_granules(sat, gran_len, aggr_start, aggr_end)
def get_overlapping_granules(sat, gran_len, start_iet, stop_iet):
    rv = []
    gran_iet = get_granule_start(sat, gran_len, start_iet)
    while gran_iet < stop_iet:
        rv.append(gran_iet)
        gran_iet += gran_len
    return rv
Bruce Flynn's avatar
Bruce Flynn committed
satellite_base_times = {"snpp": 1698019234000000, "j01": 1698019234000000}
platform_short_names = {"snpp": "NPP", "j01": "J01"}
instrument_short_names = {
    "viirs": "VIIRS",
    "cris": "CrIS",
    "atms": "ATMS",
    None: "SPACECRAFT",
Bruce Flynn's avatar
Bruce Flynn committed
}