From 255053ee808e7dee16fec50ed1ea5f9aa16cba28 Mon Sep 17 00:00:00 2001 From: wroberts <wroberts4@wisc.edu> Date: Mon, 12 Aug 2019 09:23:16 -0500 Subject: [PATCH] Finish making time interval more dynamic --- aosstower/level_00/influxdb.py | 50 ++++++++++------------------------ 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py index d549427..512635c 100644 --- a/aosstower/level_00/influxdb.py +++ b/aosstower/level_00/influxdb.py @@ -8,6 +8,7 @@ import logging.handlers import time import sys import requests +from datetime import timedelta from metobscommon import influxdb from aosstower.level_00.parser import read_frames from metobscommon.util import calc @@ -15,7 +16,6 @@ from metobscommon.util.nc import calculate_wind_gust import numpy as np import pandas as pd import warnings -from datetime import timedelta LOG = logging.getLogger(__name__) # map station name to InfluxDB tags @@ -61,44 +61,32 @@ class Updater(object): and wind gusts need 2 minutes of data to calculate. This class is created once at startup and calls rolling_average every time new data is available which tries to do - averaging every 5 minutes of data added. + averaging every submit_interval of data added. """ def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)): """intervals are timedelta objects.""" - self.data = {} - self.start_time = None + self.data = {'timestamp': np.array([])} self.data_interval = data_interval.total_seconds() self.submit_interval = submit_interval.total_seconds() def rolling_average(self, record): # Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame. - time_mask = True - if self.data.get('timestamp') is not None: - time_mask = self.data['timestamp'] > record['timestamp'] - timedelta(minutes=12) + time_mask = self.data['timestamp'] > record['timestamp'] - timedelta(minutes=12) for key in record: if self.data.get(key) is None: self.data[key] = np.array([]) self.data[key] = np.append(self.data[key][time_mask], record[key]) - reference = pd.datetime(1, 1, 1) - if (record['timestamp'] - reference).total_seconds() % self.submit_interval == 0: - submit = True - elif (record['timestamp'] - reference).total_seconds() % self.submit_interval + self.data_interval > \ - self.submit_interval: - submit = True - else: - submit = False - # If data hits a submit_interval interval, return data. - if submit: + reference = pd.datetime(record['timestamp'].year, 1, 1) + progress = (record['timestamp'] - reference).total_seconds() % self.submit_interval + # If data hits or will pass over a submit_interval interval, return data. + if progress == 0 or progress > self.submit_interval - self.data_interval: return self._calculate_averages() def _calculate_averages(self): - KNOTS_9 = calc.knots_to_mps(9.) - KNOTS_5 = calc.knots_to_mps(5.) frame = pd.DataFrame(self.data) frame.set_index('timestamp', inplace=True) frame.mask(frame == -99999., inplace=True) - full_frame = frame.asfreq('{0}S'.format(self.data_interval)) # Add wind direction components so we can average wind direction properly. frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir']) @@ -114,21 +102,19 @@ class Updater(object): winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T', closed='right').mean() frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) - frame_2 = frame[frame.index > frame.index[-1] - timedelta(minutes=2)] - full_2 = full_frame[full_frame.index > full_frame.index[-1] - timedelta(minutes=2)] # Makes 2 minute averages nans if given less than 2 minutes of data. - if len(full_2) < 120 / self.data_interval or len(frame_2) != len(full_2): + if len(frame[frame.index > frame.index[-1] - timedelta(minutes=2)]) < 120 / self.data_interval: frame['wind_speed_2m'].mask(frame['wind_speed_2m'] > -1., inplace=True) frame['wind_dir_2m'].mask(frame['wind_dir_2m'] > -1., inplace=True) # 1 minute rolling peaks wind_peak_1m = frame['wind_speed'].rolling(window='1T', closed='right').max() # criteria for a fast wind to be considered a wind gust - gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\ - (wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5) + gust_mask = (winds_frame_2m['wind_speed'] >= calc.knots_to_mps(9.)) &\ + (wind_peak_1m >= winds_frame_2m['wind_speed'] + calc.knots_to_mps(5.)) frame['gust_1m'] = wind_peak_1m.mask(~gust_mask) frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) # Makes 10 minute gusts before 12 minutes nans because data is insufficient. - if len(full_frame) < 720 / self.data_interval or len(frame) != len(full_frame): + if len(frame) < 720 / self.data_interval: frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True) return frame.fillna(value=np.nan) @@ -256,18 +242,15 @@ def main(): try: influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug) influx_gen = influxdb.grouper(influx_gen, args.bulk) - updater = Updater(submit_interval=timedelta(minutes=30)) - import time - t0 = time.time() + updater = Updater() for record in influx_gen: - # lines = influxdb.frame_records(record, **station_tags) - # influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) + lines = influxdb.frame_records(record, **station_tags) + influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) # Record is in a list of size 1, but want just the record. avg = updater.rolling_average(record[0]) # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals. if avg is not None: url = construct_url(get_url_data(avg, args.wu_id, wu_pw)) - print(url) if wu_pw and args.ldmp: resp = requests.post(url) if resp.status_code != 200: @@ -276,10 +259,7 @@ def main(): if args.sleep_interval: time.sleep(args.sleep_interval) - t1 = time.time() - print(t1 - t0) except (RuntimeError, ValueError, KeyError, requests.RequestException): - raise if hasattr(record_gen, 'close'): record_gen.close() -- GitLab