Skip to content
Snippets Groups Projects
Commit 9c01ce88 authored by Max Drexler's avatar Max Drexler
Browse files

separate the amqp payload from grib metadata

parent 72bc2f53
No related branches found
No related tags found
1 merge request!2Switch to package setup
...@@ -34,16 +34,9 @@ from typing_extensions import Literal, TypedDict ...@@ -34,16 +34,9 @@ from typing_extensions import Literal, TypedDict
_XCD_MODELS = None _XCD_MODELS = None
class GribPayload(TypedDict): class GribMetadata(TypedDict):
"""Amqp payload for each grib message.""" """Metadata extracted from each grib message."""
__payload_gen_time__: datetime
__injector_script__: str
path: str
directory: str
file_name: str
server_ip: str
server_type: Literal["realtime", "archive"]
first_lat: float | None first_lat: float | None
first_lon: float | None first_lon: float | None
last_lat: float | None last_lat: float | None
...@@ -70,6 +63,22 @@ class GribPayload(TypedDict): ...@@ -70,6 +63,22 @@ class GribPayload(TypedDict):
title: str title: str
class GribPayload(GribMetadata):
"""Amqp payload for each grib message.
Includes metadata about the grib message and metadata about
the file and server itself.
"""
__payload_gen_time__: datetime
__injector_script__: str
path: str
directory: str
file_name: str
server_ip: str
server_type: Literal["realtime", "archive"]
def load_xcd_models( def load_xcd_models(
*addtnl_models: tuple[float, float, int, int, int, str, str], *addtnl_models: tuple[float, float, int, int, int, str, str],
) -> None: ) -> None:
...@@ -105,6 +114,50 @@ def xcd_lookup( ...@@ -105,6 +114,50 @@ def xcd_lookup(
return ("UNKWN", "UNKWN") return ("UNKWN", "UNKWN")
def extract_metadata(msg: grib2io.Grib2Message) -> GribMetadata:
"""Extract metadata about a grib2 message created using grib2io.
Returns:
dictionary with keys defined by GribMetadata.
"""
f_lat = getattr(msg, "latitudeFirstGridpoint", None)
f_lon = getattr(msg, "longitudeFirstGridpoint", None)
rows = msg.ny
cols = msg.nx
gen_proc = msg.generatingProcess
xcd_info = (
"UNKWN",
"UNKWN",
) # xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value)
return GribMetadata(
first_lat=f_lat,
last_lat=getattr(msg, "latitudeLastGridpoint", None),
first_lon=f_lon,
last_lon=getattr(msg, "longitudeLastGridpoint", None),
forecast_hour=int(msg.valueOfForecastTime),
run_hour=int(msg.hour) * 100 + int(msg.minute),
model_time=amqp_utils.format_datetime(msg.refDate),
start_time=amqp_utils.format_datetime(msg.refDate),
projection=msg.gridDefinitionTemplateNumber.definition,
center_id=int(msg.originatingCenter.value),
center_desc=msg.originatingCenter.definition,
level=msg.level,
parameter=msg.shortName,
param_long=msg.fullName,
param_units=msg.units,
grib_number=2,
size="%d x %d" % (rows, cols),
ensemble=getattr(msg, "pertubationNumber", None),
model_id=int(gen_proc.value),
model=gen_proc.definition,
xcd_model=xcd_info[1],
xcd_model_id=xcd_info[0],
# resolution=msg._xydivisor, THIS IS NOT CORRECT!
grib_record_number=msg._msgnum,
title=str(msg),
)
def itergrib( def itergrib(
grib_path: str, start_message: int | None = None grib_path: str, start_message: int | None = None
) -> Generator[GribPayload, None, None]: ) -> Generator[GribPayload, None, None]:
...@@ -120,15 +173,7 @@ def itergrib( ...@@ -120,15 +173,7 @@ def itergrib(
start_message = start_message or 0 start_message = start_message or 0
with grib2io.open(grib_path) as grib_file: with grib2io.open(grib_path) as grib_file:
for msg in grib_file[start_message:]: for msg in grib_file[start_message:]:
f_lat = getattr(msg, "latitudeFirstGridpoint", None) meta = extract_metadata(msg)
f_lon = getattr(msg, "longitudeFirstGridpoint", None)
rows = msg.ny
cols = msg.nx
gen_proc = msg.generatingProcess
xcd_info = (
"UNKWN",
"UNKWN",
) # xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value)
yield GribPayload( yield GribPayload(
__payload_gen_time__=amqp_utils.format_datetime(datetime.now()), __payload_gen_time__=amqp_utils.format_datetime(datetime.now()),
__injector_script__=amqp_utils.INJECTOR_SCRIPT, __injector_script__=amqp_utils.INJECTOR_SCRIPT,
...@@ -137,29 +182,5 @@ def itergrib( ...@@ -137,29 +182,5 @@ def itergrib(
file_name=os.path.basename(grib_path), file_name=os.path.basename(grib_path),
server_ip=amqp_utils.SERVER_NAME, server_ip=amqp_utils.SERVER_NAME,
server_type="realtime", server_type="realtime",
first_lat=f_lat, **meta,
last_lat=getattr(msg, "latitudeLastGridpoint", None),
first_lon=f_lon,
last_lon=getattr(msg, "longitudeLastGridpoint", None),
forecast_hour=int(msg.valueOfForecastTime),
run_hour=int(msg.hour) * 100 + int(msg.minute),
model_time=amqp_utils.format_datetime(msg.refDate),
start_time=amqp_utils.format_datetime(msg.refDate),
projection=msg.gridDefinitionTemplateNumber.definition,
center_id=int(msg.originatingCenter.value),
center_desc=msg.originatingCenter.definition,
level=msg.level,
parameter=msg.shortName,
param_long=msg.fullName,
param_units=msg.units,
grib_number=2,
size="%d x %d" % (rows, cols),
ensemble=getattr(msg, "pertubationNumber", None),
model_id=int(gen_proc.value),
model=gen_proc.definition,
xcd_model=xcd_info[1],
xcd_model_id=xcd_info[0],
# resolution=msg._xydivisor, THIS IS NOT CORRECT!
grib_record_number=msg._msgnum,
title=str(msg),
) )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment