Verified Commit a5a38865 authored by David Hoese

Refactor the rest of the package to be cleaner and pass pre-commit

parent 1f1c03dc
#!/usr/bin/env python
"""Insert Tower data in to an InfluxDB for real time use.""" """Insert Tower data in to an InfluxDB for real time use."""
import logging import logging
import logging.handlers import logging.handlers
import sys import sys
import time import time
from collections.abc import Iterable
from datetime import timedelta
from pathlib import Path
from urllib.parse import urlencode

import numpy as np
@@ -17,6 +18,7 @@ from metobscommon.util.nc import calculate_wind_gust
from aosstower.level_00.parser import read_frames

LOG = logging.getLogger(__name__)
HTTP_SUCCESS = 200
# map station name to InfluxDB tags
STATIONS = {
    "AOSS Tower": {"inst": "tower", "site": "aoss"},
@@ -63,8 +65,12 @@ class Updater:
    averaging every submit_interval of data added.
    """
    def __init__(
        self,
        data_interval: timedelta = timedelta(seconds=5),
        submit_interval: timedelta = timedelta(minutes=5),
    ):
        """Initialize helper for updating collection of data as new data arrives."""
        self.data = {"timestamp": np.array([])}
        self.data_interval = data_interval.total_seconds()
        self.submit_interval = submit_interval.total_seconds()
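# Illustrative sketch (not part of this commit): how the Updater above is meant to be fed.
# The loop variable and record contents are hypothetical; rolling_average() returns None
# until a full submit_interval (5 minutes) of 5-second records has accumulated.
#
#     updater = Updater(data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5))
#     for record in records:  # each record is a dict that includes a "timestamp" key
#         avg = updater.rolling_average(record)
#         if avg is not None:
#             ...  # submit the 5-minute averaged frame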
@@ -100,7 +106,7 @@ class Updater:
if "air_temp" in frame and "rh" in frame and ("dewpoint" in frame or "dewpoint_mean" in frame): if "air_temp" in frame and "rh" in frame and ("dewpoint" in frame or "dewpoint_mean" in frame):
LOG.info( LOG.info(
"'dewpoint' is missing from the input file, will calculate " "it from air temp and relative humidity", "'dewpoint' is missing from the input file, will calculate it from air temp and relative humidity",
) )
frame["dewpoint"] = calc.dewpoint(frame["air_temp"], frame["rh"]) frame["dewpoint"] = calc.dewpoint(frame["air_temp"], frame["rh"])
@@ -129,10 +135,10 @@ class Updater:
def convert_to_influx_frame(record_gen, symbols, debug=False):
    for idx, record in enumerate(record_gen):
        LOG.info("Inserting records for frame %d", idx)
        record_with_known_symbols = {symbols[k] or k: v for k, v in record.items() if k in symbols}
        if debug:
            print(idx, record_with_known_symbols)  # noqa: T201
        yield record_with_known_symbols
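# Illustrative sketch (not part of this commit): what the renaming comprehension above does.
# The symbol table entries below are made up; a falsy value (None) means "keep the original
# key", and keys absent from the table are dropped entirely.
#
#     symbols = {"T_air": "air_temp", "RH": None}
#     record = {"T_air": 21.5, "RH": 80.0, "junk": 1}
#     {symbols[k] or k: v for k, v in record.items() if k in symbols}
#     # -> {"air_temp": 21.5, "RH": 80.0}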
def construct_url(data):
@@ -189,6 +195,47 @@ def get_url_data(avg, wu_id, wu_pw):
def main():
    parser = _create_arg_parser()
    args = parser.parse_args()
    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    logging.basicConfig(level=levels[min(3, args.verbosity)])
    LOG.info("source: %s", args.src)
    LOG.info("tail: %s", args.tail)
    station_tags = STATIONS[args.station]
    symbols = SYMBOL_CONVERSIONS

    wu_pw = None
    # don't even load the password unless we're also doing LDMP realtime ingest
    if args.weather_underground and args.ldmp:
        with Path(args.wu_password_file).open() as wu_pw_file:
            wu_pw = wu_pw_file.read().strip()

    record_gen = _get_record_generator(
        args.src,
        use_source_as_ldmp=args.ldmp,
        ldmp_tables=args.tables,
        tail_file=args.tail,
    )
    influxdb_conn_params = {"host": args.host, "port": args.port, "dbname": args.dbname}
    try:
        influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
        influx_gen = influxdb.grouper(influx_gen, args.bulk)
        _ingest_loggernet_to_influxdb_and_weatherunderground(
            influx_gen,
            influxdb_conn_params,
            station_tags,
            wu_id=args.wu_id,
            wu_pw=wu_pw,
            debug=args.debug,
            sleep_interval=args.sleep_interval,
        )
    except (RuntimeError, ValueError, KeyError, requests.RequestException):
        if hasattr(record_gen, "close"):
            record_gen.close()

def _create_arg_parser():
    import argparse

    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -210,7 +257,7 @@ def main():
    parser.add_argument(
        "--ldmp",
        action="store_true",
        help="Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)",
    )
    parser.add_argument("--tables", nargs="*", default=["1"], help="LoggerNet LDMP tables to read in")
    parser.add_argument("--host", default=influxdb.DB_HOST, help="Hostname of database connection")
@@ -248,68 +295,76 @@ def main():
"the database. For large inserts this should be " "the database. For large inserts this should be "
"between 5000 to 10000. Default: 1", "between 5000 to 10000. Default: 1",
) )
parser.add_argument("src", help="Level 0 raw data file or station name " "for LDMP reading") parser.add_argument("src", help="Level 0 raw data file or station name for LDMP reading")
return parser


def _get_record_generator(
    src_file_or_ldmp_station_name,
    *,
    use_source_as_ldmp: bool,
    ldmp_tables: list[str],
    tail_file: bool,
):
    if use_source_as_ldmp:
        from aosstower.level_00.parser import LDMPGenerator

        return LDMPGenerator(src_file_or_ldmp_station_name, ldmp_tables)
    src = Path(src_file_or_ldmp_station_name).open()
    return read_frames(src, tail=tail_file)


def _ingest_loggernet_to_influxdb_and_weatherunderground(
    influx_gen: Iterable,
    influxdb_conn_params: dict,
    station_tags: dict,
    *,
    wu_id: str,
    wu_pw: str,
    debug: bool,
    sleep_interval: int,
) -> None:
    updater = Updater()
    for record in influx_gen:
        if not debug:
            lines = influxdb.frame_records(record, **station_tags)
            influxdb.insert(lines, **influxdb_conn_params)
        if not debug and not wu_pw:
            # we don't plan on doing anything with averaged weatherunderground uploads
            continue
        # Record is in a list of size 1, but want just the record.
        # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
        if (avg := updater.rolling_average(record[0])) is not None:
            url = construct_url(get_url_data(avg, wu_id, wu_pw))
            if debug:
                print(url)  # noqa: T201
                continue
            _send_data_to_weatherunderground(url)
        if sleep_interval:
            time.sleep(sleep_interval)


def _send_data_to_weatherunderground(url: str) -> None:
    LOG.info("Uploading new data to wunderground...")
    try:
        resp = requests.post(url, timeout=15)
        if resp.status_code != HTTP_SUCCESS:
            LOG.warning(
                "Data failed to upload to {} with status code {}: {}".format(
                    url,
                    resp.status_code,
                    resp.text,
                ),
            )
        else:
            LOG.info("Upload successful")
    except requests.Timeout:
        LOG.error("Data upload to wunderground timed out", exc_info=True)
    except requests.ConnectionError:
        LOG.error("Data upload to wunderground had a connection error", exc_info=True)

if __name__ == "__main__":
...
#!/usr/bin/env python
"""Insert RIG Tower data from a level 0 (raw) data file into a MetObs database. """Insert RIG Tower data from a level 0 (raw) data file into a MetObs database.
All error messages will go to stderr, all other logging will go to stdout. So All error messages will go to stderr, all other logging will go to stdout. So
@@ -10,6 +9,7 @@ to make sure errors are logged:
import logging
import logging.handlers
import sys
from pathlib import Path
from metobscommon import legacy_db
@@ -123,7 +123,7 @@ def main():
        record_gen = LDMPGenerator(args.src, args.tables, symbol_names=[x[1] for x in _symbol_record_map])
    else:
        src = Path(args.src).open()
        record_gen = read_frames(src, tail=args.tail)
    for _idx, record in enumerate(record_gen):
...
@@ -38,7 +38,7 @@ we have 2 altimeter values but as far as I know altimeter2 is not used.
XXX: Fill value in version 2 seems to be -99999.
"""
import contextlib
import io
import logging
import re
@@ -77,8 +79,9 @@ def _make_frame(data, new_symbols=None, rename_timestamp=False):
        try:
            new_key = new_symbols[idx] if new_symbols and len(new_symbols) > idx else key
            frame[new_key] = database[key].type(value)
        except (ValueError, TypeError) as e:
            msg = f"error converting '{value}' using {database[key].type}"
            raise LineParseError(msg) from e
    return frame
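# Aside (not part of this commit): `raise ... from e` keeps the original exception attached
# as __cause__, so tracebacks show both the conversion failure and the LineParseError. A
# minimal illustration with made-up values:
#
#     try:
#         int("not-a-number")
#     except ValueError as e:
#         raise LineParseError("error converting 'not-a-number' using <class 'int'>") from e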
@@ -86,6 +87,7 @@ class ParserV0:
"""Parses Version 0 data lines.""" """Parses Version 0 data lines."""
fill_value = -99999.0 fill_value = -99999.0
num_elements = 32
# maps v0 names to names in schema db # maps v0 names to names in schema db
names = { names = {
@@ -112,10 +114,11 @@ class ParserV0:
    def make_frame(self, line):
        parts = line.split()
        if len(parts) != self.num_elements:
            msg = f"Expected {self.num_elements} components"
            raise LineParseError(msg, line)
        raw_data = [("version", 0)]
        for k1, v1 in zip(parts[0::2], parts[1::2], strict=False):
            if k1 == "TIME":
                continue
            if k1 in self.names:
@@ -126,8 +129,9 @@ class ParserV0:
            time_str = parts[1]
            unix_time = int(time_str)
            raw_data.append(("stamp", datetime.utcfromtimestamp(unix_time)))
        except (ValueError, TypeError) as e:
            msg = "Could not parse stamp"
            raise LineParseError(msg, line) from e
        return _make_frame(raw_data)
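# Illustrative sketch (not part of this commit): ParserV0 lines are whitespace-separated
# "NAME VALUE" pairs, so zip(parts[0::2], parts[1::2]) walks them pairwise. The sample line
# below is made up and much shorter than the real 32-element frames:
#
#     parts = "TIME 1672531200 TAA 21.5 RH 80.0".split()
#     list(zip(parts[0::2], parts[1::2], strict=False))
#     # -> [('TIME', '1672531200'), ('TAA', '21.5'), ('RH', '80.0')]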
@@ -195,23 +199,28 @@ class ParserV1V2:
    def make_frame(self, line):
        parts = line.split(",")
        if len(parts) not in [28, 29, 33, 34]:
            msg = "Expected 28, 29, 33, or 34 parts"
            raise LineParseError(msg, line)
        version = {28: 1, 29: 2, 33: 3, 34: 4}[len(parts)]
        raw_data = [("version", version), *list(zip(self.names, parts, strict=False))]
        try:
            raw_data.append(("stamp", self._get_stamp(parts)))
        except (TypeError, ValueError) as e:
            msg = "Could not parse timestamp"
            raise LineParseError(msg, line) from e
        return _make_frame(raw_data)
def read_frames(source, error_handler=None, tail=False):
    """Generate frames from `source`.

    Frames are checked line-by-line so frame line versions may be mixed.

    :param tail: starting from the end of the source (if 'seek' method) read lines forever
    """
    if error_handler is None:
        error_handler = _default_error_handler
    fptr = source if hasattr(source, "readlines") else open(source)  # noqa: SIM115, PTH123
    if tail and hasattr(fptr, "seek"):
        LOG.debug("Seeking to end of frame source")
        fptr.seek(0, io.SEEK_END)
@@ -234,7 +243,11 @@ def read_frames(source, error_handler=lambda *a: None, tail=False):
                continue
            yield idx, line

    yield from _parse_lines(gen(), error_handler)


def _parse_lines(line_generator, error_handler):
    for idx, line in line_generator:
        if line.startswith("#"):
            continue
        for parser in [ParserV1V2(), ParserV0()]:
@@ -251,18 +264,29 @@ def read_frames(source, error_handler=lambda *a: None, tail=False):
            error_handler(idx + 1, line, RuntimeError("no parser found", line))


def _default_error_handler(*_):
    return None
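# Illustrative sketch (not part of this commit): read_frames accepts a file path or an open
# file-like object, and an error handler is called with (line_number, line, exception) for
# lines that fail to parse. The file name and handler below are hypothetical:
#
#     def log_bad_line(line_num, line, exc):
#         LOG.warning("skipping line %d (%s): %s", line_num, exc, line.strip())
#
#     for frame in read_frames("rig_tower.ascii", error_handler=log_bad_line):
#         ...  # each frame is a dict of parsed values (see _make_frame above)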
def loggernet_to_tower(rec_dict, symbol_names):
    """Convert loggernet record dictionary to our standard naming."""
    # assume that the next record after the traditional frame is the timestamp
    old_symbols = ["timestamp", *ParserV1V2.names]
    new_symbols = ["timestamp", *symbol_names]
    return _make_frame(zip(old_symbols, rec_dict.values(), strict=False), new_symbols, rename_timestamp=True)
class LDMPGenerator:
    """Class to manage receiving records from Loggernet LDMP server."""

    def __init__(  # noqa: PLR0913
        self,
        station_name,
        tables,
        symbol_names=ParserV1V2.names,
        host="localhost",
        port=1024,
    ):
        from metobscommon.archive.loggernet_receiver import LDMPReceiver

        self.station_name = station_name
@@ -283,7 +307,5 @@ class LDMPGenerator:
    def __del__(self):
        """Last effort to kill the background thread if not done already."""
        with contextlib.suppress(ValueError, RuntimeError, OSError):
            self.close()
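# Illustrative sketch (not part of this commit): how the ingest scripts above consume
# LDMPGenerator. The station name is hypothetical and the table list mirrors the scripts'
# default; close() stops the background receiver, with __del__ only as a last resort.
#
#     record_gen = LDMPGenerator("AOSS Tower", ["1"])
#     try:
#         for record in record_gen:
#             ...  # convert and insert into InfluxDB
#     finally:
#         record_gen.close()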