Verified commit 4163fae9 authored by David Hoese

Add initial proof of concept v2 query building and processing

Includes many other fixes
parent 469b8215
FROM python:3.11-slim as compile-image
RUN apt-get update
RUN apt-get install -y --no-install-recommends git
RUN python -m pip install -U setuptools pip
WORKDIR build_pkg
COPY pyproject.toml .
COPY requirements.txt .
COPY MANIFEST.in .
# need git history for proper version numbering
COPY .git/ .git/
RUN pip install --user -r requirements.txt
COPY metobsapi/ metobsapi/
RUN pip install --user . --no-deps
FROM python:3.11-slim as build-image
COPY --from=compile-image /root/.local /root/.local
COPY docker_run.sh .
# Make sure scripts in .local are usable:
ENV PATH=/root/.local/bin:$PATH
EXPOSE 8090
ENTRYPOINT ["bash", "./docker_run.sh"]
CMD []
recursive-include metobsapi/orderForm *
recursive-include metobsapi/static *
recursive-include metobsapi/templates *
#!/usr/bin/env bash
BIND_ADDR=${BIND_ADDR:-"0.0.0.0"}
BIND_PORT=${BIND_PORT:-"8090"}
NUM_WORKERS=${NUM_WORKERS:-"8"}
MAX_REQUESTS=${MAX_REQUESTS:-"1000"}
gunicorn --bind "${BIND_ADDR}:${BIND_PORT}" --workers "${NUM_WORKERS}" \
    --max-requests "${MAX_REQUESTS}" \
    -n metobs_api metobsapi.server:app
@@ -17,4 +17,5 @@ INFLUXDB_PASS = "root" # nosec B105
# If token is provided, user and password are ignored
# Security: This is expected to be overwritten either via environment variable or sub-configuration
INFLUXDB_TOKEN = "" # nosec B105
INFLUXDB_ORG = "metobs"
INFLUXDB_DB = "metobs"
@@ -98,7 +98,7 @@ def _get_column_name_in_influxdb(site: str, inst: str, api_symbol: str) -> str:
return influx_name
def handle_influxdb_result(data_frame, influx_to_requested_symbols, interval):
def handle_influxdb_result(data_frame, influx_to_requested_symbols):
valid_requested_symbols = [symbol for symbol in influx_to_requested_symbols.values() if symbol is not None]
if data_frame is None:
# invalid query
@@ -173,6 +173,7 @@ def handle_csv(frame, epoch, sep=",", message="", code=200, status="success", **
line_format = sep.join(["{time}", "{symbols}"])
if frame is not None and not frame.empty:
for t, row in frame.iterrows():
t = _time_column_to_epoch_str(t, epoch)
line = line_format.format(time=t, symbols=sep.join(str(x) for x in row.values))
data_lines.append(line)
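# For illustration with the default epoch (""), a rendered line looks like
# "2017-03-05T19:00:00Z,1.0,2.0" -- the ISO-8601 time from _time_column_to_epoch_str
# followed by the row values joined with `sep` (the values here are made up).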
@@ -209,12 +210,10 @@ def handle_json(frame, epoch, order="columns", message="", code=200, status="suc
# replace NaNs with None
frame = frame.where(pd.notnull(frame), None)
package["timestamps"] = frame.index.values
if epoch:
newStamps = []
for stamp in package["timestamps"]:
newStamps.append(float(stamp))
package["timestamps"] = newStamps
newStamps = []
for stamp in frame.index.values:
newStamps.append(_time_column_to_epoch_str(stamp, epoch))
package["timestamps"] = newStamps
if order == "column":
package["data"] = dict(frame)
@@ -277,7 +276,7 @@ def handle_xml(frame, epoch, sep=",", message="", code=200, status="success", **
for idx, (t, row) in enumerate(frame.iterrows()):
row_elem = doc.createElement("row")
row_elem.setAttribute("id", str(idx))
row_elem.appendChild(doc.createTextNode(str(t)))
row_elem.appendChild(doc.createTextNode(_time_column_to_epoch_str(t, epoch)))
for point in row:
row_elem.appendChild(doc.createTextNode(sep))
row_elem.appendChild(doc.createTextNode(str(point)))
@@ -290,6 +289,24 @@ def handle_xml(frame, epoch, sep=",", message="", code=200, status="success", **
return res, code
def _time_column_to_epoch_str(time_val: datetime | np.datetime64 | pd.Timestamp | str, epoch: str) -> str:
if isinstance(time_val, str):
return time_val
if isinstance(time_val, np.datetime64):
time_val = time_val.astype("datetime64[us]").astype(datetime)
if isinstance(time_val, pd.Timestamp):
time_val = time_val.to_pydatetime()
if not epoch:
return time_val.strftime("%Y-%m-%dT%H:%M:%SZ")
# NOTE: Assumes host system is UTC
time_seconds = time_val.timestamp()
time_in_units = data_responses.seconds_to_epoch[epoch](time_seconds)
time_in_units = round(time_in_units) # round to nearest integer
return f"{time_in_units:d}"
def handle_error(fmt, error_str):
handler = RESPONSE_HANDLERS[fmt]
err_code, err_msg = data_responses.ERROR_MESSAGES.get(error_str, (400, error_str))
@@ -319,7 +336,7 @@ def modify_data(fmt, begin, end, site, inst, symbols, interval, sep=",", order="
return handle_error(fmt, str(e))
data_frame, response_info = _query_time_series_db(begin, end, interval, influx_symbols, epoch)
data_frame = handle_influxdb_result(data_frame, influx_to_requested_symbols, interval)
data_frame = handle_influxdb_result(data_frame, influx_to_requested_symbols)
data_frame = _reorder_and_rename_result_dataframe(data_frame, fully_qualified_symbols, short_symbols)
handler = RESPONSE_HANDLERS[fmt]
return handler(data_frame, epoch, sep=sep, order=order, **response_info)
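# Flow sketch: requested symbols are mapped to InfluxDB column names, the time-series DB is
# queried, the per-instrument result is merged into "site.inst.symbol" columns, and the frame
# is handed to the selected format handler (CSV/JSON/XML) together with the requested epoch.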
......
@@ -76,30 +76,34 @@ def _patch_v2_query(fake_result_data: list[dict]) -> Iterator[mock.Mock]:
site_data_frames = []
for data_dict in fake_result_data:
site = data_dict["tags"]["site"]
inst = data_dict["tags"]["inst"]
data_row_dicts = []
for row_values in data_dict["values"]:
data_row = {}
for col_name, col_value in zip(data_dict["columns"], row_values):
data_row[f"{site}.{inst}.{col_name}"] = np.nan if col_value is None else col_value
data_row[col_name] = np.nan if col_value is None else col_value
data_row.update(data_dict["tags"])
data_row_dicts.append(data_row)
new_column_names = [f"{site}.{inst}.{col_name}" for col_name in data_dict["columns"]]
single_df = pd.DataFrame(data_row_dicts, columns=new_column_names)
single_df.set_index(f"{site}.{inst}.time", inplace=True)
single_df = pd.DataFrame(data_row_dicts, columns=["site", "inst"] + data_dict["columns"])
single_df.set_index("time", inplace=True)
single_df = single_df.dropna(axis=1, how="all")
site_data_frames.append(single_df)
merged_df = pd.concat(site_data_frames, axis=1, copy=False)
# merged_df = pd.concat(site_data_frames, axis=1, copy=False)
with mock.patch("metobsapi.util.query_influx._query_influxdbv2") as query_func:
query_func.return_value = merged_df
query_func.return_value = site_data_frames
yield query_func
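# Note: the patched _query_influxdbv2 now returns the list of per-instrument frames as-is;
# merging and renaming to "site.inst.column" happens later in
# QueryHandler._convert_v2_result_to_api_dataframe (see the query_influx changes below).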
def _fake_data_to_v1_resultset(fake_data: list[dict]) -> QueryResult:
return [ResultSet({"series": [single_result], "statement_id": 0}) for single_result in fake_data]
t_format = "%Y-%m-%dT%H:%M:%SZ"
result_sets = []
for single_result in fake_data:
for value_row in single_result["values"]:
# convert datetime object to isoformat for default epoch format
value_row[0] = value_row[0].strftime(t_format)
result_sets.append(single_result)
return [ResultSet({"series": [single_result], "statement_id": 0}) for single_result in result_sets]
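# Illustrative example (the column name is made up): a fake series dict such as
#   {"columns": ["time", "air_temp"], "tags": {"site": ..., "inst": ...},
#    "values": [[datetime(2017, 3, 5, 19, 0, 0), 1.0]]}
# has its datetime converted in place to "2017-03-05T19:00:00Z" before being wrapped
# in a v1 ResultSet.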
@pytest.fixture
@@ -127,14 +131,13 @@ def influxdb_wind_fields_9_values(mock_influxdb_query) -> Iterable[None]:
def _fake_data(interval: str, symbols: dict[tuple[str, str], list[str]], num_vals: int) -> list[dict[str, Any]]:
now = datetime(2017, 3, 5, 19, 0, 0)
t_format = "%Y-%m-%dT%H:%M:%SZ"
measurement_name = "metobs_" + interval
series = []
for (site, inst), columns in symbols.items():
tags = {"site": site, "inst": inst}
vals: list[list[str | float | None]] = []
vals: list[list[datetime | float | None]] = []
for i in range(num_vals):
vals.append([(now + timedelta(minutes=i)).strftime(t_format)] + _generate_fake_column_values(columns[1:]))
vals.append([(now + timedelta(minutes=i))] + _generate_fake_column_values(columns[1:]))
# make up some Nones/nulls (but not all the time)
r_idx = int(random.random() * len(columns) * 3)
# don't include time (index 0)
......
@@ -78,6 +78,14 @@ epoch_translation = {
"ns": "nanoseconds",
}
epoch_keys = epoch_translation.keys()
seconds_to_epoch = {
"h": lambda seconds: seconds / (60.0 * 60.0),
"m": lambda seconds: seconds / 60.0,
"s": lambda seconds: seconds,
"ms": lambda seconds: seconds * 1000.0,
"u": lambda seconds: seconds * 1.0e6,
"ns": lambda seconds: seconds * 1.0e9,
}
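# Worked example (illustrative): for 1488740400 seconds since the Unix epoch,
#   seconds_to_epoch["ms"](1488740400) == 1488740400000.0  # milliseconds
#   seconds_to_epoch["h"](1488740400) == 413539.0          # hours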
INTERVALS = {
"1m": 60,
......
"""Helpers for querying an InfluxDB backend."""
from datetime import timedelta
from datetime import datetime, timedelta
from typing import TypeAlias
import numpy as np
@@ -81,11 +81,23 @@ class QueryHandler:
return frame
def _convert_v2_result_to_api_dataframe(self, result: QueryResult) -> pd.DataFrame:
# TODO
return result
def _build_queries_v1(symbols, begin, end, value):
frames_to_concat = []
for (site, inst), symbol_names in self._symbols.items():
frames_for_inst = [df for df in result if df["site"][0] == site and df["inst"][0] == inst]
if not frames_for_inst:
data_frame = pd.DataFrame(columns=["_time"] + symbol_names)
data_frame = data_frame.set_index("_time")
else:
data_frame = frames_for_inst[0]
data_frame = data_frame.drop(columns=["site", "inst"])
# "_time" should already be set as the index so we don't need to rename it
new_column_names = [f"{site}.{inst}.{col_name}" for col_name in data_frame.columns]
data_frame.columns = new_column_names
frames_to_concat.append(data_frame)
return pd.concat(frames_to_concat, axis=1, copy=False)
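# Illustrative sketch (site/inst/column names are examples, not from this change): a result
# frame for ("aoss", "tower") with columns ["site", "inst", "air_temp"] has "site"/"inst"
# dropped and "air_temp" renamed to "aoss.tower.air_temp"; an instrument with no matching
# frame contributes an empty placeholder frame so the final concat still carries its columns.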
def _build_queries_v1(symbols, begin, end, interval):
begin = parse_dt_v1(begin)
end = parse_dt_v1(end)
@@ -100,7 +112,7 @@ def _build_queries_v1(symbols, begin, end, value):
wc.append("inst='{}'".format(inst))
query = QUERY_FORMAT.format(
symbol_list=", ".join(symbol_names),
interval=value,
interval=interval,
where_clause=" AND ".join(wc),
)
queries.append(query)
@@ -117,6 +129,15 @@ def parse_dt_v1(d):
return d.strftime("'%Y-%m-%dT%H:%M:%SZ'")
def parse_dt_v2(d):
if d is None:
return "now()"
elif isinstance(d, timedelta):
return "-{:d}s".format(int(d.total_seconds()))
else:
return d.strftime("'%Y-%m-%dT%H:%M:%SZ'")
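# Usage sketch (values are illustrative):
#   parse_dt_v2(None)                        -> "now()"
#   parse_dt_v2(timedelta(hours=1))          -> "-3600s"
#   parse_dt_v2(datetime(2017, 3, 5, 19, 0)) -> "'2017-03-05T19:00:00Z'"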
def _query_influxdbv1(query_str, epoch) -> QueryResult:
kwargs = {
"host": current_app.config["INFLUXDB_HOST"],
@@ -140,14 +161,55 @@ def _query_influxdbv1(query_str, epoch) -> QueryResult:
return res
def _build_queries_v2(symbols, begin, end, value):
return None
def _build_queries_v2(
symbols: dict[tuple[str, str], list[str]],
begin: None | timedelta | datetime,
end: None | timedelta | datetime,
interval: str,
) -> str:
query_format = """
from(bucket:"metobs/forever")
|> range(start: {start}, stop: {stop})
|> filter(fn: (r) => r["_measurement"] == "metobs_{interval}" and r["site"] == "{site}" and r["inst"] == "{inst}")
|> filter(fn: (r) => contains(value: r["_field"], set: ["_time", "site", "inst", {symbols_csv}]))
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time", "site", "inst", {symbols_csv}])
|> yield(name: "{site}-{inst}")
"""
range_start = parse_dt_v2(begin)
range_stop = parse_dt_v2(end)
queries = []
for (site, inst), symbol_names in symbols.items():
symbol_csv_str = ", ".join(f'"{sname}"' for sname in symbol_names)
this_query = query_format.format(
interval=interval,
symbols_csv=symbol_csv_str,
site=site,
inst=inst,
start=range_start,
stop=range_stop,
)
queries.append(this_query)
query_str = "".join(queries)
return query_str
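# Illustrative sketch (site/inst/symbol names are examples): for
#   _build_queries_v2({("aoss", "tower"): ["air_temp", "rel_hum"]}, timedelta(hours=1), None, "1m")
# the generated Flux is roughly:
#   from(bucket:"metobs/forever")
#     |> range(start: -3600s, stop: now())
#     |> filter(fn: (r) => r["_measurement"] == "metobs_1m" and r["site"] == "aoss" and r["inst"] == "tower")
#     |> filter(fn: (r) => contains(value: r["_field"], set: ["_time", "site", "inst", "air_temp", "rel_hum"]))
#     |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
#     |> keep(columns: ["_time", "site", "inst", "air_temp", "rel_hum"])
#     |> yield(name: "aoss-tower")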
def _query_influxdbv2(query_str, epoch) -> QueryResult:
# TODO: Pivot
# TODO: Keep
# TODO: Manually drop _result and _table (underscores?) columns
# TODO: Rename columns to site.inst.column
# TODO: Be careful with wind direction
return []
client_kwargs = {
"url": "http://{}:{}".format(current_app.config["INFLUXDB_HOST"], current_app.config["INFLUXDB_PORT"]),
"org": current_app.config["INFLUXDB_ORG"],
}
client_kwargs["token"] = current_app.config[
"INFLUXDB_TOKEN"
] # "HSHUoHr1MHIIvQrh7Wq-MrWorg1r5FqQegiuUKjGVDtf355eynUo8mNdTtpc2eG7oOw5p6xjlh-isRDQUuXWyA=="
print(client_kwargs)
with InfluxDBClientv2(**client_kwargs) as client:
query_api = client.query_api()
data_frame = query_api.query_data_frame(query_str, data_frame_index="_time")
if len(data_frame) == 0:
return []
data_frame = data_frame.drop(columns=["result", "table"])
# TODO: Shouldn't the query result in a list?
# What are the column names if we have multiple queries (tower and buoy)?
return [data_frame]
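# Putting the two together (sketch; argument names mirror those used in modify_data earlier
# in this commit):
#   query_str = _build_queries_v2(influx_symbols, begin, end, interval)
#   frames = _query_influxdbv2(query_str, epoch)
# where `frames` is an empty list or a list of pandas DataFrames indexed by "_time".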
@@ -50,6 +50,7 @@ classifiers = [
]
requires-python = ">=3.10"
dependencies = [
"numpy",
"flask",
"influxdb-client",
"pandas",
......