diff --git a/metobsapi/data_api.py b/metobsapi/data_api.py index 71c4296831c05e9b3dfe304bc4b007a29a6f02e2..fea067743233edcb24bcd6c728987763e2b13e06 100644 --- a/metobsapi/data_api.py +++ b/metobsapi/data_api.py @@ -55,9 +55,9 @@ def handle_time_string(date_string): return handle_date(date_string) -def handle_symbols(symbols): +def handle_symbols(symbols: list[str]): influx_symbols: dict[tuple[str, str], list[str]] = {} - influx_to_requested_symbols = {} + influx_to_requested_symbols: dict[str, str | None] = {} handled_symbols = set() add_winds = set() @@ -72,48 +72,59 @@ def handle_symbols(symbols): except ValueError: raise ValueError("Symbols must have 3 period-separated parts: {}".format(symbol)) - api_to_influx_rename_map = data_responses.SYMBOL_TRANSLATIONS.get((site, inst)) - if not api_to_influx_rename_map: - raise ValueError("Unknown site/instrument: {},{}".format(site, inst)) - influx_name = api_to_influx_rename_map.get(s) + influx_name = _get_column_name_in_influxdb(site, inst, s) + # this assignment must happen before we continue the loop after + # detecting 'wind_direction', otherwise we don't know we need to readd it + influx_to_requested_symbols[f"{site}.{inst}.{influx_name}"] = symbol + if s == "wind_direction": add_winds.add((site, inst)) - elif influx_name is None: - raise ValueError("Unknown symbol: {}".format(symbol)) - influx_name = influx_name or s - - # NOTE: if wind speed/dir is specified InfluxDB should provide it with - # all fill values, so we can fill it in later - influx_to_requested_symbols[f"{si[0]}.{si[1]}.{influx_name}"] = symbol - influx_symbols.setdefault(si, []).append(influx_name) + continue + influx_symbols.setdefault((site, inst), []).append(influx_name) # Add the symbols needed to compute the wind_speed and wind_direction for si in add_winds: - influx_to_requested_symbols[f"{si[0]}.{si[1]}.wind_east"] = None - influx_to_requested_symbols[f"{si[0]}.{si[1]}.wind_north"] = None + 
influx_to_requested_symbols[f"{si[0]}.{si[1]}.wind_east"] = None + influx_to_requested_symbols[f"{si[0]}.{si[1]}.wind_north"] = None influx_symbols[si].extend(("wind_east", "wind_north")) return influx_to_requested_symbols, influx_symbols +def _get_column_name_in_influxdb(site: str, inst: str, api_symbol: str) -> str: + api_to_influx_rename_map = data_responses.SYMBOL_TRANSLATIONS.get((site, inst)) + if not api_to_influx_rename_map: + raise ValueError("Unknown site/instrument: {},{}".format(site, inst)) + influx_name = api_to_influx_rename_map.get(api_symbol) + + if api_symbol != "wind_direction" and influx_name is None: + raise ValueError(f"Unknown symbol: {site}.{inst}.{api_symbol}") + influx_name = influx_name or api_symbol + return influx_name + + def handle_influxdb_result(data_frame, influx_to_requested_symbols, interval): valid_requested_symbols = [symbol for symbol in influx_to_requested_symbols.values() if symbol is not None] if data_frame is None: # invalid query - columns = ["time"] + valid_requested_symbols - data_frame = pd.DataFrame(columns=columns) - data_frame.set_index("time", inplace=True) - data_frame.fillna(value=np.nan, inplace=True) - return data_frame + return _create_empty_dataframe_for_columns(valid_requested_symbols) data_frame = _convert_wind_vectors_to_direction_if_needed(data_frame) data_frame = data_frame.round({s: ROUNDING.get(s, 1) for s in data_frame.columns}) # rename columns to API names requested by the user - # could be "<site>.<inst>.name" or just "name" + # User could request "<site>.<inst>.name" or just "name" data_frame.columns = valid_requested_symbols return data_frame +def _create_empty_dataframe_for_columns(columns: list[str]) -> pd.DataFrame: + columns = ["time"] + columns + data_frame = pd.DataFrame(columns=columns) + data_frame.set_index("time", inplace=True) + data_frame.fillna(value=np.nan, inplace=True) + return data_frame + + def _convert_wind_vectors_to_direction_if_needed(data_frame: pd.DataFrame) -> pd.DataFrame: 
for weast_col_name, wnorth_col_name, wdir_col_name in _wind_vector_column_groups(data_frame.columns): data_frame[wdir_col_name] = np.rad2deg(np.arctan2(data_frame[weast_col_name], data_frame[wnorth_col_name])) @@ -132,8 +143,6 @@ def _wind_vector_column_groups(columns: list[str]) -> Iterable[tuple[str, str, s wind_direction_column = f"{site}.{inst}.wind_direction" if wind_east_column not in columns: continue - if wind_direction_column not in columns: - continue yield wind_east_column, wind_north_column, wind_direction_column @@ -141,10 +150,6 @@ def _all_wind_north_columns(columns: list[str]) -> list[str]: return [col_name for col_name in columns if "wind_north" in col_name] -def _convert_wind_vectors_to_direction(frame: pd.DataFrame) -> pd.DataFrame: - pass - - def calc_num_records(begin, end, interval): now = datetime.utcnow() if begin is None: @@ -319,14 +324,14 @@ def modify_data(fmt, begin, end, site, inst, symbols, interval, sep=",", order=" try: begin, end = _convert_begin_and_end(begin, end) _check_query_parameters(order, epoch, symbols, interval) - short_symbols, symbols = _parse_symbol_names(site, inst, symbols) - influx_to_requested_symbols, influx_symbols = handle_symbols(symbols) + short_symbols, fully_qualified_symbols = _parse_symbol_names(site, inst, symbols) + influx_to_requested_symbols, influx_symbols = handle_symbols(fully_qualified_symbols) except ValueError as e: return handle_error(fmt, str(e)) data_frame, response_info = _query_time_series_db(begin, end, interval, influx_symbols, epoch) data_frame = handle_influxdb_result(data_frame, influx_to_requested_symbols, interval) - data_frame = _reorder_and_rename_result_dataframe(data_frame, symbols, short_symbols) + data_frame = _reorder_and_rename_result_dataframe(data_frame, fully_qualified_symbols, short_symbols) handler = RESPONSE_HANDLERS[fmt] return handler(data_frame, epoch, sep=sep, order=order, **response_info) @@ -352,18 +357,18 @@ def _check_query_parameters(order, epoch, symbols, 
interval): raise ValueError("bad_interval") -def _parse_symbol_names(site, inst, symbols): +def _parse_symbol_names(site: str, inst: str, symbols: str) -> tuple[list[str] | None, list[str]]: if site and inst: # shorthand for symbols that all use the same site and inst short_symbols = symbols.split(":") - symbols = ["{}.{}.{}".format(site, inst, s) for s in short_symbols] + fully_qualified_symbols = ["{}.{}.{}".format(site, inst, s) for s in short_symbols] elif not site and not inst: # each symbol is fully qualified with site.inst.symbol short_symbols = None - symbols = symbols.split(":") + fully_qualified_symbols = symbols.split(":") else: raise ValueError("missing_site_inst") - return short_symbols, symbols + return short_symbols, fully_qualified_symbols def _query_time_series_db(begin, end, interval, influx_symbols, epoch): diff --git a/metobsapi/tests/test_data_api.py b/metobsapi/tests/test_data_api.py index 9b1d19fcd3e0781e982299216e4515d3e4480f18..7e6f2ef22bb2bd6ad4e4271be0b85cf6f67bee2b 100644 --- a/metobsapi/tests/test_data_api.py +++ b/metobsapi/tests/test_data_api.py @@ -2,27 +2,106 @@ import contextlib import json import random from datetime import datetime, timedelta -from typing import Callable, ContextManager, Iterable, Iterator +from typing import Any, Callable, ContextManager, Iterable, Iterator from unittest import mock import numpy as np +import pandas as pd import pytest -from influxdb.resultset import ResultSet + +try: + import influxdb_client +except ImportError: + influxdb_client = None + +try: + from influxdb.resultset import ResultSet +except ImportError: + ResultSet = None from metobsapi.util.query_influx import QueryResult +@pytest.fixture(params=["1", "2"], ids=["influxdbv1", "influxdbv2"]) +def influxdb_version(request): + return request.param + + @pytest.fixture -def mock_influxdb_query() -> Callable[[QueryResult], ContextManager[mock.Mock]]: +def mock_influxdb_query(influxdb_version) -> Callable[[list[dict]], ContextManager[mock.Mock]]: 
@contextlib.contextmanager - def _mock_influxdb_query_with_fake_data(fake_result_data: QueryResult) -> Iterator[mock.Mock]: - with mock.patch("metobsapi.util.query_influx._query_influxdbv1") as query_func: - query_func.return_value = fake_result_data - yield query_func + def _mock_influxdb_query_with_fake_data(fake_result_data: list[dict]) -> Iterator[mock.Mock]: + with _mock_influxdb_library_availability(influxdb_version == "1", influxdb_version == "2"): + if influxdb_version == "1": + with _patch_v1_query(fake_result_data) as query_func: + yield query_func + elif influxdb_version == "2": + with _patch_v2_query(fake_result_data) as query_func: + yield query_func return _mock_influxdb_query_with_fake_data +@contextlib.contextmanager +def _mock_influxdb_library_availability(v1_available: bool, v2_available: bool) -> Iterator[None]: + with _mock_if(not v1_available, "metobsapi.util.query_influx.InfluxDBClientv1"), _mock_if( + not v2_available, "metobsapi.util.query_influx.InfluxDBClientv2" + ): + yield None + + +@contextlib.contextmanager +def _mock_if(condition: bool, obj_path: str) -> Iterator[None]: + if not condition: + yield None + return + with mock.patch(obj_path, None): + yield None + + +@contextlib.contextmanager +def _patch_v1_query(fake_result_data: list[dict]) -> Iterator[mock.Mock]: + if ResultSet is None: + pytest.skip("Version 1 'influxdb' dependency missing. Skipping test.") + with mock.patch("metobsapi.util.query_influx._query_influxdbv1") as query_func: + fake_data_for_v1 = _fake_data_to_v1_resultset(fake_result_data) + query_func.return_value = fake_data_for_v1 + yield query_func + + +@contextlib.contextmanager +def _patch_v2_query(fake_result_data: list[dict]) -> Iterator[mock.Mock]: + if influxdb_client is None: + pytest.skip("Version 2 'influxdb-client' dependency missing. 
Skipping test.") + + site_data_frames = [] + for data_dict in fake_result_data: + site = data_dict["tags"]["site"] + inst = data_dict["tags"]["inst"] + data_row_dicts = [] + for row_values in data_dict["values"]: + data_row = {} + for col_name, col_value in zip(data_dict["columns"], row_values): + data_row[f"{site}.{inst}.{col_name}"] = np.nan if col_value is None else col_value + data_row.update(data_dict["tags"]) + data_row_dicts.append(data_row) + + new_column_names = [f"{site}.{inst}.{col_name}" for col_name in data_dict["columns"]] + single_df = pd.DataFrame(data_row_dicts, columns=new_column_names) + single_df.set_index(f"{site}.{inst}.time", inplace=True) + single_df = single_df.dropna(axis=1, how="all") + site_data_frames.append(single_df) + + merged_df = pd.concat(site_data_frames, axis=1, copy=False) + with mock.patch("metobsapi.util.query_influx._query_influxdbv2") as query_func: + query_func.return_value = merged_df + yield query_func + + +def _fake_data_to_v1_resultset(fake_data: list[dict]) -> QueryResult: + return [ResultSet({"series": [single_result], "statement_id": 0}) for single_result in fake_data] + + @pytest.fixture def influxdb_air_temp_9_values(mock_influxdb_query) -> Iterable[None]: fake_result = _fake_data("1m", {("aoss", "tower"): ["time", "air_temp"]}, 9) @@ -46,7 +125,7 @@ def influxdb_wind_fields_9_values(mock_influxdb_query) -> Iterable[None]: yield -def _fake_data(interval: str, symbols: dict, num_vals: int, single_result: bool = False) -> QueryResult: +def _fake_data(interval: str, symbols: dict[tuple[str, str], list[str]], num_vals: int) -> list[dict[str, Any]]: now = datetime(2017, 3, 5, 19, 0, 0) t_format = "%Y-%m-%dT%H:%M:%SZ" measurement_name = "metobs_" + interval @@ -67,26 +146,9 @@ def _fake_data(interval: str, symbols: dict, num_vals: int, single_result: bool "tags": tags, "values": vals, } - if single_result: - series.append(s) - else: - series.append( - ResultSet( - { - "series": [s], - "statement_id": 0, - } - ) - ) + 
series.append(s) - if single_result: - ret = { - "series": series, - "statement_id": 0, - } - return ResultSet(ret) - else: - return series + return series def _generate_fake_column_values(columns: list[str]) -> list[float | None]: @@ -98,10 +160,12 @@ def _generate_fake_column_values(columns: list[str]) -> list[float | None]: @pytest.fixture -def app(): +def app(influxdb_version): from metobsapi import app as real_app real_app.config.update({"TESTING": True, "DEBUG": True}) + if influxdb_version == "2": + real_app.config.update({"INFLUXDB_TOKEN": "1234"}) yield real_app diff --git a/metobsapi/util/query_influx.py b/metobsapi/util/query_influx.py index f1a3d69733af230c4ff6a79a95885dcd640a0d44..5d5b6848f2955daf492293bbd5ef2868ef694dcf 100644 --- a/metobsapi/util/query_influx.py +++ b/metobsapi/util/query_influx.py @@ -22,62 +22,7 @@ except ImportError: QUERY_FORMAT = "SELECT {symbol_list} FROM metobs.forever.metobs_{interval} WHERE {where_clause} GROUP BY site,inst" -QueryResult: TypeAlias = list[ResultSet] - - -def parse_dt_v1(d): - if d is None: - return "now()" - elif isinstance(d, timedelta): - return "now() - {:d}s".format(int(d.total_seconds())) - else: - return d.strftime("'%Y-%m-%dT%H:%M:%SZ'") - - -def build_queries_v1(symbols, begin, end, value): - begin = parse_dt_v1(begin) - end = parse_dt_v1(end) - - where_clause = [] - where_clause.append("time >= {}".format(begin)) - where_clause.append("time <= {}".format(end)) - - queries = [] - for (site, inst), symbol_names in symbols.items(): - wc = where_clause.copy() - wc.append("site='{}'".format(site)) - wc.append("inst='{}'".format(inst)) - query = QUERY_FORMAT.format( - symbol_list=", ".join(symbol_names), - interval=value, - where_clause=" AND ".join(wc), - ) - queries.append(query) - queries = "; ".join(queries) - return queries - - -def _query_influxdbv1(query_str, epoch) -> QueryResult: - kwargs = { - "host": current_app.config["INFLUXDB_HOST"], - "port": current_app.config["INFLUXDB_PORT"], - 
"database": current_app.config["INFLUXDB_DB"], - } - if current_app.config["INFLUXDB_TOKEN"]: - # version 2.x, ignore user/pass, use token - kwargs["username"] = None - kwargs["password"] = None - kwargs["headers"] = { - "Authorization": "Token {}".format(current_app.config["INFLUXDB_TOKEN"]), - } - - client = InfluxDBClientv1( - **kwargs, - ) - res = client.query(query_str, epoch=epoch) - if not isinstance(res, list): - res = [res] - return res +QueryResult: TypeAlias = list def query(symbols, begin, end, interval, epoch): @@ -109,14 +54,16 @@ class QueryHandler: "Missing 'influxdb-client' and legacy 'influxdb' dependencies to communicate " "with InfluxDB database" ) - if True or InfluxDBClientv2 is None: + if InfluxDBClientv2 is None: # fallback to v1 library - query_str = build_queries_v1(self._symbols, self._begin, self._end, self._interval) + query_str = _build_queries_v1(self._symbols, self._begin, self._end, self._interval) result = _query_influxdbv1(query_str, epoch) return self._convert_v1_result_to_dataframe(result) # version 2 library - raise NotImplementedError() + query_str = _build_queries_v2(self._symbols, self._begin, self._end, self._interval) + result = _query_influxdbv2(query_str, epoch) + return self._convert_v2_result_to_api_dataframe(result) def _convert_v1_result_to_dataframe(self, result: QueryResult) -> pd.DataFrame: frames = [] @@ -132,3 +79,75 @@ class QueryHandler: frames.append(frame) frame = pd.concat(frames, axis=1, copy=False) return frame + + def _convert_v2_result_to_api_dataframe(self, result: QueryResult) -> pd.DataFrame: + # TODO + return result + + +def _build_queries_v1(symbols, begin, end, value): + begin = parse_dt_v1(begin) + end = parse_dt_v1(end) + + where_clause = [] + where_clause.append("time >= {}".format(begin)) + where_clause.append("time <= {}".format(end)) + + queries = [] + for (site, inst), symbol_names in symbols.items(): + wc = where_clause.copy() + wc.append("site='{}'".format(site)) + 
wc.append("inst='{}'".format(inst)) + query = QUERY_FORMAT.format( + symbol_list=", ".join(symbol_names), + interval=value, + where_clause=" AND ".join(wc), + ) + queries.append(query) + queries = "; ".join(queries) + return queries + + +def parse_dt_v1(d): + if d is None: + return "now()" + elif isinstance(d, timedelta): + return "now() - {:d}s".format(int(d.total_seconds())) + else: + return d.strftime("'%Y-%m-%dT%H:%M:%SZ'") + + +def _query_influxdbv1(query_str, epoch) -> QueryResult: + kwargs = { + "host": current_app.config["INFLUXDB_HOST"], + "port": current_app.config["INFLUXDB_PORT"], + "database": current_app.config["INFLUXDB_DB"], + } + if current_app.config["INFLUXDB_TOKEN"]: + # version 2.x, ignore user/pass, use token + kwargs["username"] = None + kwargs["password"] = None + kwargs["headers"] = { + "Authorization": "Token {}".format(current_app.config["INFLUXDB_TOKEN"]), + } + + client = InfluxDBClientv1( + **kwargs, + ) + res = client.query(query_str, epoch=epoch) + if not isinstance(res, list): + res = [res] + return res + + +def _build_queries_v2(symbols, begin, end, value): + return None + + +def _query_influxdbv2(query_str, epoch) -> QueryResult: + # TODO: Pivot + # TODO: Keep + # TODO: Manually drop _result and _table (underscores?) columns + # TODO: Rename columns to site.inst.column + # TODO: Be careful with wind direction + return []