-
Max Drexler authoredMax Drexler authored
grib.py 5.16 KiB
"""
grib_processor.grib
~~~~~~~~~~~~~~~~~~~
How the processor generates grib messages.
"""
from __future__ import annotations
from datetime import datetime
import os
from typing import Generator
import grib2io
from ssec_amqp import utils as amqp_utils
from typing_extensions import Literal, TypedDict
# Serializable mapping from a grib message's first_lat, first_lon, rows, cols
# and generating_process_id to its xcd model name and id.
#
# Every level is keyed by the *string* form of the value: xcd_lookup() calls
# str() on each property before indexing.
#
# format:
#   dict[first_lat,
#     dict[first_lon,
#       dict[rows,
#         dict[cols,
#           dict[generating_process_id, list[str]]
#         ]
#       ]
#     ]
#   ]
#
# None means the mapping has not been loaded; xcd_lookup() then reports
# ("UNKWN", "UNKWN"). Meant to be populated by load_xcd_models().
_XCD_MODELS = None
class GribPayload(TypedDict):
    """Amqp payload for each grib message.

    One payload is built per grib message by itergrib(); the keys below are
    the fields of that payload.
    """

    # --- payload bookkeeping ---
    # NOTE(review): the time fields in this class are annotated as datetime,
    # but itergrib() stores the result of amqp_utils.format_datetime(...) in
    # them -- confirm whether the annotation should be str.
    __payload_gen_time__: datetime  # when the payload was generated
    __injector_script__: str  # taken from amqp_utils.INJECTOR_SCRIPT

    # --- where the grib file lives ---
    path: str  # grib file path as passed to itergrib()
    directory: str  # directory part of path
    file_name: str  # base name of path
    server_ip: str  # taken from amqp_utils.SERVER_NAME
    server_type: Literal["realtime", "archive"]

    # --- grid corners; None when the message has no such attribute ---
    first_lat: float | None
    first_lon: float | None
    last_lat: float | None
    last_lon: float | None

    # --- timing ---
    forecast_hour: int  # the message's valueOfForecastTime as an int
    run_hour: int  # hour * 100 + minute of the message, i.e. HHMM as an int
    model_time: datetime  # derived from the message's refDate
    start_time: datetime  # derived from the same refDate as model_time

    # --- what the message contains ---
    projection: str  # definition of the grid definition template number
    center_id: int  # originating center value
    center_desc: str  # originating center definition
    level: str
    parameter: str  # the message's shortName
    param_long: str  # the message's fullName
    param_units: str
    grib_number: Literal[1, 2]  # grib edition; itergrib() always emits 2
    size: str  # formatted as "<ny> x <nx>"
    ensemble: int | None  # None when the message does not expose one

    # --- model identification ---
    model_id: int  # generating process value
    model: str  # generating process definition
    xcd_model: str  # "UNKWN" when no xcd model is known
    xcd_model_id: str  # "UNKWN" when no xcd model is known

    # --- message identity ---
    grib_record_number: int  # the message's number within its file
    title: str  # str() of the grib message
def load_xcd_models(
    *addtnl_models: tuple[float, float, int, int, int, str, str],
) -> None:
    """Placeholder for loading the xcd models from the package.

    Currently a no-op: nothing is read and nothing is stored.

    Args:
        *addtnl_models: Extra model entries supplied by the caller.
            Accepted for interface compatibility but not used yet.
    """
    # Not implemented yet; explicit return keeps the stub's intent obvious.
    return None
def xcd_lookup(
    first_lat: float, first_lon: float, rows: int, cols: int, gen_proc_id: int
) -> tuple[str, str]:
    """Find the xcd model name and id that match a grib message.

    The module-level model table is walked one level per property, in the
    order first_lat, first_lon, rows, cols, gen_proc_id. Each property is
    converted with str() before it is used as a key.

    Args:
        first_lat (float): The first latitude of the grib message.
        first_lon (float): The first longitude of the grib message.
        rows (int): The number of rows in the grib message.
        cols (int): The number of columns in the grib message.
        gen_proc_id (int): The generating process id of the grib message.

    Returns:
        tuple[str, str]: (xcd model name, xcd model id), or
        ("UNKWN", "UNKWN") when the table is not loaded or has no entry for
        the given properties.
    """
    unknown = ("UNKWN", "UNKWN")

    # Nothing has been loaded, so there is nothing to search.
    if _XCD_MODELS is None:
        return unknown

    node = _XCD_MODELS
    try:
        # Descend one nesting level per property; a missing key at any
        # level means the model is unknown.
        for prop in (first_lat, first_lon, rows, cols, gen_proc_id):
            node = node[str(prop)]
        return tuple(node)
    except KeyError:
        return unknown
def itergrib(
    grib_path: str, start_message: int | None = None
) -> Generator[GribPayload, None, None]:
    """Generate AMQP payloads from a grib file, starting at a specific message.

    Args:
        grib_path (str): The path to the grib file to create payloads for.
        start_message (int | None): Index of the first grib message to yield
            a payload for. None (the default) is treated as 0, i.e. every
            message in the file.

    Yields:
        GribPayload: The AMQP payload for each grib message from
        start_message to the end of the file.
    """
    start_message = start_message or 0

    # These only depend on the path, so compute them once for all messages.
    directory = os.path.dirname(grib_path)
    file_name = os.path.basename(grib_path)

    with grib2io.open(grib_path) as grib_file:
        for msg in grib_file[start_message:]:
            # Not every grid definition exposes corner points.
            f_lat = getattr(msg, "latitudeFirstGridpoint", None)
            f_lon = getattr(msg, "longitudeFirstGridpoint", None)
            rows = msg.ny
            cols = msg.nx
            gen_proc = msg.generatingProcess

            # The xcd lookup is disabled for now, so every message is
            # reported as unknown.
            # TODO: use xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value),
            # which returns (name, id) in that order.
            xcd_name, xcd_id = "UNKWN", "UNKWN"

            yield GribPayload(
                __payload_gen_time__=amqp_utils.format_datetime(datetime.now()),
                __injector_script__=amqp_utils.INJECTOR_SCRIPT,
                path=grib_path,
                directory=directory,
                file_name=file_name,
                server_ip=amqp_utils.SERVER_NAME,
                server_type="realtime",
                first_lat=f_lat,
                last_lat=getattr(msg, "latitudeLastGridpoint", None),
                first_lon=f_lon,
                last_lon=getattr(msg, "longitudeLastGridpoint", None),
                forecast_hour=int(msg.valueOfForecastTime),
                # Encoded as HHMM, e.g. 12:30 -> 1230.
                run_hour=int(msg.hour) * 100 + int(msg.minute),
                model_time=amqp_utils.format_datetime(msg.refDate),
                start_time=amqp_utils.format_datetime(msg.refDate),
                projection=msg.gridDefinitionTemplateNumber.definition,
                center_id=int(msg.originatingCenter.value),
                center_desc=msg.originatingCenter.definition,
                level=msg.level,
                parameter=msg.shortName,
                param_long=msg.fullName,
                param_units=msg.units,
                grib_number=2,
                size="%d x %d" % (rows, cols),
                # Only ensemble products carry a perturbation number; the
                # attribute name was previously misspelled, which made this
                # field None for every message.
                ensemble=getattr(msg, "perturbationNumber", None),
                model_id=int(gen_proc.value),
                model=gen_proc.definition,
                # Name goes to xcd_model and id to xcd_model_id, matching
                # the (name, id) order documented by xcd_lookup().
                xcd_model=xcd_name,
                xcd_model_id=xcd_id,
                # resolution=msg._xydivisor, THIS IS NOT CORRECT!
                grib_record_number=msg._msgnum,
                title=str(msg),
            )