Commit 2d21f872 authored by Max Drexler's avatar Max Drexler

Merge branch 'packaging' into 'main'

Switch to package setup

See merge request !2
parents d294ab33 d219b8f3
repos:
  - repo: 'https://github.com/pre-commit/pre-commit-hooks'
    rev: v4.6.0
    hooks:
      - id: check-yaml
      - id: debug-statements
      - id: end-of-file-fixer
      - id: trailing-whitespace
  - repo: 'https://github.com/pre-commit/mirrors-mypy'
    rev: v1.10.0
    hooks:
      - id: mypy
  - repo: 'https://github.com/astral-sh/ruff-pre-commit'
    rev: v0.5.2
    hooks:
      - id: ruff
        args:
          - '--fix'
      - id: ruff-format
How to set up your dev environment to work on the grib processor.

## Goals

The main goal of this package is to provide a realtime event processor for the SDS that publishes metadata about incoming grib files. As this package is made for the SDS, final say on creative decisions goes to the SDS managers.

The main goal can be accomplished using the `--realtime` CLI option.

Additional (secondary) goals include:

* Providing different ways to ingest grib files; stdin, specifying one file, etc.
* Providing a python interface to the event processor's components.
* Providing an interface to extract metadata from grib files.

## Setup

First, clone the repo locally.

```bash
git clone https://gitlab.ssec.wisc.edu/mdrexler/grib_rmq_stream.git
cd grib_rmq_stream
```

Next, create a virtual environment like in the [readme](/README.md#setup).
Then, install the dev dependencies.
```bash
python -m pip install -r requirements_dev.txt
```
Finally, install the package in editable mode.
```bash
python -m pip install -e .
```
### Pre-commit
[pre-commit](https://pre-commit.com/) can be used to automatically format/lint/type check any new commits you make. It is recommended, but not necessary as the CI will also check that for you.
```bash
pre-commit install
pre-commit run --all-files
```
## Testing
When adding a new feature, please add corresponding unit tests under [tests/](/tests/).
Tests can be run using pytest.
```bash
pytest tests/
```
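A new test might look like the following sketch. The `format_size` helper here is hypothetical, shown only to mirror how the payload formats its `size` field; real tests should exercise the package's actual functions.

```python
# tests/test_size.py -- illustrative sketch only.
# format_size is a hypothetical helper mirroring the payload's
# "{rows} x {cols}" size formatting.


def format_size(rows: int, cols: int) -> str:
    # Matches the "{rows} x {cols}" format used in the payload.
    return "%d x %d" % (rows, cols)


def test_format_size():
    assert format_size(361, 720) == "361 x 720"
```

pytest discovers any `test_*` function in files under `tests/` automatically.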
To measure code coverage, use the coverage command.
```bash
coverage run -m pytest
```
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
| Name | Type | Meaning |
| :--: | :--: | :-----: |
| `__payload_gen_time__` | datetime (str) | when the payload was generated. |
| `__injector_script__` | str | the user@server:path/to/script that generated the payload. |
| `path` | str | fully qualified path to the grib file with this message. |
| `directory` | str | directory of `path`. |
| `file_name` | str | file name of `path`. |
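The `directory` and `file_name` fields are derived from `path`; a minimal sketch of that relationship (the example path is made up):

```python
import os

# Hypothetical example path; the payload derives directory and
# file_name from the fully qualified path.
path = "/data/grib/gfs.t00z.pgrb2.f000"
directory = os.path.dirname(path)   # "/data/grib"
file_name = os.path.basename(path)  # "gfs.t00z.pgrb2.f000"
```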
| `center_desc` | str | name of the generating center. |
| `level` | str | level of the grid message. |
| `parameter` | str | short name of the grid message parameter. |
| `param_long` | str | long name of the grid message parameter. |
| `param_units` | str | units of the grid message parameter. |
| `grib_number` | int literal | grid edition number, e.g. 1 or 2. |
| `size` | str | number of rows and cols in the grid message, formatted as "{rows} x {cols}". |
| `xcd_model` | str | name of the xcd grib model. |
| `xcd_model_id` | str | name of the xcd grib model id. |
| `grib_record_number` | int | the index of this grid message in the file. |
| `title` | str | the wgrib2 formatted string for this grid message. |
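The `xcd_model` and `xcd_model_id` fields come from a nested lookup table keyed on stringified grid properties, falling back to `"UNKWN"` on a miss. A minimal sketch of that lookup (all values here are made up; the real table ships in `grib_processor/data/xcd_model_info.json`):

```python
# Illustrative only: keys are stringified first_lat, first_lon, rows,
# cols, and generating_process_id; values are [model name, model id].
XCD_MODELS = {"25.0": {"265.0": {"361": {"720": {"96": ["GFS", "gfs201"]}}}}}


def xcd_lookup(first_lat, first_lon, rows, cols, gen_proc_id):
    # Any missing key at any level means the model is unknown.
    try:
        return tuple(
            XCD_MODELS[str(first_lat)][str(first_lon)][str(rows)][str(cols)][
                str(gen_proc_id)
            ]
        )
    except KeyError:
        return ("UNKWN", "UNKWN")
```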
#!/bin/bash
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ]; do SOURCE="$(readlink "$SOURCE")"; done
BASE="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
PYTHON=${PYTHON:-python3}
exec "$PYTHON" "$BASE/grib_processor.py" "$@"
"""
grib_processor
~~~~~~~~~~~~~~
Ingest grib files and publish metadata for files to RabbitMQ.
"""
from grib_processor.grib import GribPayload, itergrib
from grib_processor.main import dump_message, publish_message
from grib_processor.utils import grib_file_watch
__author__ = "Max Drexler"
__email__ = "mndrexler@wisc.edu"
__version__ = "0.0.1"
__all__ = [
"GribPayload",
"itergrib",
"dump_message",
"publish_message",
"grib_file_watch",
]
"""
grib_processor.__main__
~~~~~~~~~~~~~~~~~~~~~~~
Entrypoint to grib processing when using python -m grib_processor.
"""
import sys
from grib_processor.main import main
if __name__ == "__main__":
sys.exit(main())
"""
grib_processor.amqp
~~~~~~~~~~~~~~~~~~~
How the processor generates grib messages.
"""
from __future__ import annotations
from datetime import datetime
import json
import os
import sys
from typing import Generator
import grib2io
from ssec_amqp import utils as amqp_utils
from typing_extensions import Literal, TypedDict
from grib_processor import data
if sys.version_info < (3, 9):
import importlib_resources as resources
else:
import importlib.resources as resources
# Contains a serializable mapping of first_lat, first_lon, rows, cols,
# and generating_process_ids to xcd model names and ids.
#
# format:
# dict[first_lat,
# dict[first_lon,
# dict[rows,
# dict[cols,
# dict[generating_process_id, list[str]]
# ]
# ]
# ]
# ]
# Loaded using load_xcd_models()
_XCD_MODELS = None
# default model_name, model_id
_XCD_MISSING = ("UNKWN", "UNKWN")
class GribMetadata(TypedDict):
"""Metadata extracted from each grib message."""
first_lat: float | None
first_lon: float | None
last_lat: float | None
last_lon: float | None
forecast_hour: int
run_hour: int
model_time: datetime
start_time: datetime
projection: str
center_id: int
center_desc: str
level: str
parameter: str
param_long: str
param_units: str
grib_number: Literal[1, 2]
size: str
ensemble: int | None
model_id: int
model: str
xcd_model: str
xcd_model_id: str
grib_record_number: int
title: str
class GribPayload(GribMetadata):
"""Amqp payload for each grib message.
Includes metadata about the grib message and metadata about
the file and server itself.
"""
__payload_gen_time__: datetime
__injector_script__: str
path: str
directory: str
file_name: str
server_ip: str
server_type: Literal["realtime", "archive"]
def load_xcd_models(
*addtnl_models: dict[str, dict[str, dict[str, dict[str, dict[str, list[str]]]]]],
) -> None:
"""Load the xcd models from the package."""
global _XCD_MODELS
if _XCD_MODELS is None:
# This MUST match name of xcd file in grib_processor.data
data_path = resources.files(data) / "xcd_model_info.json"
with data_path.open("r") as xcd_data:
_XCD_MODELS = json.load(xcd_data)
for addtnl in addtnl_models:
_XCD_MODELS.update(addtnl)
def xcd_lookup(
first_lat: float, first_lon: float, rows: int, cols: int, gen_proc_id: int
) -> tuple[str, str]:
"""Looks up xcd model names and ids based on grib message properties.
Args:
first_lat (float): The first latitude of the grib message.
first_lon (float): The first longitude of the grib message.
rows (int): The number of rows in the grib message.
cols (int): The number of columns in the grib message.
gen_proc_id (int): The generating process id of the grib message.
Returns:
tuple[str, str]: (xcd model name, xcd model id) or ("UNKWN", "UNKWN")
if can't find model info based on given properties.
"""
if _XCD_MODELS is None:
return _XCD_MISSING
try:
return tuple(
_XCD_MODELS[str(first_lat)][str(first_lon)][str(rows)][str(cols)][
str(gen_proc_id)
]
)
except KeyError:
return _XCD_MISSING
def extract_metadata(msg: grib2io.Grib2Message) -> GribMetadata:
"""Extract metadata about a grib2 message created using grib2io.
Returns:
dictionary with keys defined by GribMetadata.
"""
f_lat = getattr(msg, "latitudeFirstGridpoint", None)
f_lon = getattr(msg, "longitudeFirstGridpoint", None)
rows = msg.ny
cols = msg.nx
gen_proc = msg.generatingProcess
if f_lat is None or f_lon is None:
xcd_info = _XCD_MISSING
else:
xcd_info = xcd_lookup(f_lat, f_lon, rows, cols, gen_proc.value)
return GribMetadata(
first_lat=f_lat,
last_lat=getattr(msg, "latitudeLastGridpoint", None),
first_lon=f_lon,
last_lon=getattr(msg, "longitudeLastGridpoint", None),
forecast_hour=int(msg.valueOfForecastTime),
run_hour=int(msg.hour) * 100 + int(msg.minute),
model_time=amqp_utils.format_datetime(msg.refDate),
start_time=amqp_utils.format_datetime(msg.refDate),
projection=msg.gridDefinitionTemplateNumber.definition,
center_id=int(msg.originatingCenter.value),
center_desc=msg.originatingCenter.definition,
level=msg.level,
parameter=msg.shortName,
param_long=msg.fullName,
param_units=msg.units,
grib_number=2,
size="%d x %d" % (rows, cols),
ensemble=getattr(msg, "perturbationNumber", None),
model_id=int(gen_proc.value),
model=gen_proc.definition,
xcd_model=xcd_info[0],
xcd_model_id=xcd_info[1],
# resolution=msg._xydivisor, THIS IS NOT CORRECT!
grib_record_number=msg._msgnum,
title=str(msg),
)
def itergrib(
grib_path: str, start_message: int | None = None
) -> Generator[GribPayload, None, None]:
"""Generate AMQP payloads from a grib file, starting at a specific message.
Args:
grib_path (str): The path to the grib file to create payloads for.
start_message (int): Which grib message to start yielding payloads for.
Yields:
JSON-able: The json formatted AMQP payload for each grib message from start_message to end of the file.
"""
start_message = start_message or 0
with grib2io.open(grib_path) as grib_file:
for msg in grib_file[start_message:]:
meta = extract_metadata(msg)
yield GribPayload(
__payload_gen_time__=amqp_utils.format_datetime(datetime.now()),
__injector_script__=amqp_utils.INJECTOR_SCRIPT,
path=grib_path,
directory=os.path.dirname(grib_path),
file_name=os.path.basename(grib_path),
server_ip=amqp_utils.SERVER_NAME,
server_type="realtime",
**meta,
)
"""
grib_processor.main
~~~~~~~~~~~~~~~~~~~
Unified entrypoint to the grib processor.
"""
from __future__ import annotations
import argparse
from itertools import chain
import logging
import logging.handlers
import os
import signal
import sys
from typing import Callable, Iterable
import warnings
from dotenv import load_dotenv
import ssec_amqp.api as mq
from grib_processor.grib import GribPayload, itergrib, load_xcd_models
from grib_processor.utils import (
dump_message,
grib_file_watch,
publish_message,
realtime,
DEFAULT_WATCH_DEBOUNCE,
REALTIME_WATCH_DEBOUNCE,
signalcontext,
)
# Logging stuff
LOG = logging.getLogger("grib_ingest")
LOG_LEVELS = [
logging.CRITICAL,
logging.ERROR,
logging.WARNING,
logging.INFO,
logging.DEBUG,
]
LOG_NAME = "grib_processor.log"
# Where we're publishing data to
DEFAULT_AMQP_SERVERS = ["mq1.ssec.wisc.edu", "mq2.ssec.wisc.edu", "mq3.ssec.wisc.edu"]
# Where on the servers we're putting the data.
DEFAULT_AMQP_EXCHANGE = "model"
def initialize_logging(verbosity: int, rotating_dir: str | None = None) -> None:
"""Set up logging for the package. Don't use basicConfig so we don't
override other's logs.
Optionally, can specify an existing directory to put rotating log files into.
"""
# Set up logging
log_formatter = logging.Formatter(
fmt="[%(asctime)s][%(levelname)s]-%(message)s", datefmt="%Y-%m-%dT%H:%M:%S"
)
if rotating_dir is not None:
file_handler = logging.handlers.TimedRotatingFileHandler(
os.path.join(rotating_dir, LOG_NAME), when="D", utc=True
)
file_handler.setFormatter(log_formatter)
file_handler.setLevel(logging.DEBUG)
LOG.addHandler(file_handler)
out_handler = logging.StreamHandler()
out_handler.setFormatter(log_formatter)
out_handler.setLevel(LOG_LEVELS[min(verbosity, len(LOG_LEVELS) - 1)])
LOG.addHandler(out_handler)
LOG.setLevel(logging.DEBUG)
def setup() -> tuple[Iterable[GribPayload], Callable[[GribPayload], None]]:
"""Initialize the script so it's ready to run."""
# Set up cmdline args
parser = argparse.ArgumentParser(
description="Set paramters for ingesting and publishing of grib data."
)
parser.add_argument(
"grib_src",
nargs="?",
default=None,
metavar="grib_source",
help="Where to get grib files from. Either stdin (default), a directory to watch, or a grib file itself.",
)
parser.add_argument(
"-v",
"--verbosity",
type=int,
default=3,
help="Specify verbosity to stderr from 0 (CRITICAL) to 5 (DEBUG).",
)
parser.add_argument(
"--log-dir",
default=None,
help="Set up a rotating file logger in this directory.",
)
parser.add_argument(
"-R",
"--realtime",
action="store_true",
help="Turns on parameters for more efficient realtime processing like caching and debounce. Also adds more robust error handling.",
)
parser.add_argument(
"-r",
"--recursive",
action="store_true",
help="When using a directory as the source, process all files in the directory recursively instead of setting up a watch.",
)
parser.add_argument(
"-o",
"--output",
choices=["amqp", "json"],
default="amqp",
help="Where to output processed grib messages to. Default is %(default)s.",
)
mq_group = parser.add_argument_group(
"RabbitMQ arguments",
"Options for how to output grib messages when using '-o=amqp'.",
)
mq_group.add_argument(
"--servers",
nargs="+",
default=DEFAULT_AMQP_SERVERS,
help="Servers to publish AMQP messages to. Defaults to %(default)s",
)
mq_group.add_argument(
"-X",
"--exchange",
default=DEFAULT_AMQP_EXCHANGE,
help='RabbitMQ exchange to publish the messages to. Defaults to "%(default)s"',
)
mq_group.add_argument(
"-u",
"--user",
default=None,
help="RabbitMQ user to login with. Can also specify using a .env file, see README#configuration.",
)
mq_group.add_argument(
"-p",
"--password",
default=None,
help="RabbitMQ password to login with. Can also specify using a .env file, see README#configuration.",
)
args = parser.parse_args()
if args.verbosity < 0:
parser.error("--verbosity cannot be negative!")
initialize_logging(args.verbosity, args.log_dir)
load_xcd_models()
# Get an iterator over grib files to process
file_iter: Iterable[str]
if args.grib_src is None:
LOG.info("Sourcing grib files from stdin, realtime=%s", args.realtime)
file_iter = map(str.strip, iter(sys.stdin.readline, ""))
elif os.path.isdir(args.grib_src):
LOG.info(
"Sourcing grib files from directory watch @ %s realtime=%s",
args.grib_src,
args.realtime,
)
if not args.realtime:
warnings.warn("Processing files from a directory without using --realtime!")
debounce = DEFAULT_WATCH_DEBOUNCE
else:
debounce = REALTIME_WATCH_DEBOUNCE
file_iter = grib_file_watch(
args.grib_src, debounce=debounce, recursive=args.recursive
)
elif os.path.isfile(args.grib_src):
LOG.info("Grib source is a file, parsing directly, %s", args.grib_src)
file_iter = (str(args.grib_src),)
else:
parser.error("{0} is not a valid source!".format(args.grib_src))
# Get an iterator of grib messages to emit
source: Iterable[GribPayload]
if args.realtime:
source = realtime(file_iter)
else:
source = chain.from_iterable(map(itergrib, file_iter))
# Possible to source connection parameters from the environment
if not load_dotenv():
LOG.warning("Couldn't find .env file")
if args.output == "amqp":
LOG.info("Emitting messages to amqp")
# will automatically disconnect on exit
mq.connect(
*args.servers,
user=args.user or os.getenv("RMQ_USER", None),
password=args.password or os.getenv("RMQ_PASS", None),
exchange=args.exchange,
)
LOG.info("Connected to %s", ", ".join(args.servers))
emit = publish_message
else:
LOG.info("Emitting messages to stdout")
emit = dump_message
return source, emit
def __raise_interrupt(_sig, _frame): # noqa: ARG001
"""Signal handler that raises KeybardInterrupt."""
raise KeyboardInterrupt
def main() -> None:
"""Main method, setup and start processing."""
try:
source, emit = setup()
except KeyboardInterrupt:
return
with signalcontext(signal.SIGTERM, __raise_interrupt):
try:
for grib_msg in source:
LOG.debug("Got %s from source", grib_msg)
emit(grib_msg)
except KeyboardInterrupt:
LOG.critical("Got interrupt, goodbye!")
return
grib2io >= 2.2.0
watchfiles >= 0.20.0
python-dotenv >= 1.0.0
quickmq >= 1.1.0
typing-extensions
importlib_resources; python_version < '3.9'
ruff
pre-commit
mypy
pytest
coverage