From 4ad214091fe047e81f13fc7299430879c8802a73 Mon Sep 17 00:00:00 2001 From: Max Drexler <mndrexler@wisc.edu> Date: Wed, 3 Jul 2024 16:59:43 +0000 Subject: [PATCH] comments --- grib_pipeline.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/grib_pipeline.py b/grib_pipeline.py index da9e67c..b071f67 100644 --- a/grib_pipeline.py +++ b/grib_pipeline.py @@ -22,12 +22,13 @@ from ssec_amqp import AmqpClient, AmqpExchange from dotenv import load_dotenv -if sys.version < (3,7): +if sys.version_info < (3,7): raise SystemError('Python version too low') HOSTNAME = gethostname() GRIB_ENDINGS = [".grb", ".grib", ".grb2", ".grib2"] + # (first_lat, first_lon, rows, cols, gen_proc_id) XCD_MODELS = { ("44.196", "174.759", "237", "377", "115"): ["DGEX-AKPS", "DGEX"], @@ -391,6 +392,8 @@ XCD_MODELS = { def watch_filter(change: Change, path: str) -> bool: + """Make sure files we ingest are grib files. + """ if change == Change.deleted: return False if Path(path).suffix in GRIB_ENDINGS: @@ -399,14 +402,27 @@ def watch_filter(change: Change, path: str) -> bool: def date_type_from_dt(dt: datetime) -> str: + """Format datetimes in a consistent manner. + + Args: + dt (datetime): the datetime object to format. + + Returns: + str: the formatted datetime. + """ return dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-5] -def date_type_from_str(date_str: str, format: str) -> str: - return date_type_from_dt(datetime.strptime(date_str, format)) +def amqp_grib(grib_path: str, start_message: int) -> Generator[dict, None, None]: + """Generate AMQP payloads from a grib file, starting at a specific message. + Args: + grib_path (str): The path to the grib file to create payloads for. + start_message (int): Which grib message to start yielding payloads for. -def amqp_grib(grib_path: str, start_message: int) -> Generator[dict, None, None]: + Yields: + JSON-able: The json formatted AMQP payload for each grib message from start_message to end of the file. + """ with grib2io.open(grib_path) as grib_file: # Grib2Message._msg_class_store might lead to memory leakage for msg in grib_file[start_message:]: f_lat = getattr(msg, "latitudeFirstGridpoint", None) -- GitLab