Verified Commit c6632db2 authored by David Hoese

Add initial tests for wind direction handling with Influx v2

parent 8cc7dae2
@@ -55,9 +55,9 @@ def handle_time_string(date_string):
     return handle_date(date_string)


-def handle_symbols(symbols):
+def handle_symbols(symbols: list[str]):
     influx_symbols: dict[tuple[str, str], list[str]] = {}
-    influx_to_requested_symbols = {}
+    influx_to_requested_symbols: dict[str, str | None] = {}
     handled_symbols = set()
     add_winds = set()

@@ -72,48 +72,59 @@ def handle_symbols(symbols):
         except ValueError:
             raise ValueError("Symbols must have 3 period-separated parts: {}".format(symbol))
-        api_to_influx_rename_map = data_responses.SYMBOL_TRANSLATIONS.get((site, inst))
-        if not api_to_influx_rename_map:
-            raise ValueError("Unknown site/instrument: {},{}".format(site, inst))
-        influx_name = api_to_influx_rename_map.get(s)
-        if s == "wind_direction":
-            add_winds.add((site, inst))
-        elif influx_name is None:
-            raise ValueError("Unknown symbol: {}".format(symbol))
-        influx_name = influx_name or s
-        # NOTE: if wind speed/dir is specified InfluxDB should provide it with
-        # all fill values, so we can fill it in later
-        influx_to_requested_symbols[f"{si[0]}.{si[1]}.{influx_name}"] = symbol
-        influx_symbols.setdefault(si, []).append(influx_name)
+        influx_name = _get_column_name_in_influxdb(site, inst, s)
+        # this assignment must happen before we continue the loop after
+        # detecting 'wind_direction', otherwise we don't know we need to re-add it
+        influx_to_requested_symbols[f"{site}.{inst}.{influx_name}"] = symbol
+        if s == "wind_direction":
+            add_winds.add((site, inst))
+            continue
+        influx_symbols.setdefault((site, inst), []).append(influx_name)
     # Add the symbols needed to compute the wind_speed and wind_direction
     for si in add_winds:
         influx_to_requested_symbols[f"{si[0]}.{si[1]}.wind_east"] = None
         influx_to_requested_symbols[f"{si[0]}.{si[1]}.wind_north"] = None
         influx_symbols[si].extend(("wind_east", "wind_north"))
     return influx_to_requested_symbols, influx_symbols
+
+
+def _get_column_name_in_influxdb(site: str, inst: str, api_symbol: str) -> str:
+    api_to_influx_rename_map = data_responses.SYMBOL_TRANSLATIONS.get((site, inst))
+    if not api_to_influx_rename_map:
+        raise ValueError("Unknown site/instrument: {},{}".format(site, inst))
+    influx_name = api_to_influx_rename_map.get(api_symbol)
+    if api_symbol != "wind_direction" and influx_name is None:
+        raise ValueError(f"Unknown symbol: {site}.{inst}.{api_symbol}")
+    influx_name = influx_name or api_symbol
+    return influx_name
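
For a concrete picture of the reworked flow, here is a sketch (hypothetical symbols, and assuming air_temp is not renamed by SYMBOL_TRANSLATIONS) of what handle_symbols should now return when wind_direction is requested:

    # Sketch, not part of the commit: wind_direction is never queried directly.
    # Its vector components are queried instead and mapped to None so they can
    # be dropped after the direction is computed.
    influx_to_requested, influx_symbols = handle_symbols(
        ["aoss.tower.air_temp", "aoss.tower.wind_direction"]
    )
    assert influx_to_requested == {
        "aoss.tower.air_temp": "aoss.tower.air_temp",
        "aoss.tower.wind_direction": "aoss.tower.wind_direction",
        "aoss.tower.wind_east": None,
        "aoss.tower.wind_north": None,
    }
    assert influx_symbols == {("aoss", "tower"): ["air_temp", "wind_east", "wind_north"]}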

 def handle_influxdb_result(data_frame, influx_to_requested_symbols, interval):
     valid_requested_symbols = [symbol for symbol in influx_to_requested_symbols.values() if symbol is not None]
     if data_frame is None:
         # invalid query
-        columns = ["time"] + valid_requested_symbols
-        data_frame = pd.DataFrame(columns=columns)
-        data_frame.set_index("time", inplace=True)
-        data_frame.fillna(value=np.nan, inplace=True)
-        return data_frame
+        return _create_empty_dataframe_for_columns(valid_requested_symbols)
     data_frame = _convert_wind_vectors_to_direction_if_needed(data_frame)
     data_frame = data_frame.round({s: ROUNDING.get(s, 1) for s in data_frame.columns})
     # rename columns to API names requested by the user
-    # could be "<site>.<inst>.name" or just "name"
+    # User could request "<site>.<inst>.name" or just "name"
     data_frame.columns = valid_requested_symbols
     return data_frame
+
+
+def _create_empty_dataframe_for_columns(columns: list[str]) -> pd.DataFrame:
+    columns = ["time"] + columns
+    data_frame = pd.DataFrame(columns=columns)
+    data_frame.set_index("time", inplace=True)
+    data_frame.fillna(value=np.nan, inplace=True)
+    return data_frame


 def _convert_wind_vectors_to_direction_if_needed(data_frame: pd.DataFrame) -> pd.DataFrame:
     for weast_col_name, wnorth_col_name, wdir_col_name in _wind_vector_column_groups(data_frame.columns):
         data_frame[wdir_col_name] = np.rad2deg(np.arctan2(data_frame[weast_col_name], data_frame[wnorth_col_name]))
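
One note on the conversion above: np.arctan2(east, north) yields degrees in [-180, 180], with 0 at north and 90 at east, while compass-style wind direction is conventionally reported in [0, 360). If the API expects the latter (an assumption, not something this diff shows), a follow-up normalization could look like:

    # Sketch: wrap arctan2 output into the 0-360 compass range;
    # e.g. -90 becomes 270. Works on numpy arrays and scalars alike.
    def _normalize_direction_degrees(degrees):
        return degrees % 360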

@@ -132,8 +143,6 @@ def _wind_vector_column_groups(columns: list[str]) -> Iterable[tuple[str, str, str]]:
         wind_direction_column = f"{site}.{inst}.wind_direction"
         if wind_east_column not in columns:
             continue
-        if wind_direction_column not in columns:
-            continue
         yield wind_east_column, wind_north_column, wind_direction_column

@@ -141,10 +150,6 @@ def _all_wind_north_columns(columns: list[str]) -> list[str]:
     return [col_name for col_name in columns if "wind_north" in col_name]


-def _convert_wind_vectors_to_direction(frame: pd.DataFrame) -> pd.DataFrame:
-    pass


 def calc_num_records(begin, end, interval):
     now = datetime.utcnow()
     if begin is None:

@@ -319,14 +324,14 @@ def modify_data(fmt, begin, end, site, inst, symbols, interval, sep=",", order="
     try:
         begin, end = _convert_begin_and_end(begin, end)
         _check_query_parameters(order, epoch, symbols, interval)
-        short_symbols, symbols = _parse_symbol_names(site, inst, symbols)
-        influx_to_requested_symbols, influx_symbols = handle_symbols(symbols)
+        short_symbols, fully_qualified_symbols = _parse_symbol_names(site, inst, symbols)
+        influx_to_requested_symbols, influx_symbols = handle_symbols(fully_qualified_symbols)
     except ValueError as e:
         return handle_error(fmt, str(e))
     data_frame, response_info = _query_time_series_db(begin, end, interval, influx_symbols, epoch)
     data_frame = handle_influxdb_result(data_frame, influx_to_requested_symbols, interval)
-    data_frame = _reorder_and_rename_result_dataframe(data_frame, symbols, short_symbols)
+    data_frame = _reorder_and_rename_result_dataframe(data_frame, fully_qualified_symbols, short_symbols)
     handler = RESPONSE_HANDLERS[fmt]
     return handler(data_frame, epoch, sep=sep, order=order, **response_info)

@@ -352,18 +357,18 @@ def _check_query_parameters(order, epoch, symbols, interval):
         raise ValueError("bad_interval")


-def _parse_symbol_names(site, inst, symbols):
+def _parse_symbol_names(site: str, inst: str, symbols: str) -> tuple[list[str] | None, list[str]]:
     if site and inst:
         # shorthand for symbols that all use the same site and inst
         short_symbols = symbols.split(":")
-        symbols = ["{}.{}.{}".format(site, inst, s) for s in short_symbols]
+        fully_qualified_symbols = ["{}.{}.{}".format(site, inst, s) for s in short_symbols]
     elif not site and not inst:
         # each symbol is fully qualified with site.inst.symbol
         short_symbols = None
-        symbols = symbols.split(":")
+        fully_qualified_symbols = symbols.split(":")
     else:
         raise ValueError("missing_site_inst")
-    return short_symbols, symbols
+    return short_symbols, fully_qualified_symbols
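
A short sketch (hypothetical symbol names) makes the two accepted spellings concrete:

    # Shorthand form: site/inst given once, short names colon-separated.
    _parse_symbol_names("aoss", "tower", "air_temp:wind_direction")
    # -> (["air_temp", "wind_direction"],
    #     ["aoss.tower.air_temp", "aoss.tower.wind_direction"])

    # Fully qualified form: no site/inst, each symbol spells site.inst.name.
    _parse_symbol_names("", "", "aoss.tower.air_temp:mendota.buoy.air_temp")
    # -> (None, ["aoss.tower.air_temp", "mendota.buoy.air_temp"])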


 def _query_time_series_db(begin, end, interval, influx_symbols, epoch):

@@ -2,27 +2,106 @@ import contextlib
 import json
 import random
 from datetime import datetime, timedelta
-from typing import Callable, ContextManager, Iterable, Iterator
+from typing import Any, Callable, ContextManager, Iterable, Iterator
 from unittest import mock

 import numpy as np
+import pandas as pd
 import pytest
-from influxdb.resultset import ResultSet
+
+try:
+    import influxdb_client
+except ImportError:
+    influxdb_client = None
+
+try:
+    from influxdb.resultset import ResultSet
+except ImportError:
+    ResultSet = None

 from metobsapi.util.query_influx import QueryResult
+
+
+@pytest.fixture(params=["1", "2"], ids=["influxdbv1", "influxdbv2"])
+def influxdb_version(request):
+    return request.param


 @pytest.fixture
-def mock_influxdb_query() -> Callable[[QueryResult], ContextManager[mock.Mock]]:
+def mock_influxdb_query(influxdb_version) -> Callable[[list[dict]], ContextManager[mock.Mock]]:
     @contextlib.contextmanager
-    def _mock_influxdb_query_with_fake_data(fake_result_data: QueryResult) -> Iterator[mock.Mock]:
-        with mock.patch("metobsapi.util.query_influx._query_influxdbv1") as query_func:
-            query_func.return_value = fake_result_data
-            yield query_func
+    def _mock_influxdb_query_with_fake_data(fake_result_data: list[dict]) -> Iterator[mock.Mock]:
+        with _mock_influxdb_library_availability(influxdb_version == "1", influxdb_version == "2"):
+            if influxdb_version == "1":
+                with _patch_v1_query(fake_result_data) as query_func:
+                    yield query_func
+            elif influxdb_version == "2":
+                with _patch_v2_query(fake_result_data) as query_func:
+                    yield query_func

     return _mock_influxdb_query_with_fake_data
+
+
+@contextlib.contextmanager
+def _mock_influxdb_library_availability(v1_available: bool, v2_available: bool) -> Iterator[None]:
+    with _mock_if(not v1_available, "metobsapi.util.query_influx.InfluxDBClientv1"), _mock_if(
+        not v2_available, "metobsapi.util.query_influx.InfluxDBClientv2"
+    ):
+        yield None
+
+
+@contextlib.contextmanager
+def _mock_if(condition: bool, obj_path: str) -> Iterator[None]:
+    if not condition:
+        yield None
+        return
+    with mock.patch(obj_path, None):
+        yield None
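
A quick illustration of _mock_if (hypothetical usage, not from this diff): the target is patched to None only when the condition holds, which is how the fixture simulates one of the two client libraries being uninstalled:

    # Code under test then sees InfluxDBClientv2 as None and must take
    # the v1 fallback path.
    with _mock_if(True, "metobsapi.util.query_influx.InfluxDBClientv2"):
        run_query_under_test()  # hypothetical call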
+
+
+@contextlib.contextmanager
+def _patch_v1_query(fake_result_data: list[dict]) -> Iterator[mock.Mock]:
+    if ResultSet is None:
+        pytest.skip("Version 1 'influxdb' dependency missing. Skipping test.")
+    with mock.patch("metobsapi.util.query_influx._query_influxdbv1") as query_func:
+        fake_data_for_v1 = _fake_data_to_v1_resultset(fake_result_data)
+        query_func.return_value = fake_data_for_v1
+        yield query_func
+
+
+@contextlib.contextmanager
+def _patch_v2_query(fake_result_data: list[dict]) -> Iterator[mock.Mock]:
+    if influxdb_client is None:
+        pytest.skip("Version 2 'influxdb-client' dependency missing. Skipping test.")
+    site_data_frames = []
+    for data_dict in fake_result_data:
+        site = data_dict["tags"]["site"]
+        inst = data_dict["tags"]["inst"]
+        data_row_dicts = []
+        for row_values in data_dict["values"]:
+            data_row = {}
+            for col_name, col_value in zip(data_dict["columns"], row_values):
+                data_row[f"{site}.{inst}.{col_name}"] = np.nan if col_value is None else col_value
+            data_row.update(data_dict["tags"])
+            data_row_dicts.append(data_row)
+        new_column_names = [f"{site}.{inst}.{col_name}" for col_name in data_dict["columns"]]
+        single_df = pd.DataFrame(data_row_dicts, columns=new_column_names)
+        single_df.set_index(f"{site}.{inst}.time", inplace=True)
+        single_df = single_df.dropna(axis=1, how="all")
+        site_data_frames.append(single_df)
+    merged_df = pd.concat(site_data_frames, axis=1, copy=False)
+    with mock.patch("metobsapi.util.query_influx._query_influxdbv2") as query_func:
+        query_func.return_value = merged_df
+        yield query_func
+
+
+def _fake_data_to_v1_resultset(fake_data: list[dict]) -> QueryResult:
+    return [ResultSet({"series": [single_result], "statement_id": 0}) for single_result in fake_data]
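
To see what these helpers transform, here is one fake-series dict in the shape _fake_data appears to produce (the keys shown match the diff context; the values are illustrative):

    fake_series = {
        "columns": ["time", "air_temp"],
        "tags": {"site": "aoss", "inst": "tower"},
        "values": [["2017-03-05T19:00:00Z", 2.5]],
    }
    # v1 path: each dict is wrapped in an influxdb ResultSet.
    # v2 path: dicts are flattened into a pandas DataFrame whose columns are
    # prefixed "aoss.tower.<column>" and indexed by "aoss.tower.time".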

 @pytest.fixture
 def influxdb_air_temp_9_values(mock_influxdb_query) -> Iterable[None]:
     fake_result = _fake_data("1m", {("aoss", "tower"): ["time", "air_temp"]}, 9)

@@ -46,7 +125,7 @@ def influxdb_wind_fields_9_values(mock_influxdb_query) -> Iterable[None]:
     yield


-def _fake_data(interval: str, symbols: dict, num_vals: int, single_result: bool = False) -> QueryResult:
+def _fake_data(interval: str, symbols: dict[tuple[str, str], list[str]], num_vals: int) -> list[dict[str, Any]]:
     now = datetime(2017, 3, 5, 19, 0, 0)
     t_format = "%Y-%m-%dT%H:%M:%SZ"
     measurement_name = "metobs_" + interval

@@ -67,26 +146,9 @@ def _fake_data(interval: str, symbols: dict, num_vals: int, single_result: bool = False) -> QueryResult:
             "tags": tags,
             "values": vals,
         }
-        if single_result:
-            series.append(s)
-        else:
-            series.append(
-                ResultSet(
-                    {
-                        "series": [s],
-                        "statement_id": 0,
-                    }
-                )
-            )
-    if single_result:
-        ret = {
-            "series": series,
-            "statement_id": 0,
-        }
-        return ResultSet(ret)
-    else:
-        return series
+        series.append(s)
+    return series

 def _generate_fake_column_values(columns: list[str]) -> list[float | None]:

@@ -98,10 +160,12 @@ def _generate_fake_column_values(columns: list[str]) -> list[float | None]:
 @pytest.fixture
-def app():
+def app(influxdb_version):
     from metobsapi import app as real_app
     real_app.config.update({"TESTING": True, "DEBUG": True})
+    if influxdb_version == "2":
+        real_app.config.update({"INFLUXDB_TOKEN": "1234"})
     yield real_app

@@ -22,62 +22,7 @@ except ImportError:
 QUERY_FORMAT = "SELECT {symbol_list} FROM metobs.forever.metobs_{interval} WHERE {where_clause} GROUP BY site,inst"

-QueryResult: TypeAlias = list[ResultSet]
+QueryResult: TypeAlias = list
-
-
-def parse_dt_v1(d):
-    if d is None:
-        return "now()"
-    elif isinstance(d, timedelta):
-        return "now() - {:d}s".format(int(d.total_seconds()))
-    else:
-        return d.strftime("'%Y-%m-%dT%H:%M:%SZ'")
-
-
-def build_queries_v1(symbols, begin, end, value):
-    begin = parse_dt_v1(begin)
-    end = parse_dt_v1(end)
-    where_clause = []
-    where_clause.append("time >= {}".format(begin))
-    where_clause.append("time <= {}".format(end))
-    queries = []
-    for (site, inst), symbol_names in symbols.items():
-        wc = where_clause.copy()
-        wc.append("site='{}'".format(site))
-        wc.append("inst='{}'".format(inst))
-        query = QUERY_FORMAT.format(
-            symbol_list=", ".join(symbol_names),
-            interval=value,
-            where_clause=" AND ".join(wc),
-        )
-        queries.append(query)
-    queries = "; ".join(queries)
-    return queries
-
-
-def _query_influxdbv1(query_str, epoch) -> QueryResult:
-    kwargs = {
-        "host": current_app.config["INFLUXDB_HOST"],
-        "port": current_app.config["INFLUXDB_PORT"],
-        "database": current_app.config["INFLUXDB_DB"],
-    }
-    if current_app.config["INFLUXDB_TOKEN"]:
-        # version 2.x, ignore user/pass, use token
-        kwargs["username"] = None
-        kwargs["password"] = None
-        kwargs["headers"] = {
-            "Authorization": "Token {}".format(current_app.config["INFLUXDB_TOKEN"]),
-        }
-    client = InfluxDBClientv1(
-        **kwargs,
-    )
-    res = client.query(query_str, epoch=epoch)
-    if not isinstance(res, list):
-        res = [res]
-    return res

 def query(symbols, begin, end, interval, epoch):

@@ -109,14 +54,16 @@ class QueryHandler:
             "Missing 'influxdb-client' and legacy 'influxdb' dependencies to communicate " "with InfluxDB database"
         )
-        if True or InfluxDBClientv2 is None:
+        if InfluxDBClientv2 is None:
             # fallback to v1 library
-            query_str = build_queries_v1(self._symbols, self._begin, self._end, self._interval)
+            query_str = _build_queries_v1(self._symbols, self._begin, self._end, self._interval)
             result = _query_influxdbv1(query_str, epoch)
             return self._convert_v1_result_to_dataframe(result)

         # version 2 library
-        raise NotImplementedError()
+        query_str = _build_queries_v2(self._symbols, self._begin, self._end, self._interval)
+        result = _query_influxdbv2(query_str, epoch)
+        return self._convert_v2_result_to_api_dataframe(result)

     def _convert_v1_result_to_dataframe(self, result: QueryResult) -> pd.DataFrame:
         frames = []

@@ -132,3 +79,75 @@ class QueryHandler:
             frames.append(frame)
         frame = pd.concat(frames, axis=1, copy=False)
         return frame
+
+    def _convert_v2_result_to_api_dataframe(self, result: QueryResult) -> pd.DataFrame:
+        # TODO
+        return result
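
Since the v2 query already hands back a DataFrame, this conversion will likely reduce to column renaming. A sketch of that direction (hypothetical helper, assuming site/inst arrive as tag columns):

    def _rename_v2_columns(frame):
        # Prefix each pivoted field column with "<site>.<inst>." and drop
        # the tag columns themselves.
        site = frame["site"].iloc[0]
        inst = frame["inst"].iloc[0]
        field_cols = [c for c in frame.columns if c not in ("site", "inst")]
        rename_map = {c: f"{site}.{inst}.{c}" for c in field_cols}
        return frame[field_cols].rename(columns=rename_map)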
+
+
+def _build_queries_v1(symbols, begin, end, value):
+    begin = parse_dt_v1(begin)
+    end = parse_dt_v1(end)
+    where_clause = []
+    where_clause.append("time >= {}".format(begin))
+    where_clause.append("time <= {}".format(end))
+    queries = []
+    for (site, inst), symbol_names in symbols.items():
+        wc = where_clause.copy()
+        wc.append("site='{}'".format(site))
+        wc.append("inst='{}'".format(inst))
+        query = QUERY_FORMAT.format(
+            symbol_list=", ".join(symbol_names),
+            interval=value,
+            where_clause=" AND ".join(wc),
+        )
+        queries.append(query)
+    queries = "; ".join(queries)
+    return queries
+
+
+def parse_dt_v1(d):
+    if d is None:
+        return "now()"
+    elif isinstance(d, timedelta):
+        return "now() - {:d}s".format(int(d.total_seconds()))
+    else:
+        return d.strftime("'%Y-%m-%dT%H:%M:%SZ'")
+
+
+def _query_influxdbv1(query_str, epoch) -> QueryResult:
+    kwargs = {
+        "host": current_app.config["INFLUXDB_HOST"],
+        "port": current_app.config["INFLUXDB_PORT"],
+        "database": current_app.config["INFLUXDB_DB"],
+    }
+    if current_app.config["INFLUXDB_TOKEN"]:
+        # version 2.x, ignore user/pass, use token
+        kwargs["username"] = None
+        kwargs["password"] = None
+        kwargs["headers"] = {
+            "Authorization": "Token {}".format(current_app.config["INFLUXDB_TOKEN"]),
+        }
+    client = InfluxDBClientv1(
+        **kwargs,
+    )
+    res = client.query(query_str, epoch=epoch)
+    if not isinstance(res, list):
+        res = [res]
+    return res
+
+
+def _build_queries_v2(symbols, begin, end, value):
+    return None
+
+
+def _query_influxdbv2(query_str, epoch) -> QueryResult:
+    # TODO: Pivot
+    # TODO: Keep
+    # TODO: Manually drop _result and _table (underscores?) columns
+    # TODO: Rename columns to site.inst.column
+    # TODO: Be careful with wind direction
+    return []
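
The v2 path is left as stubs. As a rough sketch of where the TODOs seem headed (everything below is an assumption, not the commit's implementation: the Flux pipeline, the bucket spelling, and the INFLUXDB_URL/INFLUXDB_ORG config keys are not established by this diff; the module's existing imports of pd, current_app, and InfluxDBClientv2 are assumed):

    def _build_queries_v2(symbols, begin, end, value):
        # One Flux query per (site, inst), pivoted so fields become columns.
        queries = []
        for (site, inst), symbol_names in symbols.items():
            fields = ", ".join(f'"{name}"' for name in symbol_names)
            queries.append(
                'from(bucket: "metobs/forever")'
                f" |> range(start: {begin}, stop: {end})"
                f' |> filter(fn: (r) => r._measurement == "metobs_{value}")'
                f' |> filter(fn: (r) => r.site == "{site}" and r.inst == "{inst}")'
                f" |> filter(fn: (r) => contains(value: r._field, set: [{fields}]))"
                ' |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")'
            )
        return queries

    def _query_influxdbv2(query_strs, epoch):
        client = InfluxDBClientv2(
            url=current_app.config["INFLUXDB_URL"],  # hypothetical config key
            token=current_app.config["INFLUXDB_TOKEN"],
            org=current_app.config["INFLUXDB_ORG"],  # hypothetical config key
        )
        frames = []
        for query_str in query_strs:
            frame = client.query_api().query_data_frame(query_str)
            # Drop Flux bookkeeping columns per the TODOs above.
            frame = frame.drop(
                columns=["result", "table", "_start", "_stop", "_measurement"],
                errors="ignore",
            )
            frames.append(frame.set_index("_time"))
        # Still TODO per the comments above: rename columns to
        # "<site>.<inst>.<column>" and recompute wind_direction from
        # wind_east/wind_north.
        return pd.concat(frames, axis=1, copy=False)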