diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py
index 8b5177c54fa53b92cbbf34cde73cbca4c81b576b..c5c48c5d5aa7c4a1a8240f2e6ad91128d9ce50d7 100644
--- a/aosstower/level_00/influxdb.py
+++ b/aosstower/level_00/influxdb.py
@@ -1,10 +1,11 @@
-#!/usr/bin/env python
 """Insert Tower data in to an InfluxDB for real time use."""
 import logging
 import logging.handlers
 import sys
 import time
+from collections.abc import Iterable
 from datetime import timedelta
+from pathlib import Path
 from urllib.parse import urlencode
 
 import numpy as np
@@ -17,6 +18,7 @@ from metobscommon.util.nc import calculate_wind_gust
 from aosstower.level_00.parser import read_frames
 
 LOG = logging.getLogger(__name__)
+HTTP_SUCCESS = 200
 # map station name to InfluxDB tags
 STATIONS = {
     "AOSS Tower": {"inst": "tower", "site": "aoss"},
@@ -63,8 +65,12 @@ class Updater:
     averaging every submit_interval of data added.
     """
 
-    def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)):
-        """intervals are timedelta objects."""
+    def __init__(
+        self,
+        data_interval: timedelta = timedelta(seconds=5),
+        submit_interval: timedelta = timedelta(minutes=5),
+    ):
+        """Initialize helper for updating collection of data as new data arrives."""
         self.data = {"timestamp": np.array([])}
         self.data_interval = data_interval.total_seconds()
         self.submit_interval = submit_interval.total_seconds()
@@ -100,7 +106,7 @@ class Updater:
 
         if "air_temp" in frame and "rh" in frame and ("dewpoint" in frame or "dewpoint_mean" in frame):
             LOG.info(
-                "'dewpoint' is missing from the input file, will calculate " "it from air temp and relative humidity",
+                "'dewpoint' is missing from the input file, will calculate it from air temp and relative humidity",
             )
             frame["dewpoint"] = calc.dewpoint(frame["air_temp"], frame["rh"])
 
@@ -129,10 +135,10 @@
 def convert_to_influx_frame(record_gen, symbols, debug=False):
     for idx, record in enumerate(record_gen):
         LOG.info("Inserting records for frame %d", idx)
-        record = {symbols[k] or k: v for k, v in record.items() if k in symbols}
+        record_with_known_symbols = {symbols[k] or k: v for k, v in record.items() if k in symbols}
         if debug:
-            print(idx, record)
-        yield record
+            print(idx, record_with_known_symbols)  # noqa: T201
+        yield record_with_known_symbols
 
 
 def construct_url(data):
@@ -189,6 +195,47 @@ def get_url_data(avg, wu_id, wu_pw):
 
 
 def main():
+    parser = _create_arg_parser()
+    args = parser.parse_args()
+    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
+    logging.basicConfig(level=levels[min(3, args.verbosity)])
+
+    LOG.info("source: %s", args.src)
+    LOG.info("tail: %s", args.tail)
+    station_tags = STATIONS[args.station]
+    symbols = SYMBOL_CONVERSIONS
+
+    wu_pw = None
+    # don't even load the password unless we're also doing LDMP realtime ingest
+    if args.weather_underground and args.ldmp:
+        with Path(args.wu_password_file).open() as wu_pw_file:
+            wu_pw = wu_pw_file.read().strip()
+
+    record_gen = _get_record_generator(
+        args.src,
+        use_source_as_ldmp=args.ldmp,
+        ldmp_tables=args.tables,
+        tail_file=args.tail,
+    )
+    influxdb_conn_params = {"host": args.host, "port": args.port, "dbname": args.dbname}
+    try:
+        influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
+        influx_gen = influxdb.grouper(influx_gen, args.bulk)
+        _ingest_loggernet_to_influxdb_and_weatherunderground(
+            influx_gen,
+            influxdb_conn_params,
+            station_tags,
+            wu_id=args.wu_id,
+            wu_pw=wu_pw,
+            debug=args.debug,
+            sleep_interval=args.sleep_interval,
+        )
+    except (RuntimeError, ValueError, KeyError, requests.RequestException):
+        if hasattr(record_gen, "close"):
+            record_gen.close()
+
+
+def _create_arg_parser():
     import argparse
 
     parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -210,7 +257,7 @@ def main():
     parser.add_argument(
         "--ldmp",
         action="store_true",
-        help="Treat `src` file as a station name and read records from" "LoggerNet LDMP server (port: 1024)",
+        help="Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)",
     )
     parser.add_argument("--tables", nargs="*", default=["1"], help="LoggerNet LDMP tables to read in")
     parser.add_argument("--host", default=influxdb.DB_HOST, help="Hostname of database connection")
@@ -248,68 +295,76 @@ def main():
         "the database. For large inserts this should be "
         "between 5000 to 10000. Default: 1",
     )
-    parser.add_argument("src", help="Level 0 raw data file or station name " "for LDMP reading")
+    parser.add_argument("src", help="Level 0 raw data file or station name for LDMP reading")
+    return parser
 
-    args = parser.parse_args()
-    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
-    logging.basicConfig(level=levels[min(3, args.verbosity)])
 
-    LOG.info("source: %s", args.src)
-    LOG.info("tail: %s", args.tail)
-    station_tags = STATIONS[args.station]
-    symbols = SYMBOL_CONVERSIONS
+def _get_record_generator(
+    src_file_or_ldmp_station_name,
+    *,
+    use_source_as_ldmp: bool,
+    ldmp_tables: list[str],
+    tail_file: bool,
+):
+    if use_source_as_ldmp:
+        from aosstower.level_00.parser import LDMPGenerator
 
-    wu_pw = None
-    if args.weather_underground:
-        wu_pw = open(args.wu_password_file).read().strip()
+        return LDMPGenerator(src_file_or_ldmp_station_name, ldmp_tables)
+    src = Path(src_file_or_ldmp_station_name).open()
+    return read_frames(src, tail=tail_file)
 
-    if args.ldmp:
-        from aosstower.level_00.parser import LDMPGenerator
-
-        record_gen = LDMPGenerator(args.src, args.tables)
-    else:
-        src = open(args.src)
-        record_gen = read_frames(src, tail=args.tail)
 
+def _ingest_loggernet_to_influxdb_and_weatherunderground(
+    influx_gen: Iterable,
+    influxdb_conn_params: dict,
+    station_tags: dict,
+    *,
+    wu_id: str,
+    wu_pw: str,
+    debug: bool,
+    sleep_interval: int,
+) -> None:
+    updater = Updater()
+    for record in influx_gen:
+        if not debug:
+            lines = influxdb.frame_records(record, **station_tags)
+            influxdb.insert(lines, **influxdb_conn_params)
+        if not debug and not wu_pw:
+            # we don't plan on doing anything with averaged weatherunderground uploads
+            continue
+
+        # Record is in a list of size 1, but want just the record.
+        # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
+        if (avg := updater.rolling_average(record[0])) is not None:
+            url = construct_url(get_url_data(avg, wu_id, wu_pw))
+            if debug:
+                print(url)  # noqa: T201
+                continue
+            _send_data_to_weatherunderground(url)
+
+        if sleep_interval:
+            time.sleep(sleep_interval)
+
+
+def _send_data_to_weatherunderground(url: str) -> None:
+    LOG.info("Uploading new data to wunderground...")
     try:
-        influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
-        influx_gen = influxdb.grouper(influx_gen, args.bulk)
-        updater = Updater()
-        for record in influx_gen:
-            if not args.debug:
-                lines = influxdb.frame_records(record, **station_tags)
-                influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
-            # Record is in a list of size 1, but want just the record.
-            avg = updater.rolling_average(record[0])
-            # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
-            if avg is not None and (args.debug or wu_pw and args.ldmp):
-                url = construct_url(get_url_data(avg, args.wu_id, wu_pw))
-                if args.debug:
-                    print(url)
-                else:
-                    LOG.info("Uploading new data to wunderground...")
-                    try:
-                        resp = requests.post(url, timeout=15)
-                        if resp.status_code != 200:
-                            LOG.warning(
-                                "Data failed to upload to {} with status code {}: {}".format(
-                                    url,
-                                    resp.status_code,
-                                    resp.text,
-                                ),
-                            )
-                        else:
-                            LOG.info("Upload successful")
-                    except requests.Timeout:
-                        LOG.error("Data upload to wunderground timed out", exc_info=True)
-                    except requests.ConnectionError:
-                        LOG.error("Data upload to wunderground had a connection error", exc_info=True)
-
-            if args.sleep_interval:
-                time.sleep(args.sleep_interval)
-    except (RuntimeError, ValueError, KeyError, requests.RequestException):
-        if hasattr(record_gen, "close"):
-            record_gen.close()
+        resp = requests.post(url, timeout=15)
+        if resp.status_code != HTTP_SUCCESS:
+            LOG.warning(
+                "Data failed to upload to {} with status code {}: {}".format(
+                    url,
+                    resp.status_code,
+                    resp.text,
+                ),
+            )
+        else:
+            LOG.info("Upload successful")
+    except requests.Timeout:
+        LOG.error("Data upload to wunderground timed out", exc_info=True)
+    except requests.ConnectionError:
+        LOG.error("Data upload to wunderground had a connection error", exc_info=True)
 
 
 if __name__ == "__main__":
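
A quick way to sanity-check the refactored Weather Underground path above, without a live InfluxDB or station feed, is to drive `Updater` by hand the same way `_ingest_loggernet_to_influxdb_and_weatherunderground` does. This is a minimal sketch, assuming `rolling_average` accepts one parsed frame dict at a time (a `timestamp` plus numeric fields such as `air_temp` and `rh`) and returns `None` until a full `submit_interval` of data has accumulated; the station values and the `KXXX0000`/`hunter2` credentials are made up.

```python
# Sketch only: feed 5-second frames to Updater and print the Weather
# Underground URL that would be POSTed once a 5-minute average is ready.
# Assumes rolling_average returns None until enough data has accumulated.
from datetime import datetime, timedelta

from aosstower.level_00.influxdb import Updater, construct_url, get_url_data

updater = Updater(
    data_interval=timedelta(seconds=5),
    submit_interval=timedelta(minutes=5),
)
stamp = datetime(2024, 1, 1, 12, 0, 0)
for step in range(61):  # one frame past the 5-minute window
    frame = {
        "timestamp": stamp + timedelta(seconds=5 * step),
        "air_temp": 20.0,  # illustrative values only
        "rh": 50.0,
    }
    avg = updater.rolling_average(frame)
    if avg is not None:
        # Mirrors the ingest loop's debug branch: build the URL, don't POST it.
        print(construct_url(get_url_data(avg, "KXXX0000", "hunter2")))
```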
diff --git a/aosstower/level_00/legacy_db_insert.py b/aosstower/level_00/legacy_db_insert.py
index aca2246efe4f6b21ea4e7d0b2c55a40aa5b6c224..d84263d91563d1308b567533ba65150edc6fa566 100644
--- a/aosstower/level_00/legacy_db_insert.py
+++ b/aosstower/level_00/legacy_db_insert.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 """Insert RIG Tower data from a level 0 (raw) data file into a MetObs database.
 
 All error messages will go to stderr, all other logging will go to stdout. So
@@ -10,6 +9,7 @@ to make sure errors are logged:
 import logging
 import logging.handlers
 import sys
+from pathlib import Path
 
 from metobscommon import legacy_db
 
@@ -123,7 +123,7 @@ def main():
 
         record_gen = LDMPGenerator(args.src, args.tables, symbol_names=[x[1] for x in _symbol_record_map])
     else:
-        src = open(args.src)
+        src = Path(args.src).open()
         record_gen = read_frames(src, tail=args.tail)
 
     for _idx, record in enumerate(record_gen):
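
The docstring of `legacy_db_insert.py` promises that errors go to stderr while everything else goes to stdout, but the routing itself sits outside this diff. For context, here is one conventional stdlib way to get that behavior; `setup_split_logging` is a hypothetical helper, not a function from this module.

```python
# Illustrative only: send records below ERROR to stdout, and ERROR and
# above to stderr, as the module docstring describes.
import logging
import sys


def setup_split_logging(level: int = logging.INFO) -> None:
    out = logging.StreamHandler(sys.stdout)
    out.setLevel(level)
    out.addFilter(lambda record: record.levelno < logging.ERROR)  # keep errors off stdout
    err = logging.StreamHandler(sys.stderr)
    err.setLevel(logging.ERROR)
    logging.basicConfig(level=level, handlers=[out, err])


setup_split_logging()
logging.getLogger(__name__).info("routine message -> stdout")
logging.getLogger(__name__).error("error message -> stderr")
```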
""" - +import contextlib import io import logging import re @@ -77,8 +77,9 @@ def _make_frame(data, new_symbols=None, rename_timestamp=False): try: new_key = new_symbols[idx] if new_symbols and len(new_symbols) > idx else key frame[new_key] = database[key].type(value) - except (ValueError, TypeError): - raise LineParseError("error converting '%s' using %s", value, database[key].type) + except (ValueError, TypeError) as e: + msg = f"error converting '{value}' using {database[key].type}" + raise LineParseError(msg) from e return frame @@ -86,6 +87,7 @@ class ParserV0: """Parses Version 0 data lines.""" fill_value = -99999.0 + num_elements = 32 # maps v0 names to names in schema db names = { @@ -112,10 +114,11 @@ class ParserV0: def make_frame(self, line): parts = line.split() - if len(parts) != 32: - raise LineParseError("Expected 32 components", line) + if len(parts) != self.num_elements: + msg = f"Expected {self.num_elements} components" + raise LineParseError(msg, line) raw_data = [("version", 0)] - for k1, v1 in zip(parts[0::2], parts[1::2]): + for k1, v1 in zip(parts[0::2], parts[1::2], strict=False): if k1 == "TIME": continue if k1 in self.names: @@ -126,8 +129,9 @@ class ParserV0: time_str = parts[1] unix_time = int(time_str) raw_data.append(("stamp", datetime.utcfromtimestamp(unix_time))) - except (ValueError, TypeError): - raise LineParseError("Could not parse stamp", line) + except (ValueError, TypeError) as e: + msg = "Could not parse stamp" + raise LineParseError(msg, line) from e return _make_frame(raw_data) @@ -195,23 +199,28 @@ class ParserV1V2: def make_frame(self, line): parts = line.split(",") if len(parts) not in [28, 29, 33, 34]: - raise LineParseError("Expected 28, 29, 33, or 34 parts", line) + msg = "Expected 28, 29, 33, or 34 parts" + raise LineParseError(msg, line) version = {28: 1, 29: 2, 33: 3, 34: 4}[len(parts)] - raw_data = [("version", version), *list(zip(self.names, parts))] + raw_data = [("version", version), *list(zip(self.names, parts, strict=False))] try: raw_data.append(("stamp", self._get_stamp(parts))) - except (TypeError, ValueError): - raise LineParseError("Could not parse timesamp", line) + except (TypeError, ValueError) as e: + msg = "Could not parse timesamp" + raise LineParseError(msg, line) from e return _make_frame(raw_data) -def read_frames(source, error_handler=lambda *a: None, tail=False): - """Returns a generator for reading frames from `source`. Frames are - checked line-by-line so frame line versions may be mixed. +def read_frames(source, error_handler=None, tail=False): + """Generate frames from `source`. + + Frames are checked line-by-line so frame line versions may be mixed. 
     :param tail: starting from the end of the source (if 'seek' method) read lines forever
     """
-    fptr = source if hasattr(source, "readlines") else open(source)
+    if error_handler is None:
+        error_handler = _default_error_handler
+    fptr = source if hasattr(source, "readlines") else open(source)  # noqa: SIM115, PTH123
     if tail and hasattr(fptr, "seek"):
         LOG.debug("Seeking to end of frame source")
         fptr.seek(0, io.SEEK_END)
@@ -234,7 +243,11 @@
                 continue
             yield idx, line
 
-    for idx, line in gen():
+    yield from _parse_lines(gen(), error_handler)
+
+
+def _parse_lines(line_generator, error_handler):
+    for idx, line in line_generator:
         if line.startswith("#"):
             continue
         for parser in [ParserV1V2(), ParserV0()]:
@@ -251,18 +264,29 @@
         error_handler(idx + 1, line, RuntimeError("no parser found", line))
 
 
+def _default_error_handler(*_):
+    return None
+
+
 def loggernet_to_tower(rec_dict, symbol_names):
     """Convert loggernet record dictionary to our standard naming."""
     # assume that the next record after the traditional frame is the timestamp
     old_symbols = ["timestamp", *ParserV1V2.names]
     new_symbols = ["timestamp", *symbol_names]
-    return _make_frame(zip(old_symbols, rec_dict.values()), new_symbols, rename_timestamp=True)
+    return _make_frame(zip(old_symbols, rec_dict.values(), strict=False), new_symbols, rename_timestamp=True)
 
 
 class LDMPGenerator:
     """Class to manage receiving records from Loggernet LDMP server."""
 
-    def __init__(self, station_name, tables, symbol_names=ParserV1V2.names, host="localhost", port=1024):
+    def __init__(  # noqa: PLR0913
+        self,
+        station_name,
+        tables,
+        symbol_names=ParserV1V2.names,
+        host="localhost",
+        port=1024,
+    ):
         from metobscommon.archive.loggernet_receiver import LDMPReceiver
 
         self.station_name = station_name
@@ -283,7 +307,5 @@ class LDMPGenerator:
 
     def __del__(self):
         """Last effort to kill the background thread if not done already."""
-        try:
+        with contextlib.suppress(ValueError, RuntimeError, OSError):
             self.close()
-        except (ValueError, RuntimeError, OSError):
-            pass
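
With `error_handler` now defaulting to `None` and filled in by `_default_error_handler`, callers can still pass their own handler to `read_frames`. A minimal sketch, assuming handlers are invoked as `error_handler(lineno, line, exc)` exactly as in the `no parser found` call in `_parse_lines`, and that `read_frames` yields parsed frame dicts carrying a `stamp`; `tower.ascii` is a made-up input path.

```python
# Sketch: collect unparseable lines instead of silently dropping them.
import logging

from aosstower.level_00.parser import read_frames

logging.basicConfig(level=logging.WARNING)
LOG = logging.getLogger("parse-errors")
bad_lines = []


def collect_errors(lineno, line, exc):
    """Remember and log lines that no parser could handle."""
    bad_lines.append((lineno, line))
    LOG.warning("line %d failed to parse: %s", lineno, exc)


with open("tower.ascii") as src:  # hypothetical level 0 file
    for frame in read_frames(src, error_handler=collect_errors):
        print(frame["stamp"])

print(f"{len(bad_lines)} unparseable line(s)")
```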