From 740b5cb0916530977fb7184db59634d776e85e41 Mon Sep 17 00:00:00 2001
From: David Hoese <david.hoese@ssec.wisc.edu>
Date: Fri, 10 Mar 2023 15:51:23 -0600
Subject: [PATCH] Update and fix influxdb inserts, tasks, and manual averages

---
 README.rst               |  93 ++++++++++++++
 metobscommon/influxdb.py | 266 +++++++++++++++++----------------------
 2 files changed, 211 insertions(+), 148 deletions(-)

diff --git a/README.rst b/README.rst
index 06d2ac1..5253768 100644
--- a/README.rst
+++ b/README.rst
@@ -21,6 +21,99 @@ rain01 and rain02 servers.
     these should use a python with the mentioned packages installed and use
     the full path to that environment's python executable.
 
+Initialize InfluxDB v2 Database
+-------------------------------
+
+This should only be needed once when setting up a server with a fresh InfluxDB
+database.
+
+Prerequisites
+^^^^^^^^^^^^^
+
+Follow the installation instructions for InfluxDB v2.x:
+
+https://docs.influxdata.com/influxdb
+
+For operational use we currently install InfluxDB as a systemd service:
+
+https://docs.influxdata.com/influxdb/v2.6/install/?t=Linux#install-influxdb-as-a-service-with-systemd
+
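+Once installed, the service can be enabled and started with the usual systemd
+commands. A typical sequence (the unit name may differ depending on the
+package):
+
+.. code-block::
+
+   sudo systemctl enable --now influxdb
+   sudo systemctl status influxdb
+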
+We'll also need the ``influx`` CLI to initialize the database:
+
+https://docs.influxdata.com/influxdb/v2.6/reference/cli/influx/
+
+Setup database
+^^^^^^^^^^^^^^
+
+These commands should be run as the metobs user so that the CLI configuration
+is stored in that user's home directory.
+
+.. code-block::
+
+   influx setup -u metobs -p PASSWORD -o metobs -b metobs_raw --retention 0 -n metobs_operator -f
+
+Where PASSWORD should be replaced by the password in the
+``/home/metobs/influxdb_operator_password.txt`` file. This creates a user
+"metobs", an organization named "metobs", and an initial bucket called
+"metobs_raw". The retention period for this bucket is initially set to 0
+(infinite) but will be changed later once data has been added and aggregated
+to "realtime" buckets. This configuration is saved under the profile
+"metobs_operator", named for the operator account it represents, which has
+permissions to all buckets in all organizations and should not be used for
+normal usage. Lastly, the ``-f`` flag skips the confirmation prompt.
+
+Since no token was specified in this command, InfluxDB will generate one for
+us and store it in ``~/.influxdbv2/configs``.
+
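+To verify that the CLI configuration was created, list the stored configs:
+
+.. code-block::
+
+   influx config ls
+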
+.. note::
+
+   The password file should be read-only by the "metobs" user on the server.
+
+Create realtime bucket
+^^^^^^^^^^^^^^^^^^^^^^
+
+.. code-block::
+
+   influx bucket create -o metobs -n metobs_realtime -d "Aggregated data for realtime displays" -r 0
+
+This way we have a "metobs_raw" bucket for all full-resolution data and a
+"metobs_realtime" bucket for all aggregated/averaged data. List the buckets to
+get the bucket IDs needed when creating tokens.
+
+.. code-block::
+
+   influx bucket ls
+
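+The first column of the output is the bucket ID. A sketch of what the output
+looks like (the IDs shown here are placeholders, not real values):
+
+.. code-block::
+
+   ID      Name              Retention   ...
+   abcd    metobs_raw        infinite    ...
+   efgh    metobs_realtime   infinite    ...
+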
+Create operational tokens
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+A read-only token for the metobs_raw and metobs_realtime buckets:
+
+.. code-block::
+
+   influx auth create -o metobs --read-bucket abcd --read-bucket efgh -d "Read-only access to metobs buckets"
+
+This creates a token (printed to the terminal) in the
+"metobs" organization that has read access to the buckets with IDs "abcd" and
+"efgh". Replace these IDs with the IDs for the raw and realtime buckets from
+the previous ``influx bucket ls`` command.
+
+A read-write token for ingest purposes:
+
+.. code-block::
+
+   influx auth create -o metobs --read-bucket abcd --read-bucket efgh --write-bucket abcd --write-bucket efgh --read-tasks --write-tasks -d "Read-write access to metobs buckets"
+
+Make sure to note the tokens printed by these commands; they will not be
+readable anywhere else.
+
+Store the read-only token in ``/home/metobs/influxdbv2_metobs_ro.txt`` and the
+read-write token in ``/home/metobs/influxdbv2_metobs_rw.txt``.
+
+Change the permissions for these two files to read-only for the metobs user:
+
+.. code-block::
+
+   chmod 400 /home/metobs/influxdbv2_metobs_ro.txt
+   chmod 400 /home/metobs/influxdbv2_metobs_rw.txt
+
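+For reference, a minimal sketch of how ingest code might use the read-write
+token from Python with the ``influxdb-client`` package (the URL, org, bucket,
+and token path match the setup above; the site/inst/field values are
+illustrative):
+
+.. code-block:: python
+
+   from influxdb_client import InfluxDBClient
+   from influxdb_client.client.write_api import SYNCHRONOUS
+
+   # Read the token stored during setup (readable only by the metobs user)
+   with open("/home/metobs/influxdbv2_metobs_rw.txt") as token_file:
+       token = token_file.read().strip()
+
+   with InfluxDBClient(url="http://localhost:8086", org="metobs", token=token) as client:
+       write_api = client.write_api(write_options=SYNCHRONOUS)
+       # Write a single line protocol record to the raw bucket
+       write_api.write(bucket="metobs_raw", record="metobs,site=aoss,inst=tower air_temp=21.3")
+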
 Backfill InfluxDB Database
 --------------------------
 
diff --git a/metobscommon/influxdb.py b/metobscommon/influxdb.py
index 4ea4915..e046d9a 100644
--- a/metobscommon/influxdb.py
+++ b/metobscommon/influxdb.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-# encoding: utf8
 """Utilities for working with an InfluxDB database for realtime data.
 
 There should only be one database named 'metobs'
@@ -10,10 +8,14 @@ see https://github.com/influxdata/influxdb/issues/6932.
 See the README for more information about how to use this and other scripts.
 
 """
+from __future__ import annotations
 
+import argparse
 import os
 import sys
 import logging
+from typing import Any
+
 import requests
 import importlib
 import math
@@ -21,21 +23,20 @@ from itertools import zip_longest
 from datetime import datetime
 from metobscommon.util.calc import wind_vector_components
 
+from influxdb_client import InfluxDBClient, TaskCreateRequest
+from influxdb_client.client.write_api import SYNCHRONOUS
+
 LOG = logging.getLogger(__name__)
-DB_NAME = 'metobs'
 DB_HOST = 'localhost'
 DB_PORT = 8086
+ORG_NAME = "metobs"
+REALTIME_BUCKET_NAME = "metobs_realtime"
+RAW_BUCKET_NAME = "metobs_raw"
 # name of the single measurement that all data is written to
 MEASUREMENT_NAME = "metobs"
 # don't include time when writing data, used for testing CQs and RPs
 DEBUG_TIME = bool(os.environ.get("DEBUG_INFLUX_TIME", False))
 
-FAKE_CQ_TEMPLATE = "SELECT {ops} INTO {dbname}.{rp}.metobs_{duration} " \
-                   "FROM {dbname}.one_year.metobs WHERE " \
-                   "time >= '{start_time}' and time < '{end_time}' and " \
-                   "{site_conditions} " \
-                   "GROUP BY time({duration}), inst, site"
-
 
 def _mktime(frame, timestamp="stamp"):
     return int((frame[timestamp] - datetime(1970, 1, 1)).total_seconds() * 10**9)
@@ -81,41 +82,11 @@ def frame_records(frames,
                 measurement_name, fields=fields_str, timestamp=nanos, tags=tags_str)
 
 
-def insert(lines, host=DB_HOST, port=DB_PORT, dbname=DB_NAME, **kwargs):
-    resp = requests.post(
-        'http://{host}:{port:d}/write?db={dbname}'.format(host=host, port=port, dbname=dbname),
-        data='\n'.join(lines))
-    resp.raise_for_status()
-
-
-def _query(query_string, host=DB_HOST, port=DB_PORT, dbname=DB_NAME):
-    resp = requests.get(
-        'http://{host}:{port:d}/query?db={dbname}&q={query}'.format(host=host,
-                                                                    port=port,
-                                                                    dbname=dbname,
-                                                                    query=query_string)
-    )
-    resp.raise_for_status()
-
-
-def create_database(args):
-    LOG.info("Creating database '%s'...", args.dbname)
-    query_str = "CREATE DATABASE {dbname}".format(dbname=args.dbname)
-    _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
-    # LOG.info("Setting six week default retention policy...")
-    # query_str = "CREATE RETENTION POLICY six_week ON {dbname} DURATION 6w REPLICATION 1 DEFAULT".format(dbname=args.dbname)
-    # _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
-    LOG.info("Setting one year retention policy...")
-    query_str = "CREATE RETENTION POLICY one_year ON {dbname} DURATION 52w REPLICATION 1 DEFAULT".format(dbname=args.dbname)
-    _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
-    # LOG.info("Setting five year retention policy...")
-    # query_str = "CREATE RETENTION POLICY five_year ON {dbname} DURATION 5y REPLICATION 1".format(dbname=args.dbname)
-    # _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
-    LOG.info("Setting forever retention policy...")
-    query_str = "CREATE RETENTION POLICY forever ON {dbname} DURATION INF REPLICATION 1".format(dbname=args.dbname)
-    _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
-
-    create_continuous_queries(args)
+def insert(lines: list[str], influxdb_conn_params: dict[str, str]):
+    """Insert InfluxDB 'line protocol' lines into an InfluxDB v2 database."""
+    with InfluxDBClient(**influxdb_conn_params) as client:
+        write_api = client.write_api(write_options=SYNCHRONOUS)
+        write_api.write(bucket=RAW_BUCKET_NAME, record=lines)
 
 
 DURATIONS = ['1m', '5m', '1h']
@@ -146,90 +117,92 @@ def _get_symbols_list(args):
     return symbols_list
 
 
-def create_continuous_queries(args):
-    """Create continuous queries to create averaged fields.
-
-    Due to limitations of CQs in InfluxDB CQs can not be specified with
-    an execution offset (ex. run CQ 30s after the specified interval). To
-    get around this we resample the CQ every 2 intervals. So for a 1m
-    interval we run the query to cover 2m and take the average for each 1m
-    interval. This results in points being calculated twice and the most
-    recent point being modified (new data or updated averages).
+def create_tasks(args):
+    """Create recurring tasks that create averaged versions of the raw data.
 
-    Any real time applications using the API and not requesting the whole
-    data interval (only getting the newest point) should either be able to
-    update the newest point or should ignore the newest point until there
-    is a point after it (assuming data came in in a semi-regular manner).
-
-    https://github.com/influxdata/influxdb/issues/6932
-    https://docs.influxdata.com/influxdb/v1.5/query_language/continuous_queries/#advanced-syntax
+    The tasks as configured (see the raw string variable below) assume that
+    realtime data never arrives later than 1 minute after the observation
+    time. These tasks only apply to newly arriving data and will not be
+    applied to backfilled data; for backfilled data the aggregation/averaging
+    function must be run separately.
 
     """
-    symbols_list = _get_symbols_list(args)
-
-    for idx, symbols in enumerate(symbols_list):
-        if "wind_dir" in symbols:
-            symbols.extend(["wind_east", "wind_north"])
-            # symbols.remove("wind_speed")
-            symbols.remove("wind_dir")
+    influxdb_conn_params = convert_influxdb_args_to_kwargs(args)
+    with InfluxDBClient(**influxdb_conn_params) as client:
+        tasks_api = client.tasks_api()
         for duration in DURATIONS:
-            factor = 3 if duration == '1m' else 2
-            # see docstring above, 3m for 1m, 10m for 5m, etc
-            resample_for = "{:d}{}".format(int(duration[:-1]) * factor, duration[-1])
-
-            LOG.info("Adding continuous query for '%s' averaged data", duration)
-            operation_str = ",".join(["mean({field}) as {field}".format(field=field) for field in symbols])
-            cq_name = "cq_{idx}_{duration}".format(idx=idx, duration=duration)
-            if hasattr(args, "drop_previous") and args.drop_previous:
-                LOG.info("Dropping previous CQ '%s'", cq_name)
-                query_str = "DROP CONTINUOUS QUERY {cq_name} ON {dbname}".format(cq_name=cq_name, dbname=args.dbname)
-                _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
-            rp = DURATION_TO_RP.get(duration, 'one_year')
-            query_str = "CREATE CONTINUOUS QUERY {cq_name} ON {dbname} " \
-                "RESAMPLE EVERY {duration} FOR {resample_for} " \
-                "BEGIN SELECT {ops} INTO " \
-                "{dbname}.{rp}.metobs_{duration} FROM " \
-                "{dbname}.one_hour.metobs " \
-                "GROUP BY time({duration}), inst, site END".format(
-                    dbname=args.dbname, duration=duration, ops=operation_str,
-                    rp=rp, cq_name=cq_name, resample_for=resample_for)
-            _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
+            LOG.info(f"Adding task for {duration!r} averaged data")
+            task_for_duration = AGGREGATE_FLUX_TASK_FORMAT.format(
+                interval=duration,
+                raw_bucket=RAW_BUCKET_NAME,
+                realtime_bucket=REALTIME_BUCKET_NAME,
+                org=ORG_NAME,
+            )
+            task_request = TaskCreateRequest(
+                flux=task_for_duration,
+                org=ORG_NAME,
+                description=f"Aggregate raw data into {duration} averages",
+                status="active",
+            )
+            tasks_api.create_task(task_create_request=task_request)
+
+
+# Assume that data coming from realtime sources is no more than 1 minute late
+# XXX: Do I need to get 2x the interval of data?
+# XXX: Do I need to do anything about per-site/inst pair or per-symbol?
+AGGREGATE_FLUX_TASK_FORMAT = """
+option task = {{
+    name: "Average data ({interval})",
+    every: {interval},
+    offset: 1m,
+}}
+
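+// Average the raw "metobs" measurement over each task interval and write the
+// result to the realtime bucket. Covering 2x the interval re-averages points
+// that arrived after the previous run (see the docstring in create_tasks).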
+from(bucket: "{raw_bucket}")
+    |> range(start: -(task.every * 2))
+    |> filter(fn: (r) => r["_measurement"] == "metobs")
+    |> aggregateWindow(every: {interval}, fn: mean)
+    |> set(key: "_measurement", value: "metobs_{interval}")
+    |> to(org: "{org}", bucket: "{realtime_bucket}", tagColumns: ["site", "inst"])
+"""
+
+MANUAL_AGGREGATE_QUERY = """
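+// Same aggregation as the task above, but over an explicit time range and a
+// single site/inst pair, for manually backfilling the realtime bucket.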
+from(bucket: "{raw_bucket}")
+    |> range(start: {start}, stop: {stop})
+    |> filter(fn: (r) => r["_measurement"] == "metobs")
+    |> filter(fn: (r) => r["site"] == "{site}" and r["inst"] == "{inst}")
+    |> aggregateWindow(every: {interval}, fn: mean)
+    |> set(key: "_measurement", value: "metobs_{interval}")
+    |> to(bucket: "{realtime_bucket}", tagColumns: ["site", "inst"])
+"""
 
 
 def run_average_queries(args):
     stations = [x.split('.') for x in args.stations]
-    assert(all(len(x) == 2 for x in stations))
-
-    site_cond = ["(site='{site}' AND inst='{inst}')".format(site=site, inst=inst)
-                 for site, inst in stations]
-    site_cond = " OR ".join(site_cond)
-
-    symbols_list = _get_symbols_list(args)
-    # flatten symbols list to get all of them together
-    symbols = sorted(set([x for y in symbols_list for x in y]))
-
-    if "wind_dir" in symbols:
-        symbols.extend(["wind_east", "wind_north"])
-        # symbols.remove("wind_speed")
-        symbols.remove("wind_dir")
-
-    start_time = args.start_time.strftime("%Y-%m-%d %H:%M:%S")
-    end_time = args.end_time.strftime("%Y-%m-%d %H:%M:%S")
-    operation_str = ",".join(["mean({field}) as {field}".format(field=field) for field in symbols])
-    for duration in args.durations:
-        LOG.info("Running average query for '%s' averaged data", duration)
-        rp = DURATION_TO_RP.get(duration, 'one_year')
-        query_str = FAKE_CQ_TEMPLATE.format(
-            dbname=args.dbname, duration=duration, rp=rp,
-            start_time=start_time, end_time=end_time,
-            site_conditions=site_cond, ops=operation_str,
-        )
-        if args.debug:
-            print(query_str)
-            continue
-        else:
-            LOG.debug(query_str)
-        _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
+    if not all(len(x) == 2 for x in stations):
+        raise ValueError("Stations must be specified as 'site.inst' pairs.")
+
+    start_time = args.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+    end_time = args.end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+    influxdb_conn_params = convert_influxdb_args_to_kwargs(args)
+    with InfluxDBClient(**influxdb_conn_params) as client:
+        query_api = client.query_api()
+        for site, inst in stations:
+            for duration in args.durations:
+                LOG.info(f"Running average query for {duration!r} averaged data")
+                aggregate_query = MANUAL_AGGREGATE_QUERY.format(
+                    raw_bucket=RAW_BUCKET_NAME,
+                    start=start_time,
+                    stop=end_time,
+                    site=site,
+                    inst=inst,
+                    interval=duration,
+                    realtime_bucket=REALTIME_BUCKET_NAME,
+                )
+                if args.debug:
+                    print(aggregate_query)
+                    continue
+                LOG.debug(aggregate_query)
+                query_api.query(aggregate_query)
 
 
 def _dt_convert(datetime_str):
@@ -238,35 +211,16 @@ def _dt_convert(datetime_str):
 
 
 def main():
-    import argparse
     parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument("--host", default=DB_HOST,
-                        help="Hostname of database connection")
-    parser.add_argument("--port", default=DB_PORT, type=int,
-                        help="Port of database connection")
-    parser.add_argument("--dbname", default=DB_NAME,
-                        help="Name of database to modify")
     parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0,
                         help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
+    add_influxdb_command_line_arguments(parser)
 
     subparsers = parser.add_subparsers()
-    subparser = subparsers.add_parser("create", help="Create the initial database and CQs")
-    subparser.add_argument("--symbol-list", nargs="+",
-                           help="Lists of importable paths for the symbols to use in continuous queries")
-    subparser.add_argument("--symbols", nargs="+",
-                           help="List of symbols to include in the continuous queries (alternative to --symbol-list)")
-    subparser.set_defaults(func=create_database)
-
-    subparser = subparsers.add_parser("create_cqs", help="Create the continuous queries")
-    subparser.add_argument("--symbol-list", nargs="+",
-                           help="Lists of importable paths for the symbols to use in continuous queries")
-    subparser.add_argument("--symbols", nargs="+",
-                           help="List of symbols to include in the continuous queries (alternative to --symbol-list)")
-    subparser.add_argument("--drop-previous", action="store_true",
-                           help="Attempt to drop previous CQs with the same names")
-    subparser.set_defaults(func=create_continuous_queries)
-
-    subparser = subparsers.add_parser("run_manual_cqs", help="Manually run averaging CQs")
+    subparser = subparsers.add_parser("create_tasks", help="Create the continuous queries")
+    subparser.set_defaults(func=create_tasks)
+
+    subparser = subparsers.add_parser("run_manual_average", help="Manually run averaging queries")
     subparser.add_argument('--debug', action='store_true',
                            help='Don\'t actually send the commands, print them to stdout')
     subparser.add_argument('--stations', nargs="+",
@@ -282,10 +236,6 @@ def main():
     subparser.add_argument('-d', '--durations', nargs='+', default=['1m'],
                            choices=DURATIONS,
                            help="Which average intervals to compute")
-    subparser.add_argument("--symbol-list", nargs="+",
-                           help="Lists of importable paths for the symbols to use in continuous queries")
-    subparser.add_argument("--symbols", nargs="+",
-                           help="List of symbols to include in the continuous queries (alternative to --symbol-list)")
     subparser.set_defaults(func=run_average_queries)
 
     args = parser.parse_args()
@@ -297,5 +247,25 @@ def main():
     return args.func(args)
 
 
+def add_influxdb_command_line_arguments(parser: argparse.ArgumentParser) -> None:
+    parser.add_argument("--influxdb-host", default=DB_HOST,
+                        help="Hostname of database connection")
+    parser.add_argument("--influxdb-port", default=DB_PORT, type=int,
+                        help="Port of database connection")
+    parser.add_argument("--influxdb-org", default="metobs",
+                        help="InfluxDB organization to do work under")
+    parser.add_argument("--influxdb-token", required=True,
+                        help="Authentication token for InfluxDB connections."
+                             "This should have write permissions to all metobs buckets.")
+
+
+def convert_influxdb_args_to_kwargs(args: argparse.Namespace) -> dict[str, Any]:
+    return {
+        "url": f"http://{args.influxdb_host}:{args.influxdb_port}",
+        "org": args.influxdb_org,
+        "token": args.influxdb_token,
+    }
+
+
 if __name__ == "__main__":
     sys.exit(main())
-- 
GitLab