Compare revisions — projects: wroberts/AossTower, davidh/AossTower (4 commits on source)
#!/usr/bin/env python
"""Insert Tower data in to an InfluxDB for real time use."""
import logging
import logging.handlers
import sys
import time
from collections.abc import Iterable
from datetime import timedelta
from pathlib import Path
from urllib.parse import urlencode
import numpy as np
......@@ -17,6 +18,7 @@ from metobscommon.util.nc import calculate_wind_gust
from aosstower.level_00.parser import read_frames
LOG = logging.getLogger(__name__)
HTTP_SUCCESS = 200
# map station name to InfluxDB tags
STATIONS = {
"AOSS Tower": {"inst": "tower", "site": "aoss"},
......@@ -63,8 +65,12 @@ class Updater:
averaging every submit_interval of data added.
"""
def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)):
"""intervals are timedelta objects."""
def __init__(
self,
data_interval: timedelta = timedelta(seconds=5),
submit_interval: timedelta = timedelta(minutes=5),
):
"""Initialize helper for updating collection of data as new data arrives."""
self.data = {"timestamp": np.array([])}
self.data_interval = data_interval.total_seconds()
self.submit_interval = submit_interval.total_seconds()
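A minimal usage sketch of this helper, mirroring how the unit tests below drive it (the `record_stream` name is illustrative):

# Sketch: feed 5-second records in, get 5-minute averages back once enough data has accrued.
updater = Updater()
for record in record_stream:  # each record is a dict with a "timestamp" plus measurement values
    avg = updater.rolling_average(record)
    if avg is None:
        continue  # not enough data yet for a full submit interval
    latest = {key: avg[key][-1] for key in avg}  # most recent averaged value per field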
......@@ -100,7 +106,7 @@ class Updater:
if "air_temp" in frame and "rh" in frame and ("dewpoint" in frame or "dewpoint_mean" in frame):
LOG.info(
"'dewpoint' is missing from the input file, will calculate " "it from air temp and relative humidity",
"'dewpoint' is missing from the input file, will calculate it from air temp and relative humidity",
)
frame["dewpoint"] = calc.dewpoint(frame["air_temp"], frame["rh"])
......@@ -129,10 +135,10 @@ class Updater:
def convert_to_influx_frame(record_gen, symbols, debug=False):
for idx, record in enumerate(record_gen):
LOG.info("Inserting records for frame %d", idx)
record = {symbols[k] or k: v for k, v in record.items() if k in symbols}
record_with_known_symbols = {symbols[k] or k: v for k, v in record.items() if k in symbols}
if debug:
print(idx, record)
yield record
print(idx, record_with_known_symbols) # noqa: T201
yield record_with_known_symbols
def construct_url(data):
......@@ -189,6 +195,47 @@ def get_url_data(avg, wu_id, wu_pw):
def main():
parser = _create_arg_parser()
args = parser.parse_args()
levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
logging.basicConfig(level=levels[min(3, args.verbosity)])
LOG.info("source: %s", args.src)
LOG.info("tail: %s", args.tail)
station_tags = STATIONS[args.station]
symbols = SYMBOL_CONVERSIONS
wu_pw = None
# don't even load the password unless we're also doing LDMP realtime ingest
if args.weather_underground and args.ldmp:
with Path(args.wu_password_file).open() as wu_pw_file:
wu_pw = wu_pw_file.read().strip()
record_gen = _get_record_generator(
args.src,
use_source_as_ldmp=args.ldmp,
ldmp_tables=args.tables,
tail_file=args.tail,
)
influxdb_conn_params = {"host": args.host, "port": args.port, "dbname": args.dbname}
try:
influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
influx_gen = influxdb.grouper(influx_gen, args.bulk)
_ingest_loggernet_to_influxdb_and_weatherunderground(
influx_gen,
influxdb_conn_params,
station_tags,
wu_id=args.wu_id,
wu_pw=wu_pw,
debug=args.debug,
sleep_interval=args.sleep_interval,
)
except (RuntimeError, ValueError, KeyError, requests.RequestException):
if hasattr(record_gen, "close"):
record_gen.close()
def _create_arg_parser():
import argparse
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
......@@ -210,7 +257,7 @@ def main():
parser.add_argument(
"--ldmp",
action="store_true",
help="Treat `src` file as a station name and read records from" "LoggerNet LDMP server (port: 1024)",
help="Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)",
)
parser.add_argument("--tables", nargs="*", default=["1"], help="LoggerNet LDMP tables to read in")
parser.add_argument("--host", default=influxdb.DB_HOST, help="Hostname of database connection")
......@@ -248,68 +295,76 @@ def main():
"the database. For large inserts this should be "
"between 5000 to 10000. Default: 1",
)
parser.add_argument("src", help="Level 0 raw data file or station name " "for LDMP reading")
parser.add_argument("src", help="Level 0 raw data file or station name for LDMP reading")
return parser
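The `--bulk` count described above is handed to `influxdb.grouper`, which batches records before insertion; that helper lives in metobscommon and is not part of this diff, but an equivalent batching generator might look like this sketch:

from itertools import islice

def grouper(records, bulk_size):
    """Yield lists of up to bulk_size records from an iterator (illustrative stand-in only)."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, bulk_size))
        if not batch:
            return
        yield batch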
args = parser.parse_args()
levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
logging.basicConfig(level=levels[min(3, args.verbosity)])
LOG.info("source: %s", args.src)
LOG.info("tail: %s", args.tail)
station_tags = STATIONS[args.station]
symbols = SYMBOL_CONVERSIONS
def _get_record_generator(
src_file_or_ldmp_station_name,
*,
use_source_as_ldmp: bool,
ldmp_tables: list[str],
tail_file: bool,
):
if use_source_as_ldmp:
from aosstower.level_00.parser import LDMPGenerator
wu_pw = None
if args.weather_underground:
wu_pw = open(args.wu_password_file).read().strip()
return LDMPGenerator(src_file_or_ldmp_station_name, ldmp_tables)
src = Path(src_file_or_ldmp_station_name).open()
return read_frames(src, tail=tail_file)
if args.ldmp:
from aosstower.level_00.parser import LDMPGenerator
record_gen = LDMPGenerator(args.src, args.tables)
else:
src = open(args.src)
record_gen = read_frames(src, tail=args.tail)
def _ingest_loggernet_to_influxdb_and_weatherunderground(
influx_gen: Iterable,
influxdb_conn_params: dict,
station_tags: dict,
*,
wu_id: str,
wu_pw: str,
debug: bool,
sleep_interval: int,
) -> None:
updater = Updater()
for record in influx_gen:
if not debug:
lines = influxdb.frame_records(record, **station_tags)
influxdb.insert(lines, **influxdb_conn_params)
if not debug and not wu_pw:
# we don't plan on doing anything with averaged weatherunderground uploads
continue
# Record is in a list of size 1, but want just the record.
# Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
if (avg := updater.rolling_average(record[0])) is not None:
url = construct_url(get_url_data(avg, wu_id, wu_pw))
if debug:
print(url) # noqa: T201
continue
_send_data_to_weatherunderground(url)
if sleep_interval:
time.sleep(sleep_interval)
def _send_data_to_weatherunderground(url: str) -> None:
LOG.info("Uploading new data to wunderground...")
try:
influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
influx_gen = influxdb.grouper(influx_gen, args.bulk)
updater = Updater()
for record in influx_gen:
if not args.debug:
lines = influxdb.frame_records(record, **station_tags)
influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
# Record is in a list of size 1, but want just the record.
avg = updater.rolling_average(record[0])
# Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
if avg is not None and (args.debug or wu_pw and args.ldmp):
url = construct_url(get_url_data(avg, args.wu_id, wu_pw))
if args.debug:
print(url)
else:
LOG.info("Uploading new data to wunderground...")
try:
resp = requests.post(url, timeout=15)
if resp.status_code != 200:
LOG.warning(
"Data failed to upload to {} with status code {}: {}".format(
url,
resp.status_code,
resp.text,
),
)
else:
LOG.info("Upload successful")
except requests.Timeout:
LOG.error("Data upload to wunderground timed out", exc_info=True)
except requests.ConnectionError:
LOG.error("Data upload to wunderground had a connection error", exc_info=True)
if args.sleep_interval:
time.sleep(args.sleep_interval)
except (RuntimeError, ValueError, KeyError, requests.RequestException):
if hasattr(record_gen, "close"):
record_gen.close()
resp = requests.post(url, timeout=15)
if resp.status_code != HTTP_SUCCESS:
LOG.warning(
"Data failed to upload to {} with status code {}: {}".format(
url,
resp.status_code,
resp.text,
),
)
else:
LOG.info("Upload successful")
except requests.Timeout:
LOG.error("Data upload to wunderground timed out", exc_info=True)
except requests.ConnectionError:
LOG.error("Data upload to wunderground had a connection error", exc_info=True)
if __name__ == "__main__":
......
#!/usr/bin/env python
"""Insert RIG Tower data from a level 0 (raw) data file into a MetObs database.
All error messages will go to stderr, all other logging will go to stdout. So
......@@ -10,6 +9,7 @@ to make sure errors are logged:
import logging
import logging.handlers
import sys
from pathlib import Path
from metobscommon import legacy_db
......@@ -123,7 +123,7 @@ def main():
record_gen = LDMPGenerator(args.src, args.tables, symbol_names=[x[1] for x in _symbol_record_map])
else:
src = open(args.src)
src = Path(args.src).open()
record_gen = read_frames(src, tail=args.tail)
for _idx, record in enumerate(record_gen):
......
......@@ -38,7 +38,7 @@ we have 2 altimeter values but as far as I know altimeter2 is not used.
XXX: Fill value in version 2 seems to be -99999.
"""
import contextlib
import io
import logging
import re
......@@ -77,8 +77,9 @@ def _make_frame(data, new_symbols=None, rename_timestamp=False):
try:
new_key = new_symbols[idx] if new_symbols and len(new_symbols) > idx else key
frame[new_key] = database[key].type(value)
except (ValueError, TypeError):
raise LineParseError("error converting '%s' using %s", value, database[key].type)
except (ValueError, TypeError) as e:
msg = f"error converting '{value}' using {database[key].type}"
raise LineParseError(msg) from e
return frame
......@@ -86,6 +87,7 @@ class ParserV0:
"""Parses Version 0 data lines."""
fill_value = -99999.0
num_elements = 32
# maps v0 names to names in schema db
names = {
......@@ -112,10 +114,11 @@ class ParserV0:
def make_frame(self, line):
parts = line.split()
if len(parts) != 32:
raise LineParseError("Expected 32 components", line)
if len(parts) != self.num_elements:
msg = f"Expected {self.num_elements} components"
raise LineParseError(msg, line)
raw_data = [("version", 0)]
for k1, v1 in zip(parts[0::2], parts[1::2]):
for k1, v1 in zip(parts[0::2], parts[1::2], strict=False):
if k1 == "TIME":
continue
if k1 in self.names:
......@@ -126,8 +129,9 @@ class ParserV0:
time_str = parts[1]
unix_time = int(time_str)
raw_data.append(("stamp", datetime.utcfromtimestamp(unix_time)))
except (ValueError, TypeError):
raise LineParseError("Could not parse stamp", line)
except (ValueError, TypeError) as e:
msg = "Could not parse stamp"
raise LineParseError(msg, line) from e
return _make_frame(raw_data)
......@@ -195,23 +199,28 @@ class ParserV1V2:
def make_frame(self, line):
parts = line.split(",")
if len(parts) not in [28, 29, 33, 34]:
raise LineParseError("Expected 28, 29, 33, or 34 parts", line)
msg = "Expected 28, 29, 33, or 34 parts"
raise LineParseError(msg, line)
version = {28: 1, 29: 2, 33: 3, 34: 4}[len(parts)]
raw_data = [("version", version), *list(zip(self.names, parts))]
raw_data = [("version", version), *list(zip(self.names, parts, strict=False))]
try:
raw_data.append(("stamp", self._get_stamp(parts)))
except (TypeError, ValueError):
raise LineParseError("Could not parse timesamp", line)
except (TypeError, ValueError) as e:
msg = "Could not parse timesamp"
raise LineParseError(msg, line) from e
return _make_frame(raw_data)
def read_frames(source, error_handler=lambda *a: None, tail=False):
"""Returns a generator for reading frames from `source`. Frames are
checked line-by-line so frame line versions may be mixed.
def read_frames(source, error_handler=None, tail=False):
"""Generate frames from `source`.
Frames are checked line-by-line so frame line versions may be mixed.
:param tail: starting from the end of the source (if 'seek' method) read lines forever
"""
fptr = source if hasattr(source, "readlines") else open(source)
if error_handler is None:
error_handler = _default_error_handler
fptr = source if hasattr(source, "readlines") else open(source) # noqa: SIM115, PTH123
if tail and hasattr(fptr, "seek"):
LOG.debug("Seeking to end of frame source")
fptr.seek(0, io.SEEK_END)
......@@ -234,7 +243,11 @@ def read_frames(source, error_handler=lambda *a: None, tail=False):
continue
yield idx, line
for idx, line in gen():
yield from _parse_lines(gen(), error_handler)
def _parse_lines(line_generator, error_handler):
for idx, line in line_generator:
if line.startswith("#"):
continue
for parser in [ParserV1V2(), ParserV0()]:
......@@ -251,18 +264,29 @@ def read_frames(source, error_handler=lambda *a: None, tail=False):
error_handler(idx + 1, line, RuntimeError("no parser found", line))
def _default_error_handler(*_):
return None
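For reference, a small sketch of driving `read_frames` directly; the file name and handler are hypothetical, and the frame keys follow the schema names used elsewhere in this diff:

import logging
from aosstower.level_00.parser import read_frames

logger = logging.getLogger(__name__)

def _log_bad_line(lineno, line, exc):
    # the error handler receives the 1-based line number, the raw text, and the exception
    logger.warning("Skipping unparseable line %d: %s", lineno, exc)

with open("aoss_tower.2020-01-02.ascii") as src:
    for frame in read_frames(src, error_handler=_log_bad_line):
        ...  # each frame is a dict of schema-named values (e.g. "stamp", "air_temp")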
def loggernet_to_tower(rec_dict, symbol_names):
"""Convert loggernet record dictionary to our standard naming."""
# assume that the next record after the traditional frame is the timestamp
old_symbols = ["timestamp", *ParserV1V2.names]
new_symbols = ["timestamp", *symbol_names]
return _make_frame(zip(old_symbols, rec_dict.values()), new_symbols, rename_timestamp=True)
return _make_frame(zip(old_symbols, rec_dict.values(), strict=False), new_symbols, rename_timestamp=True)
class LDMPGenerator:
"""Class to manage receiving records from Loggernet LDMP server."""
def __init__(self, station_name, tables, symbol_names=ParserV1V2.names, host="localhost", port=1024):
def __init__( # noqa: PLR0913
self,
station_name,
tables,
symbol_names=ParserV1V2.names,
host="localhost",
port=1024,
):
from metobscommon.archive.loggernet_receiver import LDMPReceiver
self.station_name = station_name
......@@ -283,7 +307,5 @@ class LDMPGenerator:
def __del__(self):
"""Last effort to kill the background thread if not done already."""
try:
with contextlib.suppress(ValueError, RuntimeError, OSError):
self.close()
except (ValueError, RuntimeError, OSError):
pass
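A rough sketch of consuming the LDMP generator (station and table names are placeholders; the defaults shown above connect to a LoggerNet LDMP server on localhost:1024):

from aosstower.level_00.parser import LDMPGenerator

record_gen = LDMPGenerator("ExampleStation", ["1"])
try:
    for frame in record_gen:
        ...  # each frame is a parsed record dict keyed by the tower's standard symbol names
finally:
    record_gen.close()  # stop the background receiver thread instead of relying on __del__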
......@@ -21,58 +21,54 @@ def knots_to_mps(knots):
return knots * 0.51444
def dewpoint(tempC, relhum):
"""Algorithm from Tom Whittaker tempC is the temperature in degrees Celsius,
relhum is the relative humidity as a percentage.
def dewpoint(temp_c, relhum):
"""Convert air temperature and relative humidity to dewpoint.
:param tempC: temperature in celsius
Algorithm from Tom Whittaker.
:param temp_c: temperature in celsius
:param relhum: relative humidity as a percentage
"""
if tempC is None or relhum is None:
if temp_c is None or relhum is None:
return NaN
gasconst = 461.5
latheat = 2500800.0
dp = 1.0 / (1.0 / (273.15 + tempC) - gasconst * np.log((0.0 + relhum) / 100) / (latheat - tempC * 2397.5))
dp = 1.0 / (1.0 / (273.15 + temp_c) - gasconst * np.log((0.0 + relhum) / 100) / (latheat - temp_c * 2397.5))
if pd is not None and isinstance(dp, pd.Series):
return pd.concat([dp - 273.15, tempC], axis=1).min(axis=1)
return np.min(dp - 273.15, tempC)
return pd.concat([dp - 273.15, temp_c], axis=1).min(axis=1)
return np.min(dp - 273.15, temp_c)
def relhum(airTempK, dewpointTempK):
"""Algorithm derived by David Hoese from the above
dewpoint(tempC, relhum) function, both parameters are in Kelvin units.
def relhum(air_temp_k, dewpoint_temp_k):
"""Calculate relative humidity from air temperature and dewpoint temperature.
:param airTempK: air temperature in Kelvin
:param dewpointTempK: dewpoint temp in Kelvin
:param air_temp_k: air temperature in Kelvin
:param dewpoint_temp_k: dewpoint temp in Kelvin
"""
if airTempK is None or dewpointTempK is None:
if air_temp_k is None or dewpoint_temp_k is None:
return NaN
gas_constant = 461.5
latheat = 2500800.0
# Only one section of the equation
latpart = latheat - (airTempK - 273.15) * 2397.5
relativehum = 100 * math.e ** ((latpart / airTempK - latpart / dewpointTempK) / gas_constant)
return relativehum
latpart = latheat - (air_temp_k - 273.15) * 2397.5
return 100 * math.e ** ((latpart / air_temp_k - latpart / dewpoint_temp_k) / gas_constant)
def potentialtemp(airTempK, pressureMB):
def potentialtemp(air_temp_k, pressure_mb):
"""Algorithm from David Hoese to calculate potential temperature.
:param airTempK: air temperature in Kelvin
:param pressureMB: air pressure in millibars
:param air_temp_k: air temperature in Kelvin
:param pressure_mb: air pressure in millibars
"""
if airTempK is None or pressureMB is None:
if air_temp_k is None or pressure_mb is None:
return NaN
pT = airTempK * (pressureMB.max() / pressureMB) ** 0.286
return pT
return air_temp_k * (pressure_mb.max() / pressure_mb) ** 0.286
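A short sketch tying these three helpers together (the import path and values are illustrative, not taken from this diff):

import pandas as pd
from aosstower.level_b1 import calc  # import path assumed

air_temp_c = pd.Series([20.0, 21.5, 19.8])     # degrees Celsius
rel_hum = pd.Series([65.0, 70.0, 55.0])        # percent
pressure_mb = pd.Series([978.0, 980.0, 976.0])

dewpt_c = calc.dewpoint(air_temp_c, rel_hum)                    # dewpoint in degrees Celsius
rh_pct = calc.relhum(air_temp_c + 273.15, dewpt_c + 273.15)     # should roughly recover rel_hum
theta_k = calc.potentialtemp(air_temp_c + 273.15, pressure_mb)  # referenced to the series' maximum pressure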
def altimeter(p, alt):
......@@ -108,44 +104,44 @@ def dir2txt(val):
>>> dir2txt(359)
'N'
"""
assert val >= 0 and val < 360, "'%s' out of range" % val
dirs = ("NNE", "NE", "ENE", "E", "ESE", "SE", "SSE", "S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW")
if not (val >= 0 and val < 360): # noqa: PLR2004
msg = f"'{val}' out of range"
raise ValueError(msg)
cardinal_dirs = ("NNE", "NE", "ENE", "E", "ESE", "SE", "SSE", "S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW")
if (val >= 348.75 and val <= 360) or val >= 0 and val < 11.25:
if (val >= 348.75 and val <= 360) or val >= 0 and val < 11.25: # noqa: PLR2004
return "N"
# 1/2 degree increment between the directions
i = 11.25
for dir in dirs:
for card_dir in cardinal_dirs:
if val >= i and val < (i + 22.5):
return dir
return card_dir
i += 22.5
return None
def wind_vector_components(windspd, winddir):
"""Decompose scalar or list/array polar wind direction and speed data
into the horizontal and vertical vector components and speed vector.
"""Decompose polar wind direction and speed into the horizontal and vertical vector components and speed vector.
Inputs can be scalar or arrays.
"""
dir_rad = np.deg2rad(winddir)
spd_arr = np.array(windspd)
V_e = spd_arr * np.sin(dir_rad)
V_n = spd_arr * np.cos(dir_rad)
U_spd = np.sqrt(pow(V_e, 2) + pow(V_n, 2))
return V_e, V_n, U_spd
v_e = spd_arr * np.sin(dir_rad)
v_n = spd_arr * np.cos(dir_rad)
u_spd = np.sqrt(pow(v_e, 2) + pow(v_n, 2))
return v_e, v_n, u_spd
def wind_vector_degrees(vector_east, vector_north):
"""Re-compose horizontal (east/west) and vertical (north/south) vector
components into wind direction in degrees.
"""Re-compose horizontal (east/west) and vertical (north/south) vector components into wind direction in degrees.
Inputs can be scalar or arrays.
"""
rads = np.arctan2(vector_east, vector_north)
winddir = np.rad2deg(rads)
if isinstance(winddir, (np.ndarray, Series)):
if isinstance(winddir, np.ndarray | Series):
winddir[np.less(winddir, 0)] += 360
elif winddir < 0:
winddir += 360
......@@ -153,7 +149,7 @@ def wind_vector_degrees(vector_east, vector_north):
def mean_wind_vector(windspd, winddir):
V_e, V_n, V_spd = wind_vector_components(windspd, winddir)
avg_dir = wind_vector_degrees(np.mean(V_e), np.mean(V_n))
v_e, v_n, v_spd = wind_vector_components(windspd, winddir)
avg_dir = wind_vector_degrees(np.mean(v_e), np.mean(v_n))
return avg_dir, np.mean(V_spd)
return avg_dir, np.mean(v_spd)
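A brief example of why the component decomposition above matters when averaging wind direction (values are illustrative):

import numpy as np

# Naively averaging 350 and 10 degrees gives 180 (due south); averaging the east/north
# components with the function above gives the expected ~0/360 degrees (due north).
windspd = np.array([5.0, 5.0])
winddir = np.array([350.0, 10.0])
avg_dir, avg_spd = mean_wind_vector(windspd, winddir)  # avg_dir ~ 0 (or 360), avg_spd == 5.0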
#!/usr/bin/env python
"""Generate AOSS Tower NetCDF4 files from Level 00 ASCII files."""
import logging
import os
import platform
import sys
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
......@@ -37,7 +37,8 @@ def _get_data(input_files):
bad_files += 1
continue
if bad_files == len(input_files):
raise ValueError("No valid input data files found")
msg = "No valid input data files found"
raise ValueError(msg)
def get_data(input_files):
......@@ -75,14 +76,14 @@ def write_global_attributes(nc_file, input_sources, interval=None, datastream=No
nc_file.command_line = " ".join(sys.argv)
# generate history
nc_file.history = " ".join(platform.uname()) + " " + os.path.basename(__file__)
nc_file.history = " ".join(platform.uname()) + " " + Path(__file__).name
nc_file.input_source = input_sources[0]
nc_file.input_sources = ", ".join(input_sources)
def create_giant_netcdf(
input_files,
output_fn,
input_files: list[Path],
output_fn: Path,
zlib,
chunk_size,
start=None,
......@@ -94,13 +95,15 @@ def create_giant_netcdf(
):
frame = get_data(input_files)
if frame.empty:
raise ValueError("No data found from input files: {}".format(", ".join(input_files)))
in_files_str = ", ".join(str(in_file) for in_file in input_files)
msg = f"No data found from input files: {in_files_str}"
raise ValueError(msg)
# Add wind direction components so we can average wind direction properly
frame["wind_east"], frame["wind_north"], _ = calc.wind_vector_components(frame["wind_speed"], frame["wind_dir"])
if "air_temp" in frame and "rh" in frame and ("dewpoint" in database or "dewpoint_mean" in database):
LOG.info("'dewpoint' is missing from the input file, will calculate " "it from air temp and relative humidity")
LOG.info("'dewpoint' is missing from the input file, will calculate it from air temp and relative humidity")
frame["dewpoint"] = calc.dewpoint(frame["air_temp"], frame["rh"])
# round up each 1 minute group so data at time T is the average of data
......@@ -128,7 +131,7 @@ def create_giant_netcdf(
if start and end:
frame = frame[start.strftime("%Y-%m-%d %H:%M:%S") : end.strftime("%Y-%m-%d %H:%M:%S")]
chunk_sizes = [chunk_size] if chunk_size and not isinstance(chunk_size, (list, tuple)) else [frame.shape[0]]
chunk_sizes = [chunk_size] if chunk_size and not isinstance(chunk_size, list | tuple) else [frame.shape[0]]
first_stamp = datetime.strptime(str(frame.index[0]), "%Y-%m-%d %H:%M:%S")
# NETCDF4_CLASSIC was chosen so that MFDataset reading would work. See:
......@@ -147,7 +150,7 @@ def create_giant_netcdf(
write_global_attributes(
nc_file,
[os.path.basename(x) for x in input_files],
[x.name for x in input_files],
interval=interval_width,
datastream=datastream,
)
......@@ -195,13 +198,13 @@ def main():
"--start-time",
type=_dt_convert,
help="Start time of massive netcdf file, if only -s is given, a netcdf file for only that day is given"
+ ". Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
". Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
)
parser.add_argument(
"-e",
"--end-time",
type=_dt_convert,
help="End time of massive netcdf file. Formats allowed:" + "'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
help="End time of massive netcdf file. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
)
parser.add_argument(
"-n",
......@@ -257,7 +260,8 @@ each input file is mapped to the corresponding output file.
if args.start_time and not args.end_time:
args.end_time = args.start_time.replace(hour=23, minute=59, second=59)
elif not args.start_time and args.end_time:
raise ValueError("start time must be specified when end time is specified")
msg = "start time must be specified when end time is specified"
raise ValueError(msg)
mini_database = {k: schema.database_dict[k] for k in args.fields}
if args.summary:
......@@ -266,18 +270,19 @@ each input file is mapped to the corresponding output file.
# Case 1: All inputs to 1 output file
# Case 2: Each input in to a separate output file
if args.output_files and len(args.output_files) not in [1, len(args.input_files)]:
raise ValueError("Output filenames must be 1 or the same length as input files")
elif args.output_files and len(args.output_files) == len(args.input_files):
msg = "Output filenames must be 1 or the same length as input files"
raise ValueError(msg)
if args.output_files and len(args.output_files) == len(args.input_files):
args.input_files = [[i] for i in args.input_files]
else:
args.input_files = [args.input_files]
success = False
for in_files, out_fn in zip(args.input_files, args.output_files):
for in_files, out_fn in zip(args.input_files, args.output_files, strict=True):
try:
create_giant_netcdf(
in_files,
out_fn,
[Path(in_file) for in_file in in_files],
Path(out_fn),
args.zlib,
args.chunk_size,
args.start_time,
......@@ -291,7 +296,8 @@ each input file is mapped to the corresponding output file.
except (ValueError, TypeError):
LOG.error(f"Could not generate NetCDF file for {in_files}", exc_info=True)
if not success:
raise OSError("All ASCII files were empty or could not be read")
msg = "All ASCII files were empty or could not be read"
raise OSError(msg)
if __name__ == "__main__":
......
import matplotlib
matplotlib.use("agg")
import logging
import math
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
import matplotlib as mpl
import matplotlib.dates as md
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from netCDF4 import MFDataset, MFTime
mpl.use("agg")
LOG = logging.getLogger(__name__)
FIGURE_TITLE_SIZE = 13
TN_SIZE = (1, 1)
......@@ -59,31 +59,27 @@ class PlotMaker:
delta = (end_time - start_time).total_seconds()
if delta < timedelta(hours=24).total_seconds():
return start_time.strftime("%Y-%m-%d")
else:
return f"{start_time:%Y-%m-%d %H:%M} to {end_time:%Y-%m-%d %H:%M}"
def get_title(self, frame, is_subplot, start_time, end_time):
if self._title:
title_prefix = "AO&SS Building Tower " if not is_subplot else ""
title_name = TITLES.get(self.name, self.name.replace("_", " ").title())
unit_str = f"({self.units})" if self.units and is_subplot else ""
date_string = self.get_date_string(start_time, end_time)
title = self._title.format(
title_prefix=title_prefix,
title_name=title_name,
units=unit_str,
date_string=date_string,
)
else:
title = ""
return title
return f"{start_time:%Y-%m-%d %H:%M} to {end_time:%Y-%m-%d %H:%M}"
def get_title(self, frame, is_subplot, start_time, end_time): # noqa: ARG002
if not self._title:
return ""
title_prefix = "AO&SS Building Tower " if not is_subplot else ""
title_name = TITLES.get(self.name, self.name.replace("_", " ").title())
unit_str = f"({self.units})" if self.units and is_subplot else ""
date_string = self.get_date_string(start_time, end_time)
return self._title.format(
title_prefix=title_prefix,
title_name=title_name,
units=unit_str,
date_string=date_string,
)
def get_yticks(self, ymin, ymax, num_plots):
if ymin == ymax:
return [ymin, ymin + 0.05, ymin + 0.1]
delta = math.ceil((ymax - ymin) / num_plots)
new_ticks = np.arange(ymin, (ymin + delta * num_plots), delta)
return new_ticks
return np.arange(ymin, (ymin + delta * num_plots), delta)
def _get_ylabel(self, is_subplot=False):
y_label = TITLES.get(self.name, self.name.replace("_", " ").title())
......@@ -103,8 +99,7 @@ class PlotMaker:
ax.text(0.008, 0.9, self.units, horizontalalignment="left", va="top", transform=ax.transAxes, size=8)
def _call_plot(self, frame, ax):
lines = ax.plot(frame.index, frame, "k")
return lines
return ax.plot(frame.index, frame, "k")
def _set_ylim(self, frame, ax):
ymin = np.floor(frame.min().min())
......@@ -148,18 +143,17 @@ class PlotMaker:
xloc = md.AutoDateLocator(minticks=5, maxticks=8, interval_multiples=True)
xfmt = md.AutoDateFormatter(xloc)
def _fmt(interval, x, pos=None):
def _fmt(interval, x, pos=None): # noqa: ARG001
x_num = md.num2date(x).replace(tzinfo=None)
delta_seconds = (x_num - start_time.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds()
num_hours = delta_seconds / 3600.0
if interval == md.HOURLY:
return f"{num_hours:.0f}"
elif interval == md.MINUTELY:
if interval == md.MINUTELY:
num_minutes = delta_seconds / 60.0
num_minutes -= int(num_hours) * 60.0
return f"{int(num_hours):02.0f}:{num_minutes:02.0f}"
else:
return x.strftime("{%Y-%m-%d}")
return x.strftime("{%Y-%m-%d}")
from functools import partial
......@@ -212,15 +206,11 @@ class PlotMaker:
return fig
def create_plot(self, frame, fig, start_time=None, end_time=None, is_subplot=None, shared_x=None, title=None):
""":param frame:
:param fig:
:param is_subplot: None or (num plots, num columns, num_rows)
:param shared_x:
:return:
"""
"""Create series of plots."""
specific_frame = frame[[x for x in frame.columns if x in self.deps]]
if frame.empty or specific_frame.empty or specific_frame.isnull().all().any():
raise ValueError(f"No valid data found or missing necessary data to make {self.name}")
if frame.empty or specific_frame.empty or specific_frame.isna().all().any():
msg = f"No valid data found or missing necessary data to make {self.name}"
raise ValueError(msg)
if start_time is None:
start_time = frame.index[0].to_pydatetime()
if end_time is None:
......@@ -263,15 +253,14 @@ class TDPlotMaker(PlotMaker):
class WindDirPlotMaker(PlotMaker):
def _set_ylim(self, frame, ax):
def _set_ylim(self, frame, ax): # noqa: ARG002
return 0, 360
def _set_yticks(self, ax, ymin, ymax, is_subplot):
def _set_yticks(self, ax, ymin, ymax, is_subplot): # noqa: ARG002
ax.yaxis.set_ticks([0, 90, 180, 270])
def _call_plot(self, frame, ax):
lines = ax.plot(frame.index, frame, "k.", markersize=3, linewidth=0)
return lines
return ax.plot(frame.index, frame, "k.", markersize=3, linewidth=0)
class MeteorogramPlotMaker(PlotMaker):
......@@ -282,8 +271,8 @@ class MeteorogramPlotMaker(PlotMaker):
super().__init__(name, dependencies, title=title)
def _convert_ax_to_thumbnail(self, fig):
if hasattr(fig, "_my_axes"):
for k, ax in fig._my_axes.items():
if hasattr(fig, "my_axes"):
for k, ax in fig.my_axes.items():
if k not in self.thumbnail_deps:
fig.delaxes(ax)
continue
......@@ -305,7 +294,8 @@ class MeteorogramPlotMaker(PlotMaker):
def create_plot(self, frame, fig, start_time=None, end_time=None, is_subplot=False, shared_x=None, title=None):
if is_subplot or shared_x:
raise ValueError("Meteorogram Plot can not be a subplot or share X-axis")
msg = "Meteorogram Plot can not be a subplot or share X-axis"
raise ValueError(msg)
if start_time is None:
start_time = frame.index[0].to_pydatetime()
......@@ -317,8 +307,8 @@ class MeteorogramPlotMaker(PlotMaker):
num_plots = len(self.plot_deps)
shared_x = None
# some hacky book keeping so we can create a thumbnail
fig._my_axes = {}
# some hacky bookkeeping so we can create a thumbnail
fig.my_axes = {}
for idx, plot_name in enumerate(self.plot_deps):
plot_maker = PLOT_TYPES.get(plot_name, PlotMaker(plot_name, (plot_name,)))
title_name = TITLES.get(plot_name, plot_name.replace("_", " ").title())
......@@ -329,14 +319,14 @@ class MeteorogramPlotMaker(PlotMaker):
shared_x=shared_x,
title=title_name,
)
fig._my_axes[plot_name] = ax
fig.my_axes[plot_name] = ax
if idx == 0:
shared_x = ax
if idx != num_plots - 1:
# Disable the x-axis ticks so we don't interfere with other subplots
kwargs = {"visible": False}
for l in ax.get_xticklabels():
l.update(kwargs)
for label in ax.get_xticklabels():
label.update(kwargs)
# make the top y-tick label invisible
ax.set_xlabel("Time (UTC)")
......@@ -392,16 +382,15 @@ def get_data(input_files, columns):
def create_plot(plot_names, frame, output, start_time=None, end_time=None, thumbnail=False):
"""Args:
plot_names:
frame:
output:
start_time:
end_time:
daily: Whether or not this plot should represent one day of data.
"""Create a series of plots.
Returns
-------
Args:
plot_names (list of str): Plot names to generate
frame (pd.DataFrame): DataFrame of data
output (str): Output pattern for the created files
start_time (datetime): Start time of the data to use
end_time (datetime): End time of the data to use
thumbnail (bool): Additionally generate a thumbnail
"""
if start_time is None:
......@@ -414,7 +403,8 @@ def create_plot(plot_names, frame, output, start_time=None, end_time=None, thumb
var_names = []
for var_name in plot_maker.deps:
if var_name not in frame:
raise ValueError(f"Missing required variable '{var_name}' for plot '{name}'")
msg = f"Missing required variable '{var_name}' for plot '{name}'"
raise ValueError(msg)
var_names.append(var_name)
# write NaNs where QC values are not 0
qc_name = "qc_" + var_name
......@@ -424,7 +414,7 @@ def create_plot(plot_names, frame, output, start_time=None, end_time=None, thumb
# create a frame that doesn't include any of the bad values
plot_frame = frame[var_names]
plot_frame = plot_frame[~plot_frame.isnull().any(axis=1)]
plot_frame = plot_frame[~plot_frame.isna().any(axis=1)]
fig = plt.figure()
try:
......@@ -438,7 +428,9 @@ def create_plot(plot_names, frame, output, start_time=None, end_time=None, thumb
fig.savefig(out_fn)
if thumbnail:
stem, ext = os.path.splitext(out_fn)
out_path = Path(out_fn)
stem = out_path.stem
ext = out_path.suffix
out_fn = f"{stem}_thumbnail{ext}"
plot_maker.convert_to_thumbnail(fig)
LOG.info(f"Saving thumbnail '{name}' to filename '{out_fn}'")
......@@ -468,7 +460,7 @@ def main():
action="count",
default=int(os.environ.get("VERBOSITY", 2)),
dest="verbosity",
help=("each occurence increases verbosity 1 level through" + " ERROR-WARNING-INFO-DEBUG (default INFO)"),
help=("each occurence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)"),
)
parser.add_argument(
"-l",
......@@ -481,14 +473,14 @@ def main():
"--start-time",
type=_dt_convert,
help="Start time of plot. If only -s is given, a plot of "
+ "only that day is created. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
"only that day is created. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
)
parser.add_argument(
"-e",
"--end-time",
type=_dt_convert,
help="End time of plot. If only -e is given, a plot of only that day is "
+ "created. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD', 'YYYYMMDD'",
"created. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD', 'YYYYMMDD'",
)
parser.add_argument("--met-plots", nargs="+", help="Override plots to use in the combined meteorogram plot")
parser.add_argument("input_files", nargs="+", help="aoss_tower_level_b1 files")
......@@ -506,19 +498,21 @@ def main():
"--daily",
action="store_true",
help="creates a plot for every day. Usually used to create plots "
+ "that will line up for aoss tower quicklooks page",
"that will line up for aoss tower quicklooks page",
)
args = parser.parse_args()
levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
setup_logging(args.log_filepath, level=levels[min(3, args.verbosity)])
if not os.path.splitext(args.output)[-1]:
if not Path(args.output).suffix:
LOG.warning("File pattern provided does not have a file extension")
# check the dependencies for the meteorogram
if args.met_plots:
assert "meteorogram" not in args.met_plots
if "meteorogram" in args.met_plots:
msg = "The 'meteorogram' plot can not be a sub-plot of itself."
raise ValueError(msg)
PLOT_TYPES["meteorogram"].deps = args.met_plots
plot_deps = [PLOT_TYPES[k].deps if k in PLOT_TYPES else (k,) for k in args.plot_names]
......@@ -536,11 +530,8 @@ def main():
end_time = args.start_time.replace(hour=23, minute=59, second=59, microsecond=999999)
frame = frame[args.start_time : end_time]
if not args.daily:
# allow plotting methods to write inplace on a copy
frames = [frame.copy()]
else:
frames = (group[1] for group in frame.groupby(frame.index.day))
# allow plotting methods to write inplace on a copy
frames = [frame.copy()] if not args.daily else (group[1] for group in frame.groupby(frame.index.day))
for frame in frames:
if args.daily:
......
#!/usr/bin/env python
import datetime
import unittest
......@@ -8,9 +7,9 @@ import pandas as pd
from aosstower.level_00.influxdb import Updater, construct_url, get_url_data
class TestCase:
def __init__(self, input, expected_avg, expected_url):
self.input = input
class TestDataCase:
def __init__(self, input_data, expected_avg, expected_url):
self.input_data = input_data
self.expected_avg = expected_avg
self.expected_url = expected_url
......@@ -53,7 +52,7 @@ def _isnan(x):
class TestInfluxdb(unittest.TestCase):
def setUp(self):
self.updater = Updater()
self.test_data = TestCase(
self.test_data = TestDataCase(
create_data(209),
[
{
......@@ -211,7 +210,7 @@ class TestInfluxdb(unittest.TestCase):
def test_updater(self):
output = []
for record in self.test_data.input:
for record in self.test_data.input_data:
avg = self.updater.rolling_average(record)
if avg is not None:
output.append({key: avg[key][-1] for key in avg})
......@@ -225,22 +224,10 @@ class TestInfluxdb(unittest.TestCase):
def test_construct_url(self):
output = []
for record in self.test_data.input:
for record in self.test_data.input_data:
avg = self.updater.rolling_average(record)
if avg is not None:
output.append(construct_url(get_url_data(avg, "", "")))
assert len(self.test_data.expected_url) >= len(output)
assert self.test_data.expected_url[len(output) - 1] == output[-1]
assert len(self.test_data.expected_url) == len(output)
def suite():
"""The test suite for influxdb."""
loader = unittest.TestLoader()
mysuite = unittest.TestSuite()
mysuite.addTest(loader.loadTestsFromTestCase(TestInfluxdb))
return mysuite
if __name__ == "__main__":
unittest.main()
#!/usr/bin/env python
"""Test basic NetCDF generation."""
import os
from datetime import datetime
import numpy as np
......@@ -13,11 +11,10 @@ def get_nc_schema_database(fields=None):
if fields is None:
fields = schema.met_vars
mini_database = {k: schema.database_dict[k] for k in fields}
return mini_database
return {k: schema.database_dict[k] for k in fields}
def test_nc_basic1(tmpdir):
def test_nc_basic1(tmp_path):
"""Test basic usage of the NetCDF generation."""
from netCDF4 import Dataset
......@@ -25,7 +22,7 @@ def test_nc_basic1(tmpdir):
from aosstower.tests.utils import get_cached_level_00
input_files = list(get_cached_level_00(num_files=2))
nc_out = tmpdir.join("test.nc")
nc_out = tmp_path / "test.nc"
create_giant_netcdf(
input_files,
str(nc_out),
......@@ -35,7 +32,7 @@ def test_nc_basic1(tmpdir):
interval_width="1T",
database=get_nc_schema_database(),
)
assert os.path.isfile(nc_out)
assert nc_out.is_file()
with Dataset(nc_out, "r") as nc:
sflux = nc["solar_flux"][:]
assert np.count_nonzero(sflux.mask) == 2
#!/usr/bin/env python
"""Utilities for running tests."""
import os
import urllib.request
from pathlib import Path
CACHE_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "test_data")
CACHE_DIR = Path(__file__).resolve().parent / "test_data"
def get_cached_level_00(num_files=2):
......@@ -14,11 +13,12 @@ def get_cached_level_00(num_files=2):
"https://metobs-test.ssec.wisc.edu/pub/cache/aoss/tower/level_00/version_00/2020/01/02/aoss_tower.2020-01-02.ascii",
)
if num_files > len(file_urls):
raise ValueError(f"Maximum of {len(file_urls)} can be loaded.")
msg = f"Maximum of {len(file_urls)} can be loaded."
raise ValueError(msg)
file_urls = file_urls[:num_files]
for u in file_urls:
fn = os.path.join(CACHE_DIR, os.path.basename(u))
if not os.path.isfile(fn):
fn = CACHE_DIR / u.rsplit("/", 1)[-1]
if not fn.is_file():
urllib.request.urlretrieve(u, fn)
yield fn
......@@ -38,6 +38,11 @@ ignore = ["D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107", "D203"
[tool.ruff.per-file-ignores]
"aosstower/tests/*" = ["S", "PLR2004"]
"aosstower/level_b1/quicklook.py" = ["PLR0913"]
"aosstower/level_b1/nc.py" = ["PLR0913"]
[tool.ruff.pydocstyle]
convention = "google"
[tool.mypy]
python_version = "3.10"
......