Skip to content
Snippets Groups Projects
grib.py 5.16 KiB
"""
grib_processor.amqp
~~~~~~~~~~~~~~~~~~~

How the processor generates grib messages.
"""

from __future__ import annotations

from datetime import datetime
import os
from typing import Generator

import grib2io
from ssec_amqp import utils as amqp_utils
from typing_extensions import Literal, TypedDict


# Contains a serializable mapping of first_lat, first_lon, rows, cols,
# and generating_process_ids to xcd model names and ids.
#
# format:

# dict[first_lat,
#   dict[first_lon,
#       dict[rows,
#           dict[cols,
#               dict[generating_process_id, list[str]]
#           ]
#       ]
#   ]
# ]
# Loaded using load_xcd_models()
_XCD_MODELS = None


class GribPayload(TypedDict):
    """Amqp payload for each grib message."""

    __payload_gen_time__: datetime
    __injector_script__: str
    path: str
    directory: str
    file_name: str
    server_ip: str
    server_type: Literal["realtime", "archive"]
    first_lat: float | None
    first_lon: float | None
    last_lat: float | None
    last_lon: float | None
    forecast_hour: int
    run_hour: int
    model_time: datetime
    start_time: datetime
    projection: str
    center_id: int
    center_desc: str
    level: str
    parameter: str
    param_long: str
    param_units: str
    grib_number: Literal[1, 2]
    size: str
    ensemble: int | None
    model_id: int
    model: str
    xcd_model: str
    xcd_model_id: str
    grib_record_number: int
    title: str


def load_xcd_models(
    *addtnl_models: tuple[float, float, int, int, int, str, str],
) -> None:
    """Load the xcd models from the package."""
    pass


def xcd_lookup(
    first_lat: float, first_lon: float, rows: int, cols: int, gen_proc_id: int
) -> tuple[str, str]:
    """Looks up xcd model names and ids based on grib message properties.

    Args:
        first_lat (float): The first latitude of the grib message.
        first_lon (float): The first longitude of the grib message.
        rows (int): The number of rows in the grib message.
        cols (int): The number of columns in the grib message.
        gen_proc_id (int): The generating process id of the grib message.

    Returns:
        tuple[str, str]: (xcd model name, xcd model id) or ("UNKWN", "UNKWN")
        if can't find model info based on given properties.
    """
    if _XCD_MODELS is None:
        return ("UNKWN", "UNKWN")
    try:
        return tuple(
            _XCD_MODELS[str(first_lat)][str(first_lon)][str(rows)][str(cols)][
                str(gen_proc_id)
            ]
        )
    except KeyError:
        return ("UNKWN", "UNKWN")


def itergrib(
    grib_path: str, start_message: int | None = None
) -> Generator[GribPayload, None, None]:
    """Generate AMQP payloads from a grib file, starting at a specific message.

    Args:
        grib_path (str): The path to the grib file to create payloads for.
        start_message (int): Which grib message to start yielding payloads for.

    Yields:
        JSON-able: The json formatted AMQP payload for each grib message from start_message to end of the file.
    """
    start_message = start_message or 0
    with grib2io.open(grib_path) as grib_file:
        for msg in grib_file[start_message:]:
            f_lat = getattr(msg, "latitudeFirstGridpoint", None)
            f_lon = getattr(msg, "longitudeFirstGridpoint", None)
            rows = msg.ny
            cols = msg.nx
            gen_proc = msg.generatingProcess
            xcd_info = (
                "UNKWN",
                "UNKWN",
            )  # xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value)
            yield GribPayload(
                __payload_gen_time__=amqp_utils.format_datetime(datetime.now()),
                __injector_script__=amqp_utils.INJECTOR_SCRIPT,
                path=grib_path,
                directory=os.path.dirname(grib_path),
                file_name=os.path.basename(grib_path),
                server_ip=amqp_utils.SERVER_NAME,
                server_type="realtime",
                first_lat=f_lat,
                last_lat=getattr(msg, "latitudeLastGridpoint", None),
                first_lon=f_lon,
                last_lon=getattr(msg, "longitudeLastGridpoint", None),
                forecast_hour=int(msg.valueOfForecastTime),
                run_hour=int(msg.hour) * 100 + int(msg.minute),
                model_time=amqp_utils.format_datetime(msg.refDate),
                start_time=amqp_utils.format_datetime(msg.refDate),
                projection=msg.gridDefinitionTemplateNumber.definition,
                center_id=int(msg.originatingCenter.value),
                center_desc=msg.originatingCenter.definition,
                level=msg.level,
                parameter=msg.shortName,
                param_long=msg.fullName,
                param_units=msg.units,
                grib_number=2,
                size="%d x %d" % (rows, cols),
                ensemble=getattr(msg, "pertubationNumber", None),
                model_id=int(gen_proc.value),
                model=gen_proc.definition,
                xcd_model=xcd_info[1],
                xcd_model_id=xcd_info[0],
                # resolution=msg._xydivisor, THIS IS NOT CORRECT!
                grib_record_number=msg._msgnum,
                title=str(msg),
            )