diff --git a/payload.md b/docs/payload.md similarity index 97% rename from payload.md rename to docs/payload.md index 54f43ee8a575d79cd52c4ef5476283e98a106317..02cb24c42c324865cb3cf8b6ae305a7d4edaa65e 100644 --- a/payload.md +++ b/docs/payload.md @@ -46,6 +46,7 @@ This is the code that generates the payload. The payload is then converted into | Name | Type | Meaning | | :--: | :--: | :-----: | | `__payload_gen_time__` | datetime (str) | when the payload was generated. | +| `__injector_script__` | str | the user@server:path/to/script that generated the payload. | | `path` | str | fully qualified path to the grib file with this message. | | `directory` | str | directory of `path`. | | `file_name` | str | file name of `path`. | diff --git a/grib_processor b/grib_processor deleted file mode 100755 index 4b96fb219bf36bc2010d4baada74c127645503f6..0000000000000000000000000000000000000000 --- a/grib_processor +++ /dev/null @@ -1,9 +0,0 @@ -#! /bin/bash - -SOURCE="${BASH_SOURCE[0]}" -while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done -BASE="$( cd -P "$( dirname "$SOURCE" )" && pwd )" - -PYTHON=${PYTHON:-python3} - -exec $PYTHON $BASE/grib_processor.py "$@" diff --git a/grib_processor/__init__.py b/grib_processor/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..8ed7d8ddddc2dfc4bd2a0429e28d905297ea2bfa --- /dev/null +++ b/grib_processor/__init__.py @@ -0,0 +1,23 @@ +""" +grib_processor +~~~~~~~~~~~~~~ + +Ingest grib files and publish metadata for files to RabbitMQ. 
+""" + +from grib_processor.grib import GribMessage, itergrib +from grib_processor.main import dump_message, publish_message +from grib_processor.utils import grib_file_watch + + +__author__ = "Max Drexler" +__email__ = "mndrexler@wisc.edu" +__version__ = '0.0.1' + +__all__ = [ + 'GribMessage', + 'itergrib', + 'dump_message', + 'publish_message', + 'grib_file_watch', +] diff --git a/grib_processor/__main__.py b/grib_processor/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..117c77974b4829028437db09381a9b42f2622902 --- /dev/null +++ b/grib_processor/__main__.py @@ -0,0 +1,14 @@ +""" +grib_processor.__main__ +~~~~~~~~~~~~~~~~~~~~~~~ + +Entrypoint to grib processing when using python -m grib_processor. +""" + +import sys + +from grib_processor.main import main + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/grib_processor.py b/grib_processor/data/xcd_model_info.json similarity index 66% rename from grib_processor.py rename to grib_processor/data/xcd_model_info.json index d8f8935bf35d232838c857b55b34794cd0e1271a..cfcc3c26c319b62c277ee4ab42fd78db1a69033d 100644 --- a/grib_processor.py +++ b/grib_processor/data/xcd_model_info.json @@ -1,71 +1,4 @@ -""" -Ingest grib files and publish metadata for files to RabbitMQ. 
-""" - -from __future__ import annotations - -__author__ = "Max Drexler" -__email__ = "mndrexler@wisc.edu" - - -import argparse -from collections import defaultdict -from datetime import datetime -import os -import logging -import logging.handlers -import sys -import time -import threading -from typing import DefaultDict, Generator - -import grib2io -from watchfiles import watch, Change -import ssec_amqp.api as mq -import ssec_amqp.utils as amqp_utils -from dotenv import load_dotenv - - -if sys.version_info < (3, 8): - raise SystemError("Python version too low") - -# logging stuff -LOG = logging.getLogger("grib_ingest") -LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") -LOG_LEVELS = [ - logging.CRITICAL, - logging.ERROR, - logging.WARNING, - logging.INFO, - logging.DEBUG, -] -LOG_NAME = os.path.splitext(os.path.basename(sys.argv[0]))[0] + ".log" - -# Where we're publishing data to -DEFAULT_AMQP_SERVERS = ["mq1.ssec.wisc.edu", "mq2.ssec.wisc.edu", "mq3.ssec.wisc.edu"] - -# Where on the servers we're putting the data. -DEFAULT_AMQP_EXCHANGE = "model" - -# RabbitMQ default login details (shouldn't be authenticated to publish). -DEFAULT_AMQP_USER = "guest" -DEFAULT_AMQP_PASS = "guest" - -# Format of the route key to publish to -ROUTE_KEY_FMT = "{xcd_model}.{xcd_model_id}.realtime.reserved.reserved.2" - -# What file extensions to parse as grib files -GRIB_ENDINGS = [".grb", ".grib", ".grb2", ".grib2"] - -# How long grib files stay in the message cache. -CACHE_TTL = 43200 # (12 hours) - -# How often to clean the cache of previous messages. 
-CACHE_CLEAN_INTERVAL = 5400 # (1.5 hours) - -# Table containing XCD model and model ids for grib messages -# [first_lat][first_lon][rows][cols][gen_proc_id] -XCD_MODELS = { +{ "44.196": {"174.759": {"237": {"377": {"115": ["DGEX-AKPS", "DGEX"]}}}}, "19.943": {"234.907": {"303": {"491": {"115": ["DGEX-USLC", "DGEX"]}}}}, "0.0": { @@ -793,351 +726,4 @@ XCD_MODELS = { "53.82": {"193.25": {"300": {"296": {"15": ["NWPS-AKMEUNK", "NWPS"]}}}}, "57.8": {"200.8": {"296": {"296": {"15": ["NWPS-AKMEUN2", "NWPS"]}}}}, "37.979669": {"234.042695": {"795": {"709": {"118": ["URMA-USLCDR2", "URMA"]}}}}, -} - - -def xcd_lookup( - first_lat: float, first_lon: float, rows: int, cols: int, gen_proc_id: int -) -> tuple[str, str]: - """Looks up xcd model names and ids based on grib message properties. - - Args: - first_lat (float): The first latitude of the grib message. - first_lon (float): The first longitude of the grib message. - rows (int): The number of rows in the grib message. - cols (int): The number of columns in the grib message. - gen_proc_id (int): The generating process id of the grib message. - - Returns: - tuple[str, str]: (xcd model name, xcd model id) or ("UNKWN", "UNKWN") - if can't find model info based on given properties. - """ - try: - return tuple( - XCD_MODELS[str(first_lat)][str(first_lon)][str(rows)][str(cols)][ - str(gen_proc_id) - ] - ) - except KeyError: - return ("UNKWN", "UNKWN") - - -# argparse types -def non_negative_int(_arg: str) -> int: - i = int(_arg) - if i < 0: - raise ValueError - return i - - -def positive_int(_arg: str) -> int: - i = int(_arg) - if i <= 0: - raise ValueError - return i - - -def setup() -> argparse.Namespace: - """Initialize the script so it's ready to run.""" - # Set up cmdline args - parser = argparse.ArgumentParser( - description="Set paramters for ingesting and publishing of grib data." 
- ) - parser.add_argument( - "grib_dir", - help="Directory to watch for new grib files, use -- to read file paths from stdin.", - ) - parser.add_argument( - "-v", - "--verbosity", - type=non_negative_int, - default=3, - help="Specify verbosity to stderr from 0 (CRITICAL) to 5 (DEBUG).", - ) - parser.add_argument( - "--watch-debounce", - type=non_negative_int, - default=5000, - help="maximum time in milliseconds to group grib file changes over before processing them.", - ) - parser.add_argument( - "--log-dir", default=LOG_DIR, help="Directory to put rotating file logs into." - ) - parser.add_argument( - "--testing", - action="store_true", - help='Prepends route key with "null.".', - ) - - ch_group = parser.add_argument_group( - "cache arguments", - "Specify how the cache will behave (the defaults work best for standard usage).", - ) - ch_group.add_argument( - "--no-cache", - action="store_true", - help="Turn the cache off. Could be useful if watched files are only written to once.", - ) - ch_group.add_argument( - "--clean-interval", - metavar="SECONDS", - type=positive_int, - default=CACHE_CLEAN_INTERVAL, - help="How often to clean the message cache.", - ) - ch_group.add_argument( - "--cache-ttl", - metavar="SECONDS", - type=positive_int, - default=CACHE_TTL, - help="How long paths stay in the cache for.", - ) - - mq_group = parser.add_argument_group( - "RabbitMQ arguments", "arguments to specify how to publish the data." - ) - mq_group.add_argument( - "--servers", - nargs="+", - default=DEFAULT_AMQP_SERVERS, - help="Servers to publish AMQP messages to. Defaults to %(default)s", - ) - mq_group.add_argument( - "-X", - "--exchange", - default=DEFAULT_AMQP_EXCHANGE, - help='RabbitMQ exchange to publish the messages to. Defaults to "%(default)s"', - ) - mq_group.add_argument( - "-u", - "--user", - default=None, - help="RabbitMQ user to login with. 
Can also specify using a .env file, see README#configuration.", - ) - mq_group.add_argument( - "-p", - "--password", - default=None, - help="RabbitMQ password to login with. Can also specify using a .env file, see README#configuration.", - ) - - args = parser.parse_args() - - # Verify parsed arguments - if not os.path.isdir(args.grib_dir): - if args.grib_dir == "--": - raise NotImplementedError( - "Reading from stdin is not yet implemented ¯\_(ツ)_/¯" - ) - else: - parser.error("{0} is not a valid directory!".format(args.grib_dir)) - - if args.no_cache: - raise NotImplementedError( - "Turning the cache off is not yet implemented ¯\_(ツ)_/¯" - ) - - if not os.path.isdir(args.log_dir): - os.mkdir(args.log_dir) - - if args.testing: - global ROUTE_KEY_FMT - ROUTE_KEY_FMT = "null.{xcd_model}.{xcd_model_id}.realtime.reserved.2" - - # Set up logging - log_formatter = logging.Formatter( - fmt="[%(asctime)s][%(levelname)s]-%(message)s", datefmt="%Y-%m-%dT%H:%M:%S" - ) - - file_handler = logging.handlers.TimedRotatingFileHandler( - os.path.join(args.log_dir, LOG_NAME), when="D", utc=True - ) - file_handler.setFormatter(log_formatter) - file_handler.setLevel(logging.DEBUG) - - out_handler = logging.StreamHandler() - out_handler.setFormatter(log_formatter) - out_handler.setLevel(LOG_LEVELS[min(args.verbosity, len(LOG_LEVELS) - 1)]) - - LOG.addHandler(file_handler) - LOG.addHandler(out_handler) - LOG.setLevel(logging.DEBUG) - - return args - - -def _cache_entry() -> tuple[float, int]: - """default cache entry (current time, first grib message).""" - return time.time(), 0 - - -def _cache_cleaner( - cache: DefaultDict[str, tuple[float, int]], interval: float, ttl: float -) -> None: - """Called from another thread, continually cleans cache in order to - prevent memory leaks. 
- """ - while True: - to_remove = [] - LOG.debug("Cleaning the message cache, size=%d!", len(cache)) - for path, (last_update_time, last_msg_num) in cache.items(): - time_in_cache = time.time() - last_update_time - if time_in_cache > ttl: - LOG.info( - "Evicted %s from cache! (last message: %d, time since last update: %f).", - path, - last_msg_num - 1, - time_in_cache, - ) - to_remove.append(path) - if not to_remove: - LOG.debug("Nothing to remove from cache...") - else: - for rem_path in to_remove: - cache.pop(rem_path, None) - # LOG.debug('Cleaning grib2io caches') - # grib2io._grib2io._msg_class_store.clear() - # grib2io._grib2io._latlon_datastore.clear() - time.sleep(interval) - - -def watch_filter(change: Change, path: str) -> bool: - """Make sure files we ingest are grib files.""" - if change == Change.deleted: - return False - if os.path.splitext(path)[1] in GRIB_ENDINGS: - return True - return False - - -def amqp_grib(grib_path: str, start_message: int) -> Generator[dict, None, None]: - """Generate AMQP payloads from a grib file, starting at a specific message. - - Args: - grib_path (str): The path to the grib file to create payloads for. - start_message (int): Which grib message to start yielding payloads for. - - Yields: - JSON-able: The json formatted AMQP payload for each grib message from start_message to end of the file. 
- """ - with grib2io.open(grib_path) as grib_file: - for msg in grib_file[start_message:]: - f_lat = getattr(msg, "latitudeFirstGridpoint", None) - f_lon = getattr(msg, "longitudeFirstGridpoint", None) - rows = msg.ny - cols = msg.nx - gen_proc = msg.generatingProcess - xcd_info = xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value) - yield { - "__payload_gen_time__": amqp_utils.format_datetime(datetime.now()), - "__injector_script__": amqp_utils.INJECTOR_SCRIPT, - "path": grib_path, - "directory": os.path.dirname(grib_path), - "file_name": os.path.basename(grib_path), - "server_ip": amqp_utils.SERVER_NAME, - "server_type": "realtime", - "first_lat": f_lat, - "last_lat": getattr(msg, "latitudeLastGridpoint", None), - "first_lon": f_lon, - "last_long": getattr(msg, "longitudeLastGridpoint", None), - "forecast_hour": int(msg.valueOfForecastTime), - "run_hour": int(msg.hour) * 100 + int(msg.minute), - "model_time": amqp_utils.format_datetime(msg.refDate), - "start_time": amqp_utils.format_datetime(msg.refDate), - "projection": msg.gridDefinitionTemplateNumber.definition, - "center_id": int(msg.originatingCenter.value), - "center_desc": msg.originatingCenter.definition, - "level": msg.level, - "parameter": msg.shortName, - "param_long": msg.fullName, - "param_units": msg.units, - "grib_number": 2, - "size": "%d x %d" % (rows, cols), - "ensemble": getattr(msg, "pertubationNumber", None), - "model_id": int(gen_proc.value), - "model": gen_proc.definition, - "xcd_model": xcd_info[1], - "xcd_model_id": xcd_info[0], - # "resolution": msg._xydivisor, THIS IS NOT CORRECT! - "grib_record_number": msg._msgnum, - "title": str(msg), - } - - -def gribs_from_dir( - watch_dir: str, cache: DefaultDict[str, tuple[float, int]], watch_debounce: int -) -> None: - """Process and publish grib files by watching a directory. - - Args: - watch_dir (str): The directory to watch for grib files. 
- cache (DefaultDict[str, tuple[int, float]]): The cache to use to store grib messages in (if messages arrive in chunks). - watch_debounce (int): debounce for watch. - """ - LOG.info("Watching %s for gribs with debounce=%d", watch_dir, watch_debounce) - for changes in watch( - watch_dir, - watch_filter=watch_filter, - debounce=watch_debounce, - ): - for _, path in changes: - next_msg = cache[path][1] - LOG.debug("Got grib file %s (next msg to process: %d).", path, next_msg) - msg_num = 0 # declare here to avoid unbound errors. - try: - for msg_num, amqp in enumerate(amqp_grib(path, next_msg)): - route_key = ROUTE_KEY_FMT.format( - xcd_model=amqp["xcd_model"], xcd_model_id=amqp["xcd_model_id"] - ) - LOG.debug( - "Publishing %s msg %d to %s", - path, - next_msg + msg_num, - route_key, - ) - mq.publish(amqp, route_key=route_key) - except (ValueError, KeyError, OSError): - LOG.exception("Error parsing grib file %s!", path) - else: - cache[path] = time.time(), next_msg + msg_num + 1 - - -def main() -> None: - """Main method, setup and start processing.""" - try: - args = setup() - - if not load_dotenv(): - LOG.warning("Couldn't find .env file") - - mq.connect( - *args.servers, - user=args.user or os.getenv("RMQ_USER", DEFAULT_AMQP_USER), - password=args.password or os.getenv("RMQ_PASS", DEFAULT_AMQP_PASS), - exchange=args.exchange, - ) - LOG.info("Connected to %s", ", ".join(args.servers)) - - # maybe add loading/storing cache to disk after exits? 
- cache: DefaultDict[str, tuple[float, int]] = defaultdict(_cache_entry) - LOG.debug("Starting cache cleaner") - threading.Thread( - name="grib_pipeline_cleaner", - target=_cache_cleaner, - args=(cache, args.clean_interval, args.cache_ttl), - daemon=True, - ).start() - except KeyboardInterrupt: - mq.disconnect() - return - - try: - gribs_from_dir(args.grib_dir, cache, args.watch_debounce) - except KeyboardInterrupt: - LOG.critical("Got interrupt, goodbye!") - mq.disconnect() - return - - -if __name__ == "__main__": - sys.exit(main()) +} \ No newline at end of file diff --git a/grib_processor/grib.py b/grib_processor/grib.py new file mode 100644 index 0000000000000000000000000000000000000000..de3dc582ce6a98a7bc619e15d40fe24d932e78d2 --- /dev/null +++ b/grib_processor/grib.py @@ -0,0 +1,160 @@ +""" +grib_processor.amqp +~~~~~~~~~~~~~~~~~~~ + +How the processor generates grib messages. +""" + +from __future__ import annotations + +from datetime import datetime +import os +from typing import Generator + +import grib2io +from ssec_amqp import utils as amqp_utils +from typing_extensions import Literal, TypedDict + + +# Contains a serializable mapping of first_lat, first_lon, rows, cols, +# and generating_process_ids to xcd model names and ids. +# +# format: + +# dict[first_lat, +# dict[first_lon, +# dict[rows, +# dict[cols, +# dict[generating_process_id, list[str]] +# ] +# ] +# ] +# ] +# Loaded using load_xcd_models() +_XCD_MODELS = None + + +class GribMessage(TypedDict): + """Amqp payload for each grib message. 
+ """ + __payload_gen_time__: datetime + __injector_script__: str + path: str + directory: str + file_name: str + server_ip: str + server_type: Literal['realtime', 'archive'] + first_lat: float | None + first_lon: float | None + last_lat: float | None + last_lon: float | None + forecast_hour: int + run_hour: int + model_time: datetime + start_time: datetime + projection: str + center_id: int + center_desc: str + level: str + parameter: str + param_long: str + param_units: str + grib_number: Literal[1, 2] + size: str + ensemble: int | None + model_id: int + model: str + xcd_model: str + xcd_model_id: str + grib_record_number: int + title: str + + + +def load_xcd_models(*addtnl_models: tuple[float, float, int, int, int, str, str]) -> None: + """Load the xcd models from the package. + """ + pass + + +def xcd_lookup( + first_lat: float, first_lon: float, rows: int, cols: int, gen_proc_id: int +) -> tuple[str, str]: + """Looks up xcd model names and ids based on grib message properties. + + Args: + first_lat (float): The first latitude of the grib message. + first_lon (float): The first longitude of the grib message. + rows (int): The number of rows in the grib message. + cols (int): The number of columns in the grib message. + gen_proc_id (int): The generating process id of the grib message. + + Returns: + tuple[str, str]: (xcd model name, xcd model id) or ("UNKWN", "UNKWN") + if can't find model info based on given properties. + """ + if _XCD_MODELS is None: + return ("UNKWN", "UNKWN") + try: + return tuple( + _XCD_MODELS[str(first_lat)][str(first_lon)][str(rows)][str(cols)][ + str(gen_proc_id) + ] + ) + except KeyError: + return ("UNKWN", "UNKWN") + + +def itergrib(grib_path: str, start_message: int | None = None) -> Generator[GribMessage, None, None]: + """Generate AMQP payloads from a grib file, starting at a specific message. + + Args: + grib_path (str): The path to the grib file to create payloads for. 
+        start_message (int): Which grib message to start yielding payloads for.
+
+    Yields:
+        JSON-able: The json formatted AMQP payload for each grib message from start_message to end of the file.
+    """
+    start_message = start_message or 0
+    with grib2io.open(grib_path) as grib_file:
+        for msg in grib_file[start_message:]:
+            f_lat = getattr(msg, "latitudeFirstGridpoint", None)
+            f_lon = getattr(msg, "longitudeFirstGridpoint", None)
+            rows = msg.ny
+            cols = msg.nx
+            gen_proc = msg.generatingProcess
+            xcd_info = xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value)
+            yield GribMessage(
+                __payload_gen_time__=amqp_utils.format_datetime(datetime.now()),
+                __injector_script__=amqp_utils.INJECTOR_SCRIPT,
+                path=grib_path,
+                directory=os.path.dirname(grib_path),
+                file_name=os.path.basename(grib_path),
+                server_ip=amqp_utils.SERVER_NAME,
+                server_type="realtime",
+                first_lat= f_lat,
+                last_lat=getattr(msg, "latitudeLastGridpoint", None),
+                first_lon=f_lon,
+                last_long=getattr(msg, "longitudeLastGridpoint", None),
+                forecast_hour=int(msg.valueOfForecastTime),
+                run_hour=int(msg.hour) * 100 + int(msg.minute),
+                model_time=amqp_utils.format_datetime(msg.refDate),
+                start_time=amqp_utils.format_datetime(msg.refDate),
+                projection=msg.gridDefinitionTemplateNumber.definition,
+                center_id=int(msg.originatingCenter.value),
+                center_desc=msg.originatingCenter.definition,
+                level=msg.level,
+                parameter=msg.shortName,
+                param_long=msg.fullName,
+                param_units=msg.units,
+                grib_number=2,
+                size="%d x %d" % (rows, cols),
+                ensemble=getattr(msg, "perturbationNumber", None),
+                model_id=int(gen_proc.value),
+                model=gen_proc.definition,
+                xcd_model=xcd_info[1],
+                xcd_model_id=xcd_info[0],
+                #resolution=msg._xydivisor, THIS IS NOT CORRECT!
+ grib_record_number=msg._msgnum, + title=str(msg), + ) \ No newline at end of file diff --git a/grib_processor/main.py b/grib_processor/main.py new file mode 100644 index 0000000000000000000000000000000000000000..75923e4ec8528e53d18c58b83a58850623d93827 --- /dev/null +++ b/grib_processor/main.py @@ -0,0 +1,214 @@ +""" +grib_processor.main +~~~~~~~~~~~~~~~~~~~ + +Unified entrypoint to the grib processor. +""" + +from __future__ import annotations + + +import argparse +from itertools import chain +import json +import logging +import logging.handlers +import os +import signal +import sys +from typing import Callable, Generator +import warnings + +from dotenv import load_dotenv +import ssec_amqp.api as mq + +from grib_processor.grib import GribMessage, itergrib +from grib_processor.utils import dump_message, grib_file_watch, publish_message, realtime, DEFAULT_WATCH_DEBOUNCE, REALTIME_WATCH_DEBOUNCE, signalcontext + + +# Logging stuff +LOG = logging.getLogger('grib_ingest') +LOG_LEVELS = [ + logging.CRITICAL, + logging.ERROR, + logging.WARNING, + logging.INFO, + logging.DEBUG, +] +LOG_NAME = 'grib_processor.log' + +# Where we're publishing data to +DEFAULT_AMQP_SERVERS = ["mq1.ssec.wisc.edu", "mq2.ssec.wisc.edu", "mq3.ssec.wisc.edu"] + +# Where on the servers we're putting the data. +DEFAULT_AMQP_EXCHANGE = "model" + + +def initialize_logging(verbosity: int, rotating_dir: str | None = None) -> None: + """Set up logging for the package. Don't use basicConfig so we don't + override other's logs. + + Optionally, can specify an existing directory to put rotating log files into. 
+ """ + # Set up logging + log_formatter = logging.Formatter( + fmt="[%(asctime)s][%(levelname)s]-%(message)s", datefmt="%Y-%m-%dT%H:%M:%S" + ) + + if rotating_dir is not None: + file_handler = logging.handlers.TimedRotatingFileHandler( + os.path.join(rotating_dir, LOG_NAME), when="D", utc=True + ) + file_handler.setFormatter(log_formatter) + file_handler.setLevel(logging.DEBUG) + LOG.addHandler(file_handler) + + out_handler = logging.StreamHandler() + out_handler.setFormatter(log_formatter) + out_handler.setLevel(LOG_LEVELS[min(verbosity, len(LOG_LEVELS) - 1)]) + LOG.addHandler(out_handler) + + LOG.setLevel(logging.DEBUG) + + +def setup() -> tuple[Generator[GribMessage, None, None], Callable[[GribMessage], None]]: + """Initialize the script so it's ready to run.""" + # Set up cmdline args + parser = argparse.ArgumentParser( + description="Set paramters for ingesting and publishing of grib data." + ) + parser.add_argument( + "grib_src", + metavar='grib_source', + help="Directory to watch for new grib files, file to read grib paths from, or '--' to read file paths from stdin.", + ) + parser.add_argument( + "-v", + "--verbosity", + type=int, + default=3, + help="Specify verbosity to stderr from 0 (CRITICAL) to 5 (DEBUG).", + ) + parser.add_argument( + "--log-dir", default=None, help="Set up a rotating file logger in this directory." + ) + + parser.add_argument( + '-R', '--realtime', + action='store_true', + help='Turns on parameters for more efficient realtime processing like caching and debounce. Also adds more robust error handling.' + ) + parser.add_argument( + '-r', '--recursive', + action='store_true', + help='When using a directory as the source, process all files in the directory recursively instead of setting up a watch.' + ) + + parser.add_argument( + '-o', '--output', + choices=['amqp', 'json'], + default='amqp', + help='Where to output processed grib messages to. Default is %(default)s.' 
+ ) + + mq_group = parser.add_argument_group( + "RabbitMQ arguments", "Options for how to output grib messages when using '-o=amqp'." + ) + mq_group.add_argument( + "--servers", + nargs="+", + default=DEFAULT_AMQP_SERVERS, + help="Servers to publish AMQP messages to. Defaults to %(default)s", + ) + mq_group.add_argument( + "-X", + "--exchange", + default=DEFAULT_AMQP_EXCHANGE, + help='RabbitMQ exchange to publish the messages to. Defaults to "%(default)s"', + ) + mq_group.add_argument( + "-u", + "--user", + default=None, + help="RabbitMQ user to login with. Can also specify using a .env file, see README#configuration.", + ) + mq_group.add_argument( + "-p", + "--password", + default=None, + help="RabbitMQ password to login with. Can also specify using a .env file, see README#configuration.", + ) + + args = parser.parse_args() + + if args.verbosity < 0: + parser.error("--verbosity cannot be negative!") + initialize_logging(args.verbosity, args.log_dir) + + # Get an iterator over grib files to process + if os.path.isdir(args.grib_src): + LOG.info("Sourcing grib files from directory watch @ %s realtime=%s", args.grib_src, args.realtime) + if not args.realtime: + warnings.warn('Processing files from a directory without using --realtime!') + debounce = DEFAULT_WATCH_DEBOUNCE + else: + debounce = REALTIME_WATCH_DEBOUNCE + + file_iter = grib_file_watch(args.grib_src, debounce=debounce, recursive=args.recursive) + elif os.path.isfile(args.grib_src): + LOG.info("Sourcing grib file directly from CLI, %s", args.grib_src) + file_iter = (args.grib_src,) + elif args.grib_src == "--": + LOG.info("Sourcing grib files from stdin, realtime=%s", args.realtime) + file_iter = map(str.strip, iter(sys.stdin.readline, '')) + else: + parser.error("{0} is not a valid source!".format(args.grib_src)) + + # Get an iterator of grib messages to emit + if args.realtime: + source = realtime(file_iter) + else: + source = chain.from_iterable(map(itergrib, file_iter)) + + # Possible to source 
connection parameters from the environment + if not load_dotenv(): + LOG.warning("Couldn't find .env file") + + if args.output == 'amqp': + LOG.info("Emitting messages to amqp") + # will automatically disconnect on exit + mq.connect( + *args.servers, + user=args.user or os.getenv("RMQ_USER", None), + password=args.password or os.getenv("RMQ_PASS", None), + exchange=args.exchange, + ) + LOG.info("Connected to %s", ", ".join(args.servers)) + emit = publish_message + else: + LOG.info("Emitting messages to stdout") + emit = dump_message + + return source, emit + + +def __raise_interrupt(_sig, _frame): # noqa: ARG001 + """Signal handler that raises KeybardInterrupt.""" + raise KeyboardInterrupt + + +def main() -> None: + """Main method, setup and start processing.""" + try: + source, emit = setup() + except KeyboardInterrupt: + return + + with signalcontext(signal.SIGTERM, __raise_interrupt): + try: + for grib_msg in source: + LOG.debug('Got %s from source', grib_msg) + emit(grib_msg) + except KeyboardInterrupt: + LOG.critical("Got interrupt, goodbye!") + return diff --git a/grib_processor/utils.py b/grib_processor/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..54faacf7b7d16075d335999b35c9ac21b2582441 --- /dev/null +++ b/grib_processor/utils.py @@ -0,0 +1,236 @@ +""" +grib_processor.sources +~~~~~~~~~~~~~~~~~~~~~~ + +The different ways to source grib files for processing. +""" + +from __future__ import annotations + +import atexit +from collections import defaultdict +from contextlib import contextmanager +import json +import logging +import os +import signal +import time +from typing import Callable, DefaultDict, Generator, Iterator, Tuple + +from watchfiles import watch, Change +from typing_extensions import TypeAlias + +from grib_processor.grib import GribMessage, itergrib + + +# Our logger +LOG = logging.getLogger('grib_ingest') + +# How long grib files stay in the message cache. 
+CACHE_TTL = 43200 # (12 hours) + +# How often to clean the cache of previous messages. +CACHE_CLEAN_INTERVAL = 5400 # (1.5 hours) + +# What file extensions to parse as grib files +GRIB_ENDINGS = [".grb", ".grib", ".grb2", ".grib2"] + +# Debounce is how long to group grib file changes over before processing them + +# watchfiles default debounce +DEFAULT_WATCH_DEBOUNCE = 1600 + +# debounce when doing realtime processing +REALTIME_WATCH_DEBOUNCE = 4500 + +# Format of the route key to messages publish with +GRIB_ROUTE_KEY_FMT = "{xcd_model}.{xcd_model_id}.realtime.reserved.reserved.2" + +# Cache path +REALTIME_CACHE_PATH = os.path.join(os.path.expanduser('~/.cache/'), 'grib_processor_cache.json') + +# Types used for realtime caching +_CEntry: TypeAlias = Tuple[float, int] +_Cache: TypeAlias = DefaultDict[str, _CEntry] + + +@contextmanager +def signalcontext(signum: int, handler: Callable): + """Context manager that changes a signal handler on entry and resets it + on exit. + + Args: + signum (int): Signal to change. + handler (tp.Callable): New signal handler to use. + """ + try: + orig_h = signal.signal(signum, handler) + yield + finally: + signal.signal(signum, orig_h) + + +def _default_cache_entry() -> _CEntry: + """default cache entry (current time, first grib message).""" + return time.time(), 0 + + +def _initialize_cache() -> _Cache: + """Sets up the realtime cache to use. Potentially loading previously saved + caches. + + Returns: + _Cache: Cache of file names to last grib message processed. 
+ """ + LOG.info("Loading cache from disk") + try: + with open(REALTIME_CACHE_PATH, 'r', encoding='utf-8') as cache_file: + cache_data = json.load(cache_file) + LOG.info("realtime - cache loaded from disk at %s", REALTIME_CACHE_PATH) + except (OSError, json.JSONDecodeError): + LOG.exception("Error loading previous cache data from %s", REALTIME_CACHE_PATH) + cache_data = {} + + return defaultdict(_default_cache_entry, cache_data) + + +def _save_cache(cache: _Cache) -> None: + """Saves a cache to disk at the path REALTIME_CACHE_PATH. + + Args: + cache (_Cache): The cache to save to disk. + """ + LOG.info("realtime - saving cache to disk") + if not cache: + LOG.debug("realtime - cache is empty") + return + try: + with open(REALTIME_CACHE_PATH, 'w', encoding='utf-8') as cache_file: + json.dump(cache, cache_file) + LOG.info("realtime - cache saved to disk at %s", REALTIME_CACHE_PATH) + except (OSError, ValueError): + LOG.exception("Couldn't save cache to %s", REALTIME_CACHE_PATH) + + +def _clean_caches(realtime_cache: _Cache) -> None: + """Removes expired items from the cache. Also, cleans caches + used by 3rd party libraries (grib2io) that could cause memory leaks + in long-running processing. + + Args: + cache (_Cache): The realtime cache to clean. + """ + + to_remove = [] + LOG.debug("realtime - Cleaning the cache, size=%d!", len(realtime_cache)) + for path, (last_update_time, last_msg_num) in realtime_cache.items(): + time_in_cache = time.time() - last_update_time + if time_in_cache > CACHE_TTL: + LOG.info( + "realtime - Evicted %s from cache! 
(last message: %d, time since last update: %f).", + path, + last_msg_num - 1, + time_in_cache, + ) + to_remove.append(path) + if not to_remove: + LOG.debug("realtime - Nothing to remove from cache...") + else: + for rem_path in to_remove: + realtime_cache.pop(rem_path, None) + # LOG.debug('Cleaning grib2io caches') + # grib2io._grib2io._msg_class_store.clear() + # grib2io._grib2io._latlon_datastore.clear() + + +def realtime(file_iter: Iterator[str]) -> Generator[GribMessage, None, None]: + """Add message caching and exception handling to a grib file iterator. This + is useful if the iterator returns files that aren't fully written to or + duplicates. + + Args: + file_iter (Iterator[str]): An iterator returning paths to grib files. + + Yields: + Generator[GribMessage, None, None]: A GribMessage generator. + """ + cache = _initialize_cache() + _clean_caches(cache) + last_clean_time = time.time() + atexit.register(_save_cache, cache) + + for file in file_iter: + LOG.info('realtime - got file %s', file) + next_msg = cache.get(file, (None, None))[1] + if next_msg is None: + LOG.debug("realtime - cache miss %s", file) + cache[file] = _default_cache_entry() + next_msg = 0 + else: + LOG.debug('realtime - cache hit %s @ %d', file, next_msg) + msg_num = None + try: + for msg_num, msg in enumerate(itergrib(file, start_message=next_msg), next_msg): + LOG.debug('realtime - got msg %s:%d', file, msg_num) + yield msg + except (KeyError, OSError): + LOG.exception("realtime - Error processing grib file %s @ msg %d", file, msg_num or 0) + + if msg_num is not None: # msg_num is None when exception or end of grib file + cache[file] = time.time(), msg_num + 1 + + if time.time() - last_clean_time > CACHE_CLEAN_INTERVAL: + LOG.info('realtime - time to clean the cache') + _clean_caches(cache) + + +def grib_filter(change: Change, path: str) -> bool: + """Make sure files we ingest are grib files.""" + if change == Change.deleted: + # Can't process a deleted file + return False + if 
os.path.splitext(path)[1] in GRIB_ENDINGS: + return True + return False + + +def grib_file_watch( + watch_dir: str, debounce: int | None = None, recursive=False +) -> Generator[str, None, None]: + """Get grib files by watching a directory for changes. + + Args: + watch_dir (str): The directory to watch for grib files. + debounce (int | None): How long to wait to group file changes together before yielding. + recursive (bool): Watch directories under watch_dir recursively? + + Returns: + Generator[str, None, None]: A grib file path generator. + """ + debounce = debounce or DEFAULT_WATCH_DEBOUNCE + LOG.info("Watching directory %s for gribs with debounce=%d, recursive=%s", watch_dir, debounce, recursive) + for changes in watch( + watch_dir, + watch_filter=grib_filter, + debounce=debounce, + recursive=recursive, + ): + for _, path in changes: + yield path + + +def publish_message(msg: GribMessage) -> None: + """Publish a message to RabbitMQ servers using the ssec_amqp package. + This requires previous setup using ssec_amqp.api.connect(). + """ + route_key = GRIB_ROUTE_KEY_FMT.format(xcd_model=msg.get('xcd_model'), xcd_model_id=msg.get('xcd_model_id')) + status = mq.publish(msg, route_key=route_key) + LOG.info('Published to %s. status: %s', route_key, status) + + +def dump_message(msg: GribMessage) -> None: + """Print a message to stdout. 
+ """ + print(json.dumps(msg), flush=True) + + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000000000000000000000000000000000..8aa05c443476580026ab2d9a39bd34a4d4d4bd89 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["setuptools >= 61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "grib_processor" +dependencies = [ + "grib2io >= 2.2.0", + "watchfiles >= 0.20.0", + "python-dotenv >= 1.0.0", + "quickmq >= 1.1.0", + "typing-extensions", +] +requires-python = ">=3.8" +authors = [ + {name = "Max Drexler", email="mndrexler@wisc.edu"} +] +description = "Ingest and publish GRIB metadata to RabbitMQ servers. Made for the SSEC." +readme = "README.md" +license = { file = "LICENSE" } +keywords = ["GRIB", "inventory", "SDS", "RabbitMQ", "SSEC"] +dynamic = ['version'] + +[project.urls] +Homepage = "https://gitlab.ssec.wisc.edu/mdrexler/grib_rmq_stream" +Repository = "https://gitlab.ssec.wisc.edu/mdrexler/grib_rmq_stream.git" +Documentation = "https://gitlab.ssec.wisc.edu/mdrexler/grib_rmq_stream/-/blob/main/README.md?ref_type=heads" + +[project.scripts] +grib_processor = "grib_processor:main.main" + +[tool.setuptools] +packages = ["grib_processor"] + +[tool.setuptools.dynamic] +version = {attr = "grib_processor.__version__"}