Skip to content
Snippets Groups Projects
Commit 255053ee authored by William Roberts's avatar William Roberts
Browse files

Finish making time interval more dynamic

parent 0f5bd87b
No related branches found
No related tags found
No related merge requests found
...@@ -8,6 +8,7 @@ import logging.handlers ...@@ -8,6 +8,7 @@ import logging.handlers
import time import time
import sys import sys
import requests import requests
from datetime import timedelta
from metobscommon import influxdb from metobscommon import influxdb
from aosstower.level_00.parser import read_frames from aosstower.level_00.parser import read_frames
from metobscommon.util import calc from metobscommon.util import calc
...@@ -15,7 +16,6 @@ from metobscommon.util.nc import calculate_wind_gust ...@@ -15,7 +16,6 @@ from metobscommon.util.nc import calculate_wind_gust
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import warnings import warnings
from datetime import timedelta
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# map station name to InfluxDB tags # map station name to InfluxDB tags
...@@ -61,44 +61,32 @@ class Updater(object): ...@@ -61,44 +61,32 @@ class Updater(object):
and wind gusts need 2 minutes of data to calculate. and wind gusts need 2 minutes of data to calculate.
This class is created once at startup and calls rolling_average every time new data is available which tries to do This class is created once at startup and calls rolling_average every time new data is available which tries to do
averaging every 5 minutes of data added. averaging every submit_interval of data added.
""" """
def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)): def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)):
"""intervals are timedelta objects.""" """intervals are timedelta objects."""
self.data = {} self.data = {'timestamp': np.array([])}
self.start_time = None
self.data_interval = data_interval.total_seconds() self.data_interval = data_interval.total_seconds()
self.submit_interval = submit_interval.total_seconds() self.submit_interval = submit_interval.total_seconds()
def rolling_average(self, record): def rolling_average(self, record):
# Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame. # Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame.
time_mask = True time_mask = self.data['timestamp'] > record['timestamp'] - timedelta(minutes=12)
if self.data.get('timestamp') is not None:
time_mask = self.data['timestamp'] > record['timestamp'] - timedelta(minutes=12)
for key in record: for key in record:
if self.data.get(key) is None: if self.data.get(key) is None:
self.data[key] = np.array([]) self.data[key] = np.array([])
self.data[key] = np.append(self.data[key][time_mask], record[key]) self.data[key] = np.append(self.data[key][time_mask], record[key])
reference = pd.datetime(1, 1, 1) reference = pd.datetime(record['timestamp'].year, 1, 1)
if (record['timestamp'] - reference).total_seconds() % self.submit_interval == 0: progress = (record['timestamp'] - reference).total_seconds() % self.submit_interval
submit = True # If data hits or will pass over a submit_interval interval, return data.
elif (record['timestamp'] - reference).total_seconds() % self.submit_interval + self.data_interval > \ if progress == 0 or progress > self.submit_interval - self.data_interval:
self.submit_interval:
submit = True
else:
submit = False
# If data hits a submit_interval interval, return data.
if submit:
return self._calculate_averages() return self._calculate_averages()
def _calculate_averages(self): def _calculate_averages(self):
KNOTS_9 = calc.knots_to_mps(9.)
KNOTS_5 = calc.knots_to_mps(5.)
frame = pd.DataFrame(self.data) frame = pd.DataFrame(self.data)
frame.set_index('timestamp', inplace=True) frame.set_index('timestamp', inplace=True)
frame.mask(frame == -99999., inplace=True) frame.mask(frame == -99999., inplace=True)
full_frame = frame.asfreq('{0}S'.format(self.data_interval))
# Add wind direction components so we can average wind direction properly. # Add wind direction components so we can average wind direction properly.
frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'],
frame['wind_dir']) frame['wind_dir'])
...@@ -114,21 +102,19 @@ class Updater(object): ...@@ -114,21 +102,19 @@ class Updater(object):
winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T', closed='right').mean() winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T', closed='right').mean()
frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] frame['wind_speed_2m'] = winds_frame_2m['wind_speed']
frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north'])
frame_2 = frame[frame.index > frame.index[-1] - timedelta(minutes=2)]
full_2 = full_frame[full_frame.index > full_frame.index[-1] - timedelta(minutes=2)]
# Makes 2 minute averages nans if given less than 2 minutes of data. # Makes 2 minute averages nans if given less than 2 minutes of data.
if len(full_2) < 120 / self.data_interval or len(frame_2) != len(full_2): if len(frame[frame.index > frame.index[-1] - timedelta(minutes=2)]) < 120 / self.data_interval:
frame['wind_speed_2m'].mask(frame['wind_speed_2m'] > -1., inplace=True) frame['wind_speed_2m'].mask(frame['wind_speed_2m'] > -1., inplace=True)
frame['wind_dir_2m'].mask(frame['wind_dir_2m'] > -1., inplace=True) frame['wind_dir_2m'].mask(frame['wind_dir_2m'] > -1., inplace=True)
# 1 minute rolling peaks # 1 minute rolling peaks
wind_peak_1m = frame['wind_speed'].rolling(window='1T', closed='right').max() wind_peak_1m = frame['wind_speed'].rolling(window='1T', closed='right').max()
# criteria for a fast wind to be considered a wind gust # criteria for a fast wind to be considered a wind gust
gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\ gust_mask = (winds_frame_2m['wind_speed'] >= calc.knots_to_mps(9.)) &\
(wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5) (wind_peak_1m >= winds_frame_2m['wind_speed'] + calc.knots_to_mps(5.))
frame['gust_1m'] = wind_peak_1m.mask(~gust_mask) frame['gust_1m'] = wind_peak_1m.mask(~gust_mask)
frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed'])
# Makes 10 minute gusts before 12 minutes nans because data is insufficient. # Makes 10 minute gusts before 12 minutes nans because data is insufficient.
if len(full_frame) < 720 / self.data_interval or len(frame) != len(full_frame): if len(frame) < 720 / self.data_interval:
frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True) frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True)
return frame.fillna(value=np.nan) return frame.fillna(value=np.nan)
...@@ -256,18 +242,15 @@ def main(): ...@@ -256,18 +242,15 @@ def main():
try: try:
influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug) influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
influx_gen = influxdb.grouper(influx_gen, args.bulk) influx_gen = influxdb.grouper(influx_gen, args.bulk)
updater = Updater(submit_interval=timedelta(minutes=30)) updater = Updater()
import time
t0 = time.time()
for record in influx_gen: for record in influx_gen:
# lines = influxdb.frame_records(record, **station_tags) lines = influxdb.frame_records(record, **station_tags)
# influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
# Record is in a list of size 1, but want just the record. # Record is in a list of size 1, but want just the record.
avg = updater.rolling_average(record[0]) avg = updater.rolling_average(record[0])
# Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals. # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
if avg is not None: if avg is not None:
url = construct_url(get_url_data(avg, args.wu_id, wu_pw)) url = construct_url(get_url_data(avg, args.wu_id, wu_pw))
print(url)
if wu_pw and args.ldmp: if wu_pw and args.ldmp:
resp = requests.post(url) resp = requests.post(url)
if resp.status_code != 200: if resp.status_code != 200:
...@@ -276,10 +259,7 @@ def main(): ...@@ -276,10 +259,7 @@ def main():
if args.sleep_interval: if args.sleep_interval:
time.sleep(args.sleep_interval) time.sleep(args.sleep_interval)
t1 = time.time()
print(t1 - t0)
except (RuntimeError, ValueError, KeyError, requests.RequestException): except (RuntimeError, ValueError, KeyError, requests.RequestException):
raise
if hasattr(record_gen, 'close'): if hasattr(record_gen, 'close'):
record_gen.close() record_gen.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment