Verified Commit 740b5cb0 authored by David Hoese's avatar David Hoese

Update and fix influxdb inserts, tasks, and manual averages

parent c749cc63
......@@ -21,6 +21,99 @@ rain01 and rain02 servers.
these should use a python with the mentioned packages installed and use
the full path to that environment's python executable.
Initialize InfluxDB v2 Database
-------------------------------
This should only be needed once when setting up a server with a fresh InfluxDB
database.
Prerequisites
^^^^^^^^^^^^^
Follow the installation instructions for InfluxDB v2.x:
https://docs.influxdata.com/influxdb
For operational use we currently install InfluxDB as a systemd service:
https://docs.influxdata.com/influxdb/v2.6/install/?t=Linux#install-influxdb-as-a-service-with-systemd
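When installed as a systemd service, enabling and starting it typically looks
like the following (service name per the InfluxData packages):

.. code-block::

   sudo systemctl enable influxdb
   sudo systemctl start influxdb
   sudo systemctl status influxdb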
We'll also need the influx CLI to initialize the database.
https://docs.influxdata.com/influxdb/v2.6/reference/cli/influx/
Setup database
^^^^^^^^^^^^^^
These commands should be run as the metobs user so that the resulting
configuration is stored in that user's home directory.
.. code-block::

   influx setup -u metobs -p PASSWORD -o metobs -b metobs_raw --retention 0 -n metobs_operator -f
Where PASSWORD should be replaced by the password in the
``/home/metobs/influxdb_operator_password.txt`` file. This creates a user
"metobs" and an organization named "metobs", along with an initial bucket
called "metobs_raw". The retention period for this bucket is set to 0
(infinite) initially but will be changed later once data has been added and
aggregated into the "realtime" buckets. This configuration is saved under the
profile "metobs_operator", named that because it is an operator account with
permissions to all buckets in all organizations; it should not be used for
normal work. Lastly, the ``-f`` flag skips the confirmation prompt.
Since no token was specified in this command, InfluxDB will generate one for
us and store it in ``~/.influxdbv2/configs``.
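To confirm the token and profile were saved, list the CLI's known
configurations:

.. code-block::

   influx config ls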
.. note::

   The password file should be read-only by the "metobs" user on the server.
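   For example:

   .. code-block::

      chmod 400 /home/metobs/influxdb_operator_password.txt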
Create realtime bucket
^^^^^^^^^^^^^^^^^^^^^^
.. code-block::

   influx bucket create -o metobs -n metobs_realtime -d "Aggregated data for realtime displays" -r 0
This way we have a "metobs_raw" bucket for all full resolution data and a "metobs_realtime" for all
aggregated/averaged data. List the buckets to get the bucket IDs to be used when creating users.
.. code-block::

   influx bucket ls
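The output will look something like this (the IDs shown are placeholders):

.. code-block::

   ID                  Name              Retention   Organization ID
   abcd                metobs_raw        infinite    0123456789abcdef
   efgh                metobs_realtime   infinite    0123456789abcdef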
Create operational tokens
^^^^^^^^^^^^^^^^^^^^^^^^^
A read-only token for the metobs_raw and metobs_realtime buckets:
.. code-block::

   influx auth create -o metobs --read-bucket abcd --read-bucket efgh -d "Read-only access to metobs buckets"
This creates a token (printed to the terminal) in the
"metobs" organization that has read access to the buckets with IDs "abcd" and
"efgh". Replace these IDs with the IDs for the raw and realtime buckets from
the previous ``influx bucket ls`` command.
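If scripting this step, the bucket IDs can be captured from the CLI's JSON
output instead of copying them by hand (a sketch; assumes ``jq`` is
available):

.. code-block::

   RAW_ID=$(influx bucket ls --name metobs_raw --json | jq -r '.[0].id')
   RT_ID=$(influx bucket ls --name metobs_realtime --json | jq -r '.[0].id')
   influx auth create -o metobs --read-bucket "$RAW_ID" --read-bucket "$RT_ID" \
       -d "Read-only access to metobs buckets"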
A read-write token for ingest purposes:
.. code-block::

   influx auth create -o metobs --read-bucket abcd --read-bucket efgh --write-bucket abcd --write-bucket efgh --read-tasks --write-tasks -d "Read-write access to metobs buckets"
Make sure to note the tokens printed by these commands; they will not be
readable anywhere else. Store the read-only token in
``/home/metobs/influxdbv2_metobs_ro.txt`` and the read-write token in
``/home/metobs/influxdbv2_metobs_rw.txt``.
Change the permissions on these two files so they are read-only for the
metobs user:

.. code-block::

   chmod 400 /home/metobs/influxdbv2_metobs_ro.txt
   chmod 400 /home/metobs/influxdbv2_metobs_rw.txt
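Scripts can then read a token at run time instead of hard-coding it. A
minimal sketch using the ``influxdb_client`` package (URL, organization, and
paths per this document):

.. code-block:: python

   from influxdb_client import InfluxDBClient

   # Read the read-write token stored in the step above.
   with open("/home/metobs/influxdbv2_metobs_rw.txt") as token_file:
       token = token_file.read().strip()

   with InfluxDBClient(url="http://localhost:8086", org="metobs", token=token) as client:
       print(client.ping())  # basic connectivity check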
Backfill InfluxDB Database
--------------------------
......
#!/usr/bin/env python
# encoding: utf8
"""Utilities for working with an InfluxDB database for realtime data.
There should only be one database named 'metobs'
......@@ -10,10 +8,14 @@ see https://github.com/influxdata/influxdb/issues/6932.
See the README for more information about how to use this and other scripts.
"""
from __future__ import annotations
import argparse
import os
import sys
import logging
from typing import Any
import requests
import importlib
import math
......@@ -21,21 +23,20 @@ from itertools import zip_longest
from datetime import datetime
from metobscommon.util.calc import wind_vector_components
from influxdb_client import InfluxDBClient, TaskCreateRequest
from influxdb_client.client.write_api import SYNCHRONOUS
LOG = logging.getLogger(__name__)
DB_NAME = 'metobs'
DB_HOST = 'localhost'
DB_PORT = 8086
ORG_NAME = "metobs"
REALTIME_BUCKET_NAME = "metobs_realtime"
RAW_BUCKET_NAME = "metobs_raw"
# name of the single measurement that all data is written to
MEASUREMENT_NAME = "metobs"
# don't include time when writing data, used for testing CQs and RPs
DEBUG_TIME = bool(os.environ.get("DEBUG_INFLUX_TIME", False))
FAKE_CQ_TEMPLATE = "SELECT {ops} INTO {dbname}.{rp}.metobs_{duration} " \
"FROM {dbname}.one_year.metobs WHERE " \
"time >= '{start_time}' and time < '{end_time}' and " \
"{site_conditions} " \
"GROUP BY time({duration}), inst, site"
def _mktime(frame, timestamp="stamp"):
return int((frame[timestamp] - datetime(1970, 1, 1)).total_seconds() * 10**9)
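# Illustrative example: _mktime({"stamp": datetime(2023, 1, 1)}) returns
# 1672531200000000000, i.e. nanoseconds since the Unix epoch.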
......@@ -81,41 +82,11 @@ def frame_records(frames,
measurement_name, fields=fields_str, timestamp=nanos, tags=tags_str)
def insert(lines, host=DB_HOST, port=DB_PORT, dbname=DB_NAME, **kwargs):
resp = requests.post(
'http://{host}:{port:d}/write?db={dbname}'.format(host=host, port=port, dbname=dbname),
data='\n'.join(lines))
resp.raise_for_status()
def _query(query_string, host=DB_HOST, port=DB_PORT, dbname=DB_NAME):
resp = requests.get(
'http://{host}:{port:d}/query?db={dbname}&q={query}'.format(host=host,
port=port,
dbname=dbname,
query=query_string)
)
resp.raise_for_status()
def create_database(args):
LOG.info("Creating database '%s'...", args.dbname)
query_str = "CREATE DATABASE {dbname}".format(dbname=args.dbname)
_query(query_str, host=args.host, port=args.port, dbname=args.dbname)
# LOG.info("Setting six week default retention policy...")
# query_str = "CREATE RETENTION POLICY six_week ON {dbname} DURATION 6w REPLICATION 1 DEFAULT".format(dbname=args.dbname)
# _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
LOG.info("Setting one year retention policy...")
query_str = "CREATE RETENTION POLICY one_year ON {dbname} DURATION 52w REPLICATION 1 DEFAULT".format(dbname=args.dbname)
_query(query_str, host=args.host, port=args.port, dbname=args.dbname)
# LOG.info("Setting five year retention policy...")
# query_str = "CREATE RETENTION POLICY five_year ON {dbname} DURATION 5y REPLICATION 1".format(dbname=args.dbname)
# _query(query_str, host=args.host, port=args.port, dbname=args.dbname)
LOG.info("Setting forever retention policy...")
query_str = "CREATE RETENTION POLICY forever ON {dbname} DURATION INF REPLICATION 1".format(dbname=args.dbname)
_query(query_str, host=args.host, port=args.port, dbname=args.dbname)
create_continuous_queries(args)
def insert(lines: list[str], influxdb_conn_params: dict[str, str]):
"""Insert InfluxDB 'line protocol' lines into an InfluxDB v2 database."""
with InfluxDBClient(**influxdb_conn_params) as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
write_api.write(bucket=RAW_BUCKET_NAME, record=lines)
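# Illustrative usage (placeholder tags, values, and connection parameters):
# insert(
#     ["metobs,site=aoss,inst=tower air_temp=21.5 1672531200000000000"],
#     {"url": "http://localhost:8086", "org": "metobs", "token": "TOKEN"},
# )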
DURATIONS = ['1m', '5m', '1h']
......@@ -146,90 +117,92 @@ def _get_symbols_list(args):
return symbols_list
def create_continuous_queries(args):
"""Create continuous queries to create averaged fields.
Due to limitations of CQs in InfluxDB, a CQ cannot be given an
execution offset (ex. run the CQ 30s after the specified interval). To
get around this we resample the CQ every 2 intervals. So for a 1m
interval we run the query to cover 2m and take the average for each 1m
interval. This results in points being calculated twice and the most
recent point being modified (new data or updated averages).
def create_tasks(args):
"""Create recurring tasks that create averaged versions of the raw data.
Any real time applications using the API that do not request the whole
data interval (only the newest point) should either be able to update
the newest point or ignore it until there is a point after it (assuming
data arrives in a semi-regular manner).
https://github.com/influxdata/influxdb/issues/6932
https://docs.influxdata.com/influxdb/v1.5/query_language/continuous_queries/#advanced-syntax
The task as configured (see the raw string variable below) assumes realtime
data never arrives later than 1 minute after the observation time. These
tasks apply only to newly arriving data and will not be applied to
backfilled data; the aggregation/averaging function must be called
separately for backfilled data.
"""
symbols_list = _get_symbols_list(args)
for idx, symbols in enumerate(symbols_list):
if "wind_dir" in symbols:
symbols.extend(["wind_east", "wind_north"])
# symbols.remove("wind_speed")
symbols.remove("wind_dir")
influxdb_conn_params = convert_influxdb_args_to_kwargs(args)
with InfluxDBClient(**influxdb_conn_params) as client:
tasks_api = client.tasks_api()
for duration in DURATIONS:
factor = 3 if duration == '1m' else 2
# see docstring above, 3m for 1m, 10m for 5m, etc
resample_for = "{:d}{}".format(int(duration[:-1]) * factor, duration[-1])
LOG.info("Adding continuous query for '%s' averaged data", duration)
operation_str = ",".join(["mean({field}) as {field}".format(field=field) for field in symbols])
cq_name = "cq_{idx}_{duration}".format(idx=idx, duration=duration)
if hasattr(args, "drop_previous") and args.drop_previous:
LOG.info("Dropping previous CQ '%s'", cq_name)
query_str = "DROP CONTINUOUS QUERY {cq_name} ON {dbname}".format(cq_name=cq_name, dbname=args.dbname)
_query(query_str, host=args.host, port=args.port, dbname=args.dbname)
rp = DURATION_TO_RP.get(duration, 'one_year')
query_str = "CREATE CONTINUOUS QUERY {cq_name} ON {dbname} " \
"RESAMPLE EVERY {duration} FOR {resample_for} " \
"BEGIN SELECT {ops} INTO " \
"{dbname}.{rp}.metobs_{duration} FROM " \
"{dbname}.one_hour.metobs " \
"GROUP BY time({duration}), inst, site END".format(
dbname=args.dbname, duration=duration, ops=operation_str,
rp=rp, cq_name=cq_name, resample_for=resample_for)
_query(query_str, host=args.host, port=args.port, dbname=args.dbname)
LOG.info(f"Adding task for {duration!r} averaged data")
task_for_duration = AGGREGATE_FLUX_TASK_FORMAT.format(
interval=duration,
raw_bucket=RAW_BUCKET_NAME,
realtime_bucket=REALTIME_BUCKET_NAME,
org=ORG_NAME,
)
task_request = TaskCreateRequest(
flux=task_for_duration,
org=ORG_NAME,
description=f"Aggregate raw data into {duration} averages",
status="active",
)
tasks_api.create_task(task_create_request=task_request)
# Assume that data coming from realtime sources is no more than 1 minute late
# XXX: Do I need to get 2x the interval of data?
# XXX: Do I need to do anything about per-site/inst pair or per-symbol?
AGGREGATE_FLUX_TASK_FORMAT = """
option task = {{
name: "Average data ({interval})",
every: {interval},
offset: 1m,
}}
from(bucket: "{raw_bucket}")
|> range(start: -(task.every * 2))
|> filter(fn: (r) => r["_measurement"] == "metobs")
|> aggregateWindow(every: {interval}, fn: mean)
|> set(key: "_measurement", value: "metobs_{interval}")
|> to(org: "{org}", bucket: "{realtime_bucket}", tagColumns: ["site", "inst"])
"""
MANUAL_AGGREGATE_QUERY = """
from(bucket: "{raw_bucket}")
|> range(start: {start}, stop: {stop})
|> filter(fn: (r) => r["_measurement"] == "metobs")
|> filter(fn: (r) => r["site"] == "{site}" and r["inst"] == "{inst}")
|> aggregateWindow(every: {interval}, fn: mean)
|> set(key: "_measurement", value: "metobs_{interval}")
|> to(bucket: "{realtime_bucket}", tagColumns: ["site", "inst"])
"""
def run_average_queries(args):
stations = [x.split('.') for x in args.stations]
assert(all(len(x) == 2 for x in stations))
site_cond = ["(site='{site}' AND inst='{inst}')".format(site=site, inst=inst)
for site, inst in stations]
site_cond = " OR ".join(site_cond)
symbols_list = _get_symbols_list(args)
# flatten symbols list to get all of them together
symbols = sorted(set([x for y in symbols_list for x in y]))
if "wind_dir" in symbols:
symbols.extend(["wind_east", "wind_north"])
# symbols.remove("wind_speed")
symbols.remove("wind_dir")
start_time = args.start_time.strftime("%Y-%m-%d %H:%M:%S")
end_time = args.end_time.strftime("%Y-%m-%d %H:%M:%S")
operation_str = ",".join(["mean({field}) as {field}".format(field=field) for field in symbols])
for duration in args.durations:
LOG.info("Running average query for '%s' averaged data", duration)
rp = DURATION_TO_RP.get(duration, 'one_year')
query_str = FAKE_CQ_TEMPLATE.format(
dbname=args.dbname, duration=duration, rp=rp,
start_time=start_time, end_time=end_time,
site_conditions=site_cond, ops=operation_str,
)
if args.debug:
print(query_str)
continue
else:
LOG.debug(query_str)
_query(query_str, host=args.host, port=args.port, dbname=args.dbname)
if not (all(len(x) == 2 for x in stations)):
raise ValueError("Stations must be specified as 2-element pairs separated by a '.'.")
start_time = args.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
end_time = args.end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
influxdb_conn_params = convert_influxdb_args_to_kwargs(args)
with InfluxDBClient(**influxdb_conn_params) as client:
query_api = client.query_api()
for site, inst in stations:
for duration in args.durations:
LOG.info(f"Running average query for {duration!r} averaged data")
aggregate_query = MANUAL_AGGREGATE_QUERY.format(
raw_bucket=RAW_BUCKET_NAME,
start=start_time,
stop=end_time,
site=site,
inst=inst,
interval=duration,
realtime_bucket=REALTIME_BUCKET_NAME,
)
if args.debug:
print(aggregate_query)
continue
LOG.debug(aggregate_query)
query_api.query(aggregate_query)
def _dt_convert(datetime_str):
......@@ -238,35 +211,16 @@ def _dt_convert(datetime_str):
def main():
import argparse
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default=DB_HOST,
help="Hostname of database connection")
parser.add_argument("--port", default=DB_PORT, type=int,
help="Port of database connection")
parser.add_argument("--dbname", default=DB_NAME,
help="Name of database to modify")
parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0,
help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
add_influxdb_command_line_arguments(parser)
subparsers = parser.add_subparsers()
subparser = subparsers.add_parser("create", help="Create the initial database and CQs")
subparser.add_argument("--symbol-list", nargs="+",
help="Lists of importable paths for the symbols to use in continuous queries")
subparser.add_argument("--symbols", nargs="+",
help="List of symbols to include in the continuous queries (alternative to --symbol-list)")
subparser.set_defaults(func=create_database)
subparser = subparsers.add_parser("create_cqs", help="Create the continuous queries")
subparser.add_argument("--symbol-list", nargs="+",
help="Lists of importable paths for the symbols to use in continuous queries")
subparser.add_argument("--symbols", nargs="+",
help="List of symbols to include in the continuous queries (alternative to --symbol-list)")
subparser.add_argument("--drop-previous", action="store_true",
help="Attempt to drop previous CQs with the same names")
subparser.set_defaults(func=create_continuous_queries)
subparser = subparsers.add_parser("run_manual_cqs", help="Manually run averaging CQs")
subparser = subparsers.add_parser("create_tasks", help="Create the continuous queries")
subparser.set_defaults(func=create_tasks)
subparser = subparsers.add_parser("run_manual_average", help="Manually run averaging queries")
subparser.add_argument('--debug', action='store_true',
help='Don\'t actually send the commands, print them to stdout')
subparser.add_argument('--stations', nargs="+",
......@@ -282,10 +236,6 @@ def main():
subparser.add_argument('-d', '--durations', nargs='+', default=['1m'],
choices=DURATIONS,
help="Which average intervals to compute")
subparser.add_argument("--symbol-list", nargs="+",
help="Lists of importable paths for the symbols to use in continuous queries")
subparser.add_argument("--symbols", nargs="+",
help="List of symbols to include in the continuous queries (alternative to --symbol-list)")
subparser.set_defaults(func=run_average_queries)
args = parser.parse_args()
......@@ -297,5 +247,25 @@ def main():
return args.func(args)
def add_influxdb_command_line_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--influxdb-host", default=DB_HOST,
help="Hostname of database connection")
parser.add_argument("--influxdb-port", default=DB_PORT, type=int,
help="Port of database connection")
parser.add_argument("--influxdb-org", default="metobs",
help="InfluxDB organization to do work under")
parser.add_argument("--influxdb-token", required=True,
help="Authentication token for InfluxDB connections."
"This should have write permissions to all metobs buckets.")
def convert_influxdb_args_to_kwargs(args: argparse.Namespace) -> dict[str, Any]:
return {
"url": f"http://{args.influxdb_host}:{args.influxdb_port}",
"org": args.influxdb_org,
"token": args.influxdb_token,
}
if __name__ == "__main__":
sys.exit(main())