diff --git a/grib_pipeline.py b/grib_pipeline.py index 1de80783b05ef3d497cd44c4082906ce9bb58d63..aeae1c3497a7de50da9084507001c35ac55129d5 100644 --- a/grib_pipeline.py +++ b/grib_pipeline.py @@ -2,33 +2,54 @@ Ingest grib files and publish metadata for files to RabbitMQ. """ +from __future__ import annotations + __author__ = 'Max Drexler' __email__ = 'mndrexler@wisc.edu' +import argparse from collections import defaultdict from datetime import datetime import json import os from pathlib import Path from socket import gethostname +import logging +import logging.handlers import sys import time from typing import Generator import grib2io from watchfiles import watch, Change -from ssec_amqp import AmqpClient, AmqpExchange +import ssec_amqp.api as mq from dotenv import load_dotenv if sys.version_info < (3, 8): raise SystemError('Python version too low') +# logging stuff +LOG = logging.getLogger('grib_ingest') +LOG_DIR = os.getenv('GRIB_LOG_DIR', os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')) +LOG_LEVELS = [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO,logging.DEBUG] +LOG_NAME = os.path.splitext(sys.argv[0])[0] + '.log' + +# Where we're publishing data from HOSTNAME = gethostname() + +# Where we're publishing data to +DEFAULT_AMQP_SERVERS = ['mq1.ssec.wisc.edu', 'mq2.ssec.wisc.edu', 'mq3.ssec.wisc.edu'] + +# Format of the route key to publish to +ROUTE_KEY_FMT = "{xcd_model}.{xcd_model_id}.realtime.reserved.reserved.2" + +# What file extensions to parse as grib files GRIB_ENDINGS = [".grb", ".grib", ".grb2", ".grib2"] +# Table containing XCD model and model ids for grib messages # (first_lat, first_lon, rows, cols, gen_proc_id) XCD_MODELS = { ("44.196", "174.759", "237", "377", "115"): ["DGEX-AKPS", "DGEX"], @@ -401,8 +422,9 @@ def watch_filter(change: Change, path: str) -> bool: return False -def date_type_from_dt(dt: datetime) -> str: - """Format datetimes in a consistent manner. +def format_dt(dt: datetime) -> str: + """Format datetimes in a manner consistent with other AMQP publishers at + the SSEC. Args: dt (datetime): the datetime object to format. @@ -432,7 +454,7 @@ def amqp_grib(grib_path: str, start_message: int) -> Generator[dict, None, None] gen_proc = msg.generatingProcess xcd_info = XCD_MODELS.get((str(f_lat), str(f_lon), str(rows), str(cols), str(gen_proc.value)), ['UKNWN', 'UKNWN']) yield { - "__payload_gen_time__": date_type_from_dt(datetime.now()), + "__payload_gen_time__": format_dt(datetime.now()), "path": grib_path, "directory": os.path.dirname(grib_path), "file_name": os.path.basename(grib_path), @@ -444,7 +466,7 @@ def amqp_grib(grib_path: str, start_message: int) -> Generator[dict, None, None] "last_long": getattr(msg, "longitudeLastGridpoint", None), "forecast_hour": int(msg.valueOfForecastTime), "run_hour": int(msg.hour) * 100 + int(msg.minute), - "model_time": date_type_from_dt(msg.refDate), + "model_time": format_dt(msg.refDate), "projection": msg.gridDefinitionTemplateNumber.definition, "center_id": int(msg.originatingCenter.value), "center_desc": msg.originatingCenter.definition, @@ -465,34 +487,90 @@ def amqp_grib(grib_path: str, start_message: int) -> Generator[dict, None, None] } -def main(): +def init() -> argparse.Namespace: + """Initialize the script so it's ready to run. + """ + # Set up cmdline args + parser = argparse.ArgumentParser(description='Set paramters for ingesting and publishing of grib data.') + parser.add_argument('grib_dir', help='Directory to watch for new grib files, use -- to read file paths from stdin.') + parser.add_argument('-v', '--verbosity', type=int, default=3, help='Specify verbosity to stderr from 0 to 5.') + parser.add_argument('--watch-debounce', type=int, default=5000, help='maximum time in milliseconds to group grib file changes over before processing them.') + parser.add_argument('--log-dir', default=LOG_DIR, help='Directory to put rotating file logs into.') + parser.add_argument('--servers', nargs='+', default=DEFAULT_AMQP_SERVERS, help='Servers to publish AMQP messages to.') + parser.add_argument('--no-monitor', action='store_true', help="Toggle off the monitoring process that collects data on how the main process is running.") + + args = parser.parse_args() + + # Verify parsed arguments + if not os.path.isdir(args.grib_dir): + if args.grib_dir == '--': + raise NotImplementedError('Reading from stdin is not yet implemented ¯\_(ツ)_/¯') + else: + raise FileNotFoundError(args.grib_dir) + + if not os.path.isdir(args.log_dir): + os.mkdir(args.log_dir) + + if args.watch_debounce < 0: + raise ValueError("--watch-debounce can't be negative!") + + if args.verbosity < 0: + raise ValueError("--verbosity can't be negative!") + if not load_dotenv(): - print("Couldn't find .env file") - sys.exit(1) - - cl = AmqpClient() - cl.connect( - AmqpExchange( - "mq1.ssec.wisc.edu", os.getenv("RMQ_USER"), os.getenv("RMQ_PASS"), "model" - ) + raise SystemError("Couldn't find .env file with RabbitMQ credentials!") + + if os.getenv('RMQ_USER') is None: + raise ValueError("Specify the RabbitMQ user in your .env file with RMQ_USER!") + if os.getenv('RMQ_PASS') is None: + raise ValueError("Specify the RabbitMQ password in your .env file with RMQ_PASS!") + + + # Set up logging + log_formatter = logging.Formatter(fmt='[%(asctime)s][%(levelname)s]-%(message)s', datefmt='%Y-%m-%dT%H:%M:%S') + + file_handler = logging.handlers.TimedRotatingFileHandler(os.path.join(args.log_dir, LOG_NAME), when='D', utc=True) + file_handler.setFormatter(log_formatter) + file_handler.setLevel(logging.DEBUG) + + out_handler = logging.StreamHandler() + out_handler.setFormatter(log_formatter) + out_handler.setLevel(LOG_LEVELS[min(args.verbosity, len(LOG_LEVELS) - 1)]) + + LOG.addHandler(file_handler) + LOG.addHandler(out_handler) + LOG.setLevel(logging.DEBUG) + + return args + + +def main(args: argparse.Namespace): + LOG.info('Starting grib_pipeline') + mq.connect( + *args.servers, + user=os.environ['RMQ_USER'], + password=os.environ['RMQ_PASS'], + exchange='model' ) + LOG.info('Connected to %s', ', '.join(args.servers)) - mem = defaultdict(int) - avg_time = 0 - for changes in watch("/data/xcd/grib/", watch_filter=watch_filter, debounce=5000): + mem = defaultdict(int) # this will cause a memory leak without cleaning old data. + LOG.info('Starting watch on %s with debounce=%d', args.grib_dir, args.watch_debounce) + for changes in watch(args.grib_dir, watch_filter=watch_filter, debounce=args.watch_debounce): for _, path in changes: - start_time = time.time() i = 0 - for i, amqp in enumerate(amqp_grib(path, mem[path])): - cl.publish(amqp, route_key=f"{amqp['xcd_model']}.{amqp['xcd_model_id']}.realtime.reserved.reserved.2") - print(json.dumps(amqp, indent=4)) - mem[path] += i - print(path, mem[path]) - avg_time -= avg_time / 100 - avg_time += (time.time() - start_time) / 100 - print(avg_time) + next_msg = mem[path] + LOG.debug('Got grib file %s, already processed up to %d messages.', path, next_msg) + for i, amqp in enumerate(amqp_grib(path, next_msg)): + route_key = ROUTE_KEY_FMT.format(xcd_model=amqp['xcd_model'], xcd_model_id=amqp['xcd_model_id']) + LOG.debug('Publishing %s msg %d to %s', path, next_msg + i, route_key) + mq.publish(amqp, route_key=route_key) if __name__ == "__main__": - sys.exit(main()) - + try: + args = init() + main(args) + except KeyboardInterrupt: + LOG.info('Got interrupt, goodbye') + sys.exit(0)