Commit 33f009c0 authored by Max Drexler's avatar Max Drexler

initial package commit

parent 8828cbef
Merge request: !2 Switch to package setup
This is the code that generates the payload. The payload is then converted into ...
| Name | Type | Meaning |
| :--: | :--: | :-----: |
| `__payload_gen_time__` | datetime (str) | when the payload was generated. |
| `__injector_script__` | str | the user@server:path/to/script that generated the payload. |
| `path` | str | fully qualified path to the grib file with this message. |
| `directory` | str | directory of `path`. |
| `file_name` | str | file name of `path`. |
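
For illustration, a payload carrying these fields might look like the following (all values hypothetical):

```python
{
    "__payload_gen_time__": "2024-01-01T00:00:00",
    "__injector_script__": "user@server:path/to/script",
    "path": "/data/example.grb2",
    "directory": "/data",
    "file_name": "example.grb2",
}
```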
#!/bin/bash
# Wrapper: resolve symlinks to locate this script's directory, then exec the
# processor with the configured Python interpreter (override via $PYTHON).
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ]; do SOURCE="$(readlink "$SOURCE")"; done
BASE="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
PYTHON=${PYTHON:-python3}
exec "$PYTHON" "$BASE/grib_processor.py" "$@"
"""
grib_processor
~~~~~~~~~~~~~~
Ingest grib files and publish metadata for files to RabbitMQ.
"""
from grib_processor.grib import GribMessage, itergrib
from grib_processor.main import dump_message, publish_message
from grib_processor.utils import grib_file_watch
__author__ = "Max Drexler"
__email__ = "mndrexler@wisc.edu"
__version__ = '0.0.1'
__all__ = [
'GribMessage',
'itergrib',
'dump_message',
'publish_message',
'grib_file_watch',
]
"""
grib_processor.__main__
~~~~~~~~~~~~~~~~~~~~~~~
Entrypoint to grib processing when using python -m grib_processor.
"""
import sys
from grib_processor.main import main
if __name__ == "__main__":
sys.exit(main())
""" {
Ingest grib files and publish metadata for files to RabbitMQ.
"""
from __future__ import annotations
__author__ = "Max Drexler"
__email__ = "mndrexler@wisc.edu"
import argparse
from collections import defaultdict
from datetime import datetime
import os
import logging
import logging.handlers
import sys
import time
import threading
from typing import DefaultDict, Generator
import grib2io
from watchfiles import watch, Change
import ssec_amqp.api as mq
import ssec_amqp.utils as amqp_utils
from dotenv import load_dotenv
if sys.version_info < (3, 8):
    raise SystemError("grib_processor requires Python 3.8 or newer")
# logging stuff
LOG = logging.getLogger("grib_ingest")
LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
LOG_LEVELS = [
logging.CRITICAL,
logging.ERROR,
logging.WARNING,
logging.INFO,
logging.DEBUG,
]
LOG_NAME = os.path.splitext(os.path.basename(sys.argv[0]))[0] + ".log"
# Where we're publishing data to
DEFAULT_AMQP_SERVERS = ["mq1.ssec.wisc.edu", "mq2.ssec.wisc.edu", "mq3.ssec.wisc.edu"]
# Where on the servers we're putting the data.
DEFAULT_AMQP_EXCHANGE = "model"
# RabbitMQ default login details (publishing shouldn't require authentication).
DEFAULT_AMQP_USER = "guest"
DEFAULT_AMQP_PASS = "guest"
# Format of the route key to publish to
ROUTE_KEY_FMT = "{xcd_model}.{xcd_model_id}.realtime.reserved.reserved.2"
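# e.g. "DGEX.DGEX-AKPS.realtime.reserved.reserved.2" for a message matching the
# DGEX-AKPS entry in XCD_MODELS below.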
# What file extensions to parse as grib files
GRIB_ENDINGS = [".grb", ".grib", ".grb2", ".grib2"]
# How long grib files stay in the message cache.
CACHE_TTL = 43200 # (12 hours)
# How often to clean the cache of previous messages.
CACHE_CLEAN_INTERVAL = 5400 # (1.5 hours)
# Table containing XCD model and model ids for grib messages
# [first_lat][first_lon][rows][cols][gen_proc_id]
XCD_MODELS = {
"44.196": {"174.759": {"237": {"377": {"115": ["DGEX-AKPS", "DGEX"]}}}}, "44.196": {"174.759": {"237": {"377": {"115": ["DGEX-AKPS", "DGEX"]}}}},
"19.943": {"234.907": {"303": {"491": {"115": ["DGEX-USLC", "DGEX"]}}}}, "19.943": {"234.907": {"303": {"491": {"115": ["DGEX-USLC", "DGEX"]}}}},
"0.0": { "0.0": {
...@@ -793,351 +726,4 @@ XCD_MODELS = { ...@@ -793,351 +726,4 @@ XCD_MODELS = {
"53.82": {"193.25": {"300": {"296": {"15": ["NWPS-AKMEUNK", "NWPS"]}}}}, "53.82": {"193.25": {"300": {"296": {"15": ["NWPS-AKMEUNK", "NWPS"]}}}},
"57.8": {"200.8": {"296": {"296": {"15": ["NWPS-AKMEUN2", "NWPS"]}}}}, "57.8": {"200.8": {"296": {"296": {"15": ["NWPS-AKMEUN2", "NWPS"]}}}},
"37.979669": {"234.042695": {"795": {"709": {"118": ["URMA-USLCDR2", "URMA"]}}}}, "37.979669": {"234.042695": {"795": {"709": {"118": ["URMA-USLCDR2", "URMA"]}}}},
} }
def xcd_lookup(
first_lat: float, first_lon: float, rows: int, cols: int, gen_proc_id: int
) -> tuple[str, str]:
"""Looks up xcd model names and ids based on grib message properties.
Args:
first_lat (float): The first latitude of the grib message.
first_lon (float): The first longitude of the grib message.
rows (int): The number of rows in the grib message.
cols (int): The number of columns in the grib message.
gen_proc_id (int): The generating process id of the grib message.
Returns:
        tuple[str, str]: (xcd model name, xcd model id), or ("UNKWN", "UNKWN")
        if no model info is found for the given properties.
"""
try:
return tuple(
XCD_MODELS[str(first_lat)][str(first_lon)][str(rows)][str(cols)][
str(gen_proc_id)
]
)
except KeyError:
return ("UNKWN", "UNKWN")
# argparse types
def non_negative_int(_arg: str) -> int:
i = int(_arg)
if i < 0:
raise ValueError
return i
def positive_int(_arg: str) -> int:
i = int(_arg)
if i <= 0:
raise ValueError
return i
def setup() -> argparse.Namespace:
"""Initialize the script so it's ready to run."""
# Set up cmdline args
parser = argparse.ArgumentParser(
description="Set paramters for ingesting and publishing of grib data."
)
parser.add_argument(
"grib_dir",
help="Directory to watch for new grib files, use -- to read file paths from stdin.",
)
parser.add_argument(
"-v",
"--verbosity",
type=non_negative_int,
default=3,
help="Specify verbosity to stderr from 0 (CRITICAL) to 5 (DEBUG).",
)
parser.add_argument(
"--watch-debounce",
type=non_negative_int,
default=5000,
help="maximum time in milliseconds to group grib file changes over before processing them.",
)
parser.add_argument(
"--log-dir", default=LOG_DIR, help="Directory to put rotating file logs into."
)
parser.add_argument(
"--testing",
action="store_true",
help='Prepends route key with "null.".',
)
ch_group = parser.add_argument_group(
"cache arguments",
"Specify how the cache will behave (the defaults work best for standard usage).",
)
ch_group.add_argument(
"--no-cache",
action="store_true",
help="Turn the cache off. Could be useful if watched files are only written to once.",
)
ch_group.add_argument(
"--clean-interval",
metavar="SECONDS",
type=positive_int,
default=CACHE_CLEAN_INTERVAL,
help="How often to clean the message cache.",
)
ch_group.add_argument(
"--cache-ttl",
metavar="SECONDS",
type=positive_int,
default=CACHE_TTL,
help="How long paths stay in the cache for.",
)
mq_group = parser.add_argument_group(
"RabbitMQ arguments", "arguments to specify how to publish the data."
)
mq_group.add_argument(
"--servers",
nargs="+",
default=DEFAULT_AMQP_SERVERS,
help="Servers to publish AMQP messages to. Defaults to %(default)s",
)
mq_group.add_argument(
"-X",
"--exchange",
default=DEFAULT_AMQP_EXCHANGE,
help='RabbitMQ exchange to publish the messages to. Defaults to "%(default)s"',
)
mq_group.add_argument(
"-u",
"--user",
default=None,
help="RabbitMQ user to login with. Can also specify using a .env file, see README#configuration.",
)
mq_group.add_argument(
"-p",
"--password",
default=None,
help="RabbitMQ password to login with. Can also specify using a .env file, see README#configuration.",
)
args = parser.parse_args()
# Verify parsed arguments
if not os.path.isdir(args.grib_dir):
if args.grib_dir == "--":
raise NotImplementedError(
"Reading from stdin is not yet implemented ¯\_(ツ)_/¯"
)
else:
parser.error("{0} is not a valid directory!".format(args.grib_dir))
if args.no_cache:
raise NotImplementedError(
"Turning the cache off is not yet implemented ¯\_(ツ)_/¯"
)
if not os.path.isdir(args.log_dir):
os.mkdir(args.log_dir)
if args.testing:
global ROUTE_KEY_FMT
ROUTE_KEY_FMT = "null.{xcd_model}.{xcd_model_id}.realtime.reserved.2"
# Set up logging
log_formatter = logging.Formatter(
fmt="[%(asctime)s][%(levelname)s]-%(message)s", datefmt="%Y-%m-%dT%H:%M:%S"
)
file_handler = logging.handlers.TimedRotatingFileHandler(
os.path.join(args.log_dir, LOG_NAME), when="D", utc=True
)
file_handler.setFormatter(log_formatter)
file_handler.setLevel(logging.DEBUG)
out_handler = logging.StreamHandler()
out_handler.setFormatter(log_formatter)
out_handler.setLevel(LOG_LEVELS[min(args.verbosity, len(LOG_LEVELS) - 1)])
LOG.addHandler(file_handler)
LOG.addHandler(out_handler)
LOG.setLevel(logging.DEBUG)
return args
def _cache_entry() -> tuple[float, int]:
"""default cache entry (current time, first grib message)."""
return time.time(), 0
def _cache_cleaner(
cache: DefaultDict[str, tuple[float, int]], interval: float, ttl: float
) -> None:
"""Called from another thread, continually cleans cache in order to
prevent memory leaks.
"""
while True:
to_remove = []
LOG.debug("Cleaning the message cache, size=%d!", len(cache))
for path, (last_update_time, last_msg_num) in cache.items():
time_in_cache = time.time() - last_update_time
if time_in_cache > ttl:
LOG.info(
"Evicted %s from cache! (last message: %d, time since last update: %f).",
path,
last_msg_num - 1,
time_in_cache,
)
to_remove.append(path)
if not to_remove:
LOG.debug("Nothing to remove from cache...")
else:
for rem_path in to_remove:
cache.pop(rem_path, None)
# LOG.debug('Cleaning grib2io caches')
# grib2io._grib2io._msg_class_store.clear()
# grib2io._grib2io._latlon_datastore.clear()
time.sleep(interval)
def watch_filter(change: Change, path: str) -> bool:
"""Make sure files we ingest are grib files."""
if change == Change.deleted:
return False
if os.path.splitext(path)[1] in GRIB_ENDINGS:
return True
return False
def amqp_grib(grib_path: str, start_message: int) -> Generator[dict, None, None]:
"""Generate AMQP payloads from a grib file, starting at a specific message.
Args:
grib_path (str): The path to the grib file to create payloads for.
start_message (int): Which grib message to start yielding payloads for.
Yields:
JSON-able: The json formatted AMQP payload for each grib message from start_message to end of the file.
"""
with grib2io.open(grib_path) as grib_file:
for msg in grib_file[start_message:]:
f_lat = getattr(msg, "latitudeFirstGridpoint", None)
f_lon = getattr(msg, "longitudeFirstGridpoint", None)
rows = msg.ny
cols = msg.nx
gen_proc = msg.generatingProcess
xcd_info = xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value)
yield {
"__payload_gen_time__": amqp_utils.format_datetime(datetime.now()),
"__injector_script__": amqp_utils.INJECTOR_SCRIPT,
"path": grib_path,
"directory": os.path.dirname(grib_path),
"file_name": os.path.basename(grib_path),
"server_ip": amqp_utils.SERVER_NAME,
"server_type": "realtime",
"first_lat": f_lat,
"last_lat": getattr(msg, "latitudeLastGridpoint", None),
"first_lon": f_lon,
"last_long": getattr(msg, "longitudeLastGridpoint", None),
"forecast_hour": int(msg.valueOfForecastTime),
"run_hour": int(msg.hour) * 100 + int(msg.minute),
"model_time": amqp_utils.format_datetime(msg.refDate),
"start_time": amqp_utils.format_datetime(msg.refDate),
"projection": msg.gridDefinitionTemplateNumber.definition,
"center_id": int(msg.originatingCenter.value),
"center_desc": msg.originatingCenter.definition,
"level": msg.level,
"parameter": msg.shortName,
"param_long": msg.fullName,
"param_units": msg.units,
"grib_number": 2,
"size": "%d x %d" % (rows, cols),
"ensemble": getattr(msg, "pertubationNumber", None),
"model_id": int(gen_proc.value),
"model": gen_proc.definition,
"xcd_model": xcd_info[1],
"xcd_model_id": xcd_info[0],
# "resolution": msg._xydivisor, THIS IS NOT CORRECT!
"grib_record_number": msg._msgnum,
"title": str(msg),
}
def gribs_from_dir(
watch_dir: str, cache: DefaultDict[str, tuple[float, int]], watch_debounce: int
) -> None:
"""Process and publish grib files by watching a directory.
Args:
watch_dir (str): The directory to watch for grib files.
        cache (DefaultDict[str, tuple[float, int]]): The cache to use to store grib messages in (if messages arrive in chunks).
watch_debounce (int): debounce for watch.
"""
LOG.info("Watching %s for gribs with debounce=%d", watch_dir, watch_debounce)
for changes in watch(
watch_dir,
watch_filter=watch_filter,
debounce=watch_debounce,
):
for _, path in changes:
next_msg = cache[path][1]
LOG.debug("Got grib file %s (next msg to process: %d).", path, next_msg)
msg_num = 0 # declare here to avoid unbound errors.
try:
for msg_num, amqp in enumerate(amqp_grib(path, next_msg)):
route_key = ROUTE_KEY_FMT.format(
xcd_model=amqp["xcd_model"], xcd_model_id=amqp["xcd_model_id"]
)
LOG.debug(
"Publishing %s msg %d to %s",
path,
next_msg + msg_num,
route_key,
)
mq.publish(amqp, route_key=route_key)
except (ValueError, KeyError, OSError):
LOG.exception("Error parsing grib file %s!", path)
else:
cache[path] = time.time(), next_msg + msg_num + 1
def main() -> None:
"""Main method, setup and start processing."""
try:
args = setup()
if not load_dotenv():
LOG.warning("Couldn't find .env file")
mq.connect(
*args.servers,
user=args.user or os.getenv("RMQ_USER", DEFAULT_AMQP_USER),
password=args.password or os.getenv("RMQ_PASS", DEFAULT_AMQP_PASS),
exchange=args.exchange,
)
LOG.info("Connected to %s", ", ".join(args.servers))
# maybe add loading/storing cache to disk after exits?
cache: DefaultDict[str, tuple[float, int]] = defaultdict(_cache_entry)
LOG.debug("Starting cache cleaner")
threading.Thread(
name="grib_pipeline_cleaner",
target=_cache_cleaner,
args=(cache, args.clean_interval, args.cache_ttl),
daemon=True,
).start()
except KeyboardInterrupt:
mq.disconnect()
return
try:
gribs_from_dir(args.grib_dir, cache, args.watch_debounce)
except KeyboardInterrupt:
LOG.critical("Got interrupt, goodbye!")
mq.disconnect()
return
if __name__ == "__main__":
sys.exit(main())
"""
grib_processor.amqp
~~~~~~~~~~~~~~~~~~~
How the processor generates grib messages.
"""
from __future__ import annotations
from datetime import datetime
import os
from typing import Generator
import grib2io
from ssec_amqp import utils as amqp_utils
from typing_extensions import Literal, TypedDict
# Contains a serializable mapping of first_lat, first_lon, rows, cols,
# and generating_process_ids to xcd model names and ids.
#
# format:
# dict[first_lat,
# dict[first_lon,
# dict[rows,
# dict[cols,
# dict[generating_process_id, list[str]]
# ]
# ]
# ]
# ]
# Loaded using load_xcd_models()
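# Example lookup path once loaded (mirroring the DGEX-AKPS entry of the table):
#   _XCD_MODELS["44.196"]["174.759"]["237"]["377"]["115"] == ["DGEX-AKPS", "DGEX"]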
_XCD_MODELS = None
class GribMessage(TypedDict):
    """AMQP payload for each grib message."""
    __payload_gen_time__: str  # formatted datetime string
__injector_script__: str
path: str
directory: str
file_name: str
server_ip: str
server_type: Literal['realtime', 'archive']
first_lat: float | None
first_lon: float | None
last_lat: float | None
last_lon: float | None
forecast_hour: int
run_hour: int
    model_time: str  # formatted datetime string
    start_time: str  # formatted datetime string
projection: str
center_id: int
center_desc: str
level: str
parameter: str
param_long: str
param_units: str
grib_number: Literal[1, 2]
size: str
ensemble: int | None
model_id: int
model: str
xcd_model: str
xcd_model_id: str
grib_record_number: int
title: str
def load_xcd_models(*addtnl_models: tuple[float, float, int, int, int, str, str]) -> None:
    """Load the xcd models from the package, plus any additional
    (first_lat, first_lon, rows, cols, gen_proc_id, name, id) entries.

    Not implemented yet: until it is, xcd_lookup() always returns ("UNKWN", "UNKWN").
    """
def xcd_lookup(
first_lat: float, first_lon: float, rows: int, cols: int, gen_proc_id: int
) -> tuple[str, str]:
"""Looks up xcd model names and ids based on grib message properties.
Args:
first_lat (float): The first latitude of the grib message.
first_lon (float): The first longitude of the grib message.
rows (int): The number of rows in the grib message.
cols (int): The number of columns in the grib message.
gen_proc_id (int): The generating process id of the grib message.
Returns:
        tuple[str, str]: (xcd model name, xcd model id), or ("UNKWN", "UNKWN")
        if no model info is found for the given properties.
"""
if _XCD_MODELS is None:
return ("UNKWN", "UNKWN")
try:
return tuple(
_XCD_MODELS[str(first_lat)][str(first_lon)][str(rows)][str(cols)][
str(gen_proc_id)
]
)
except KeyError:
return ("UNKWN", "UNKWN")
def itergrib(grib_path: str, start_message: int | None = None) -> Generator[GribMessage, None, None]:
"""Generate AMQP payloads from a grib file, starting at a specific message.
Args:
grib_path (str): The path to the grib file to create payloads for.
start_message (int): Which grib message to start yielding payloads for.
Yields:
JSON-able: The json formatted AMQP payload for each grib message from start_message to end of the file.
"""
start_message = start_message or 0
with grib2io.open(grib_path) as grib_file:
for msg in grib_file[start_message:]:
f_lat = getattr(msg, "latitudeFirstGridpoint", None)
f_lon = getattr(msg, "longitudeFirstGridpoint", None)
rows = msg.ny
cols = msg.nx
gen_proc = msg.generatingProcess
xcd_info = xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value)
yield GribMessage(
__payload_gen_time__=amqp_utils.format_datetime(datetime.now()),
__injector_script__=amqp_utils.INJECTOR_SCRIPT,
path=grib_path,
directory=os.path.dirname(grib_path),
file_name=os.path.basename(grib_path),
server_ip=amqp_utils.SERVER_NAME,
server_type="realtime",
                first_lat=f_lat,
last_lat=getattr(msg, "latitudeLastGridpoint", None),
first_lon=f_lon,
                last_lon=getattr(msg, "longitudeLastGridpoint", None),
forecast_hour=int(msg.valueOfForecastTime),
run_hour=int(msg.hour) * 100 + int(msg.minute),
model_time=amqp_utils.format_datetime(msg.refDate),
start_time=amqp_utils.format_datetime(msg.refDate),
projection=msg.gridDefinitionTemplateNumber.definition,
center_id=int(msg.originatingCenter.value),
center_desc=msg.originatingCenter.definition,
level=msg.level,
parameter=msg.shortName,
param_long=msg.fullName,
param_units=msg.units,
grib_number=2,
size="%d x %d" % (rows, cols),
                ensemble=getattr(msg, "perturbationNumber", None),
model_id=int(gen_proc.value),
model=gen_proc.definition,
xcd_model=xcd_info[1],
xcd_model_id=xcd_info[0],
#resolution=msg._xydivisor, THIS IS NOT CORRECT!
grib_record_number=msg._msgnum,
title=str(msg),
)
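# Usage sketch (hypothetical path):
#   for msg in itergrib("/data/example.grb2"):
#       print(msg["parameter"], msg["level"], msg["grib_record_number"])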
"""
grib_processor.main
~~~~~~~~~~~~~~~~~~~
Unified entrypoint to the grib processor.
"""
from __future__ import annotations
import argparse
from itertools import chain
import logging
import logging.handlers
import os
import signal
import sys
from typing import Callable, Generator
import warnings
from dotenv import load_dotenv
import ssec_amqp.api as mq
from grib_processor.grib import GribMessage, itergrib
from grib_processor.utils import dump_message, grib_file_watch, publish_message, realtime, DEFAULT_WATCH_DEBOUNCE, REALTIME_WATCH_DEBOUNCE, signalcontext
# Logging stuff
LOG = logging.getLogger('grib_ingest')
LOG_LEVELS = [
logging.CRITICAL,
logging.ERROR,
logging.WARNING,
logging.INFO,
logging.DEBUG,
]
LOG_NAME = 'grib_processor.log'
# Where we're publishing data to
DEFAULT_AMQP_SERVERS = ["mq1.ssec.wisc.edu", "mq2.ssec.wisc.edu", "mq3.ssec.wisc.edu"]
# Where on the servers we're putting the data.
DEFAULT_AMQP_EXCHANGE = "model"
def initialize_logging(verbosity: int, rotating_dir: str | None = None) -> None:
"""Set up logging for the package. Don't use basicConfig so we don't
override other's logs.
Optionally, can specify an existing directory to put rotating log files into.
"""
# Set up logging
log_formatter = logging.Formatter(
fmt="[%(asctime)s][%(levelname)s]-%(message)s", datefmt="%Y-%m-%dT%H:%M:%S"
)
if rotating_dir is not None:
file_handler = logging.handlers.TimedRotatingFileHandler(
os.path.join(rotating_dir, LOG_NAME), when="D", utc=True
)
file_handler.setFormatter(log_formatter)
file_handler.setLevel(logging.DEBUG)
LOG.addHandler(file_handler)
out_handler = logging.StreamHandler()
out_handler.setFormatter(log_formatter)
out_handler.setLevel(LOG_LEVELS[min(verbosity, len(LOG_LEVELS) - 1)])
LOG.addHandler(out_handler)
LOG.setLevel(logging.DEBUG)
def setup() -> tuple[Generator[GribMessage, None, None], Callable[[GribMessage], None]]:
"""Initialize the script so it's ready to run."""
# Set up cmdline args
parser = argparse.ArgumentParser(
description="Set paramters for ingesting and publishing of grib data."
)
parser.add_argument(
"grib_src",
metavar='grib_source',
help="Directory to watch for new grib files, file to read grib paths from, or '--' to read file paths from stdin.",
)
parser.add_argument(
"-v",
"--verbosity",
type=int,
default=3,
help="Specify verbosity to stderr from 0 (CRITICAL) to 5 (DEBUG).",
)
parser.add_argument(
"--log-dir", default=None, help="Set up a rotating file logger in this directory."
)
parser.add_argument(
'-R', '--realtime',
action='store_true',
        help='Turn on realtime-processing behavior: message caching, a longer watch debounce, and more robust error handling.'
)
parser.add_argument(
'-r', '--recursive',
action='store_true',
        help='When using a directory as the source, also watch subdirectories recursively.'
)
parser.add_argument(
'-o', '--output',
choices=['amqp', 'json'],
default='amqp',
help='Where to output processed grib messages to. Default is %(default)s.'
)
mq_group = parser.add_argument_group(
"RabbitMQ arguments", "Options for how to output grib messages when using '-o=amqp'."
)
mq_group.add_argument(
"--servers",
nargs="+",
default=DEFAULT_AMQP_SERVERS,
help="Servers to publish AMQP messages to. Defaults to %(default)s",
)
mq_group.add_argument(
"-X",
"--exchange",
default=DEFAULT_AMQP_EXCHANGE,
help='RabbitMQ exchange to publish the messages to. Defaults to "%(default)s"',
)
mq_group.add_argument(
"-u",
"--user",
default=None,
help="RabbitMQ user to login with. Can also specify using a .env file, see README#configuration.",
)
mq_group.add_argument(
"-p",
"--password",
default=None,
help="RabbitMQ password to login with. Can also specify using a .env file, see README#configuration.",
)
args = parser.parse_args()
if args.verbosity < 0:
parser.error("--verbosity cannot be negative!")
initialize_logging(args.verbosity, args.log_dir)
# Get an iterator over grib files to process
if os.path.isdir(args.grib_src):
LOG.info("Sourcing grib files from directory watch @ %s realtime=%s", args.grib_src, args.realtime)
if not args.realtime:
warnings.warn('Processing files from a directory without using --realtime!')
debounce = DEFAULT_WATCH_DEBOUNCE
else:
debounce = REALTIME_WATCH_DEBOUNCE
file_iter = grib_file_watch(args.grib_src, debounce=debounce, recursive=args.recursive)
elif os.path.isfile(args.grib_src):
LOG.info("Sourcing grib file directly from CLI, %s", args.grib_src)
file_iter = (args.grib_src,)
elif args.grib_src == "--":
LOG.info("Sourcing grib files from stdin, realtime=%s", args.realtime)
file_iter = map(str.strip, iter(sys.stdin.readline, ''))
else:
parser.error("{0} is not a valid source!".format(args.grib_src))
# Get an iterator of grib messages to emit
if args.realtime:
source = realtime(file_iter)
else:
source = chain.from_iterable(map(itergrib, file_iter))
# Possible to source connection parameters from the environment
if not load_dotenv():
LOG.warning("Couldn't find .env file")
if args.output == 'amqp':
LOG.info("Emitting messages to amqp")
# will automatically disconnect on exit
mq.connect(
*args.servers,
user=args.user or os.getenv("RMQ_USER", None),
password=args.password or os.getenv("RMQ_PASS", None),
exchange=args.exchange,
)
LOG.info("Connected to %s", ", ".join(args.servers))
emit = publish_message
else:
LOG.info("Emitting messages to stdout")
emit = dump_message
return source, emit
def __raise_interrupt(_sig, _frame): # noqa: ARG001
"""Signal handler that raises KeybardInterrupt."""
raise KeyboardInterrupt
def main() -> None:
"""Main method, setup and start processing."""
try:
source, emit = setup()
except KeyboardInterrupt:
return
with signalcontext(signal.SIGTERM, __raise_interrupt):
try:
for grib_msg in source:
LOG.debug('Got %s from source', grib_msg)
emit(grib_msg)
except KeyboardInterrupt:
LOG.critical("Got interrupt, goodbye!")
return
"""
grib_processor.sources
~~~~~~~~~~~~~~~~~~~~~~
The different ways to source grib files for processing.
"""
from __future__ import annotations
import atexit
from collections import defaultdict
from contextlib import contextmanager
import json
import logging
import os
import signal
import time
from typing import Callable, DefaultDict, Generator, Iterator, Tuple
from watchfiles import watch, Change
from typing_extensions import TypeAlias
import ssec_amqp.api as mq  # needed by publish_message()
from grib_processor.grib import GribMessage, itergrib
# Our logger
LOG = logging.getLogger('grib_ingest')
# How long grib files stay in the message cache.
CACHE_TTL = 43200 # (12 hours)
# How often to clean the cache of previous messages.
CACHE_CLEAN_INTERVAL = 5400 # (1.5 hours)
# What file extensions to parse as grib files
GRIB_ENDINGS = [".grb", ".grib", ".grb2", ".grib2"]
# Debounce is how long to group grib file changes over before processing them
# watchfiles default debounce
DEFAULT_WATCH_DEBOUNCE = 1600
# debounce when doing realtime processing
REALTIME_WATCH_DEBOUNCE = 4500
# Format of the route key to publish messages with
GRIB_ROUTE_KEY_FMT = "{xcd_model}.{xcd_model_id}.realtime.reserved.reserved.2"
# Cache path
REALTIME_CACHE_PATH = os.path.join(os.path.expanduser('~/.cache/'), 'grib_processor_cache.json')
# Types used for realtime caching
_CEntry: TypeAlias = Tuple[float, int]
_Cache: TypeAlias = DefaultDict[str, _CEntry]
@contextmanager
def signalcontext(signum: int, handler: Callable):
"""Context manager that changes a signal handler on entry and resets it
on exit.
Args:
signum (int): Signal to change.
        handler (Callable): New signal handler to use.
"""
try:
orig_h = signal.signal(signum, handler)
yield
finally:
signal.signal(signum, orig_h)
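# Usage sketch (hypothetical handler):
#   with signalcontext(signal.SIGTERM, handler):
#       ...  # previous SIGTERM handler is restored on exit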
def _default_cache_entry() -> _CEntry:
"""default cache entry (current time, first grib message)."""
return time.time(), 0
def _initialize_cache() -> _Cache:
"""Sets up the realtime cache to use. Potentially loading previously saved
caches.
Returns:
_Cache: Cache of file names to last grib message processed.
"""
LOG.info("Loading cache from disk")
try:
with open(REALTIME_CACHE_PATH, 'r', encoding='utf-8') as cache_file:
cache_data = json.load(cache_file)
LOG.info("realtime - cache loaded from disk at %s", REALTIME_CACHE_PATH)
except (OSError, json.JSONDecodeError):
LOG.exception("Error loading previous cache data from %s", REALTIME_CACHE_PATH)
cache_data = {}
return defaultdict(_default_cache_entry, cache_data)
def _save_cache(cache: _Cache) -> None:
"""Saves a cache to disk at the path REALTIME_CACHE_PATH.
Args:
cache (_Cache): The cache to save to disk.
"""
LOG.info("realtime - saving cache to disk")
if not cache:
LOG.debug("realtime - cache is empty")
return
try:
with open(REALTIME_CACHE_PATH, 'w', encoding='utf-8') as cache_file:
json.dump(cache, cache_file)
LOG.info("realtime - cache saved to disk at %s", REALTIME_CACHE_PATH)
except (OSError, ValueError):
LOG.exception("Couldn't save cache to %s", REALTIME_CACHE_PATH)
def _clean_caches(realtime_cache: _Cache) -> None:
"""Removes expired items from the cache. Also, cleans caches
used by 3rd party libraries (grib2io) that could cause memory leaks
in long-running processing.
Args:
        realtime_cache (_Cache): The realtime cache to clean.
"""
to_remove = []
LOG.debug("realtime - Cleaning the cache, size=%d!", len(realtime_cache))
for path, (last_update_time, last_msg_num) in realtime_cache.items():
time_in_cache = time.time() - last_update_time
if time_in_cache > CACHE_TTL:
LOG.info(
"realtime - Evicted %s from cache! (last message: %d, time since last update: %f).",
path,
last_msg_num - 1,
time_in_cache,
)
to_remove.append(path)
if not to_remove:
LOG.debug("realtime - Nothing to remove from cache...")
else:
for rem_path in to_remove:
realtime_cache.pop(rem_path, None)
# LOG.debug('Cleaning grib2io caches')
# grib2io._grib2io._msg_class_store.clear()
# grib2io._grib2io._latlon_datastore.clear()
def realtime(file_iter: Iterator[str]) -> Generator[GribMessage, None, None]:
"""Add message caching and exception handling to a grib file iterator. This
is useful if the iterator returns files that aren't fully written to or
duplicates.
Args:
file_iter (Iterator[str]): An iterator returning paths to grib files.
Yields:
        GribMessage: The next parsed grib message payload.
"""
cache = _initialize_cache()
_clean_caches(cache)
last_clean_time = time.time()
atexit.register(_save_cache, cache)
for file in file_iter:
LOG.info('realtime - got file %s', file)
next_msg = cache.get(file, (None, None))[1]
if next_msg is None:
LOG.debug("realtime - cache miss %s", file)
cache[file] = _default_cache_entry()
next_msg = 0
else:
LOG.debug('realtime - cache hit %s @ %d', file, next_msg)
msg_num = None
try:
for msg_num, msg in enumerate(itergrib(file, start_message=next_msg), next_msg):
LOG.debug('realtime - got msg %s:%d', file, msg_num)
yield msg
except (KeyError, OSError):
LOG.exception("realtime - Error processing grib file %s @ msg %d", file, msg_num or 0)
        if msg_num is not None:  # msg_num stays None if no new messages were read
            cache[file] = time.time(), msg_num + 1
        if time.time() - last_clean_time > CACHE_CLEAN_INTERVAL:
            LOG.info('realtime - time to clean the cache')
            _clean_caches(cache)
            last_clean_time = time.time()
def grib_filter(change: Change, path: str) -> bool:
"""Make sure files we ingest are grib files."""
if change == Change.deleted:
# Can't process a deleted file
return False
if os.path.splitext(path)[1] in GRIB_ENDINGS:
return True
return False
def grib_file_watch(
    watch_dir: str, debounce: int | None = None, recursive: bool = False
) -> Generator[str, None, None]:
"""Get grib files by watching a directory for changes.
Args:
watch_dir (str): The directory to watch for grib files.
debounce (int | None): How long to wait to group file changes together before yielding.
recursive (bool): Watch directories under watch_dir recursively?
    Yields:
        str: Paths to new or modified grib files.
"""
debounce = debounce or DEFAULT_WATCH_DEBOUNCE
LOG.info("Watching directory %s for gribs with debounce=%d, recursive=%s", watch_dir, debounce, recursive)
for changes in watch(
watch_dir,
watch_filter=grib_filter,
debounce=debounce,
recursive=recursive,
):
for _, path in changes:
yield path
def publish_message(msg: GribMessage) -> None:
"""Publish a message to RabbitMQ servers using the ssec_amqp package.
This requires previous setup using ssec_amqp.api.connect().
"""
route_key = GRIB_ROUTE_KEY_FMT.format(xcd_model=msg.get('xcd_model'), xcd_model_id=msg.get('xcd_model_id'))
status = mq.publish(msg, route_key=route_key)
LOG.info('Published to %s. status: %s', route_key, status)
def dump_message(msg: GribMessage) -> None:
"""Print a message to stdout.
"""
print(json.dumps(msg), flush=True)
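# ---------------------------------------------------------------------------
# End-to-end usage sketch (not part of the module): what main() wires together,
# with a hypothetical path and the default guest credentials.
#
#   import ssec_amqp.api as mq
#   from grib_processor import itergrib, publish_message
#
#   mq.connect("mq1.ssec.wisc.edu", user="guest", password="guest", exchange="model")
#   for msg in itergrib("/data/example.grb2"):
#       publish_message(msg)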
[build-system]
requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "grib_processor"
dependencies = [
"grib2io >= 2.2.0",
"watchfiles >= 0.20.0",
"python-dotenv >= 1.0.0",
"quickmq >= 1.1.0",
"typing-extensions",
]
requires-python = ">=3.8"
authors = [
{name = "Max Drexler", email="mndrexler@wisc.edu"}
]
description = "Ingest and publish GRIB metadata to RabbitMQ servers. Made for the SSEC."
readme = "README.md"
license = { file = "LICENSE" }
keywords = ["GRIB", "inventory", "SDS", "RabbitMQ", "SSEC"]
dynamic = ['version']
[project.urls]
Homepage = "https://gitlab.ssec.wisc.edu/mdrexler/grib_rmq_stream"
Repository = "https://gitlab.ssec.wisc.edu/mdrexler/grib_rmq_stream.git"
Documentation = "https://gitlab.ssec.wisc.edu/mdrexler/grib_rmq_stream/-/blob/main/README.md?ref_type=heads"
[project.scripts]
grib_processor = "grib_processor:main.main"
[tool.setuptools]
packages = ["grib_processor"]
[tool.setuptools.dynamic]
version = {attr = "grib_processor.__version__"}