Skip to content
Snippets Groups Projects
Commit 1a5f5e3e authored by William Roberts
Browse files

Begin making time interval more dynamic

parent e74150d9
Branches
No related tags found
No related merge requests found
...@@ -15,6 +15,7 @@ from metobscommon.util.nc import calculate_wind_gust ...@@ -15,6 +15,7 @@ from metobscommon.util.nc import calculate_wind_gust
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import warnings import warnings
from datetime import timedelta
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# map station name to InfluxDB tags # map station name to InfluxDB tags
...@@ -63,44 +64,69 @@ class Updater(object): ...@@ -63,44 +64,69 @@ class Updater(object):
averaging every 5 minutes of data added. averaging every 5 minutes of data added.
""" """
def __init__(self): def __init__(self):
# Counter that returns rolling average every 5 minutes. independent of
# self.data since self.data length can be 144 == 12 minutes.
self.counter = 0
self.data = {} self.data = {}
self.start_time = None
def rolling_average(self, record): def rolling_average(self, record):
self.counter += 1 self.start_time = self.start_time if self.start_time else record['timestamp']
# Add new data to dict. time_interval = (record['timestamp'] - self.start_time).total_seconds()
for key in record: # Add in 5 minutes for cutoff
self.data.setdefault(key, []).append(record[key]) start = record['timestamp'] - timedelta(seconds=time_interval % 300)
# If 5 minutes of data are ready, average current data in dict (up to 15 minutes until data is end = self.start_time + timedelta(minutes=5)
# dropped for the first time, then up to 12 minutes thereafter are held). 60 * 5 seconds = 5 minutes. if time_interval > 300:
if self.counter % 60 == 0: for key in record:
if key == 'timestamp':
self.data[key] = np.append(self.data[key], end)
else:
self.data[key] = np.append(self.data[key], np.nan)
else:
for key in record:
if self.data.get(key) is None:
self.data[key] = np.array([])
self.data[key] = np.append(self.data[key], record[key])
# If 5 minutes of data are ready, average current data in dict. Holds up to 15 minutes.
# 60 * 5 seconds = 5 minutes.
if time_interval >= 300:
# Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame. # Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame.
frame = self._calculate_averages() frame = self._calculate_averages()
frame.fillna(value=np.nan, inplace=True)
print(frame['rel_hum'])
print(frame.resample('5T', closed='right').mean()['rel_hum'])
print(frame.asfreq('5T')['rel_hum'])
# Keep data set within minimum window to improve speed. # Keep data set within minimum window to improve speed.
# Wind gusts look at 10 minute intervals, including the first data point which needs 2 minutes of data # Wind gusts look at 10 minute intervals, including the first data point which needs 2 minutes of data
# before it, totalling 12 minutes. Since data is sent every 5 minutes, at 15+ minutes we should # before it, totalling 12 minutes.
# release old data. Data is in 5 second intervals, and 5 seconds * 180 = 15 minutes. test = frame.asfreq('5T')
if self.counter == 180: if len(test.index) > 3:
for key, val in self.data.items(): new_frame = frame.copy()
# Keep last 7 minutes since 5 + 7 = 12: 5 seconds * 84 = 7 minutes. Optimises performance. new_frame['timestamp'] = list(new_frame.index)
self.data[key] = val[-84:] data = new_frame.drop(new_frame.index[:new_frame.index.get_loc(test.index[-3])]).to_dict('list')
self.counter -= 60 for key in self.data:
self.data[key] = data[key]
if time_interval > 300:
for key in record:
if key == 'timestamp':
if end != start:
self.data[key] = np.array([start])
else:
if end != start:
self.data[key] = np.array([np.nan])
self.start_time = self.data['timestamp'][-1]
for key in record:
if self.data.get(key) is None:
self.data[key] = np.array([])
self.data[key] = np.append(self.data[key], record[key])
else: else:
# Make 10 minute gusts before 12 minutes nans because data is insufficient. self.start_time = self.data['timestamp'][-1]
frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True) return test
frame.fillna(np.nan, inplace=True)
return frame
def _calculate_averages(self): def _calculate_averages(self):
frame = pd.DataFrame(self.data)
KNOTS_9 = calc.knots_to_mps(9.) KNOTS_9 = calc.knots_to_mps(9.)
KNOTS_5 = calc.knots_to_mps(5.) KNOTS_5 = calc.knots_to_mps(5.)
frame = pd.DataFrame(self.data)
frame.set_index('timestamp', inplace=True) frame.set_index('timestamp', inplace=True)
frame.mask(frame == -99999., inplace=True) frame.mask(frame == -99999., inplace=True)
frame.fillna(value=np.nan, inplace=True) # Add wind direction components so we can average wind direction properly.
# Add wind direction components so we can average wind direction properly
frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'],
frame['wind_dir']) frame['wind_dir'])
frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north']) frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north'])
...@@ -110,18 +136,20 @@ class Updater(object): ...@@ -110,18 +136,20 @@ class Updater(object):
"it from air temp and relative humidity") "it from air temp and relative humidity")
frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh']) frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh'])
# 2 minute rolling average of 5 second data. # 2 minute rolling average.
winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean() winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean()
frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] frame['wind_speed_2m'] = winds_frame_2m['wind_speed']
frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north'])
# TODO: PEAK_DIR IS THE 5 SEC MAX FROM LAST MINUTE IF 5 KTS OVER LAST 2 MINUTE AVG.
# 1 minute rolling peaks # 1 minute rolling peaks
wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max() wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max()
# criteria for a fast wind to be considered a wind gust # criteria for a fast wind to be considered a wind gust
gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\ gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\
(wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5) (wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5)
frame['cur_gust'] = wind_peak_1m.mask(~gust_mask) frame['gust_1m'] = wind_peak_1m.mask(~gust_mask)
frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed'])
# Make 10 minute gusts before 12 minutes nans because data is insufficient.
if (self.data['timestamp'][-1] - self.data['timestamp'][0]).total_seconds() < 720:
frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True)
return frame return frame
...@@ -136,6 +164,9 @@ def convert_to_influx_frame(record_gen, symbols, debug=False): ...@@ -136,6 +164,9 @@ def convert_to_influx_frame(record_gen, symbols, debug=False):
def construct_url(data): def construct_url(data):
for key, val in data.items():
if val is None or isinstance(val, float) and np.isnan(val):
data[key] = ''
return ('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' return ('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?'
'ID={ID}&' 'ID={ID}&'
'PASSWORD={PASSWORD}&' 'PASSWORD={PASSWORD}&'
...@@ -161,26 +192,25 @@ def get_url_data(avg, wu_id, wu_pw): ...@@ -161,26 +192,25 @@ def get_url_data(avg, wu_id, wu_pw):
# Information on what parameters can be sent: # Information on what parameters can be sent:
# https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298 # https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298
# For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute. # For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute.
timestamp = avg.index[-1].round('1T').isoformat('+') timestamp = avg.index[-1].isoformat('+')
wind_dir = avg['wind_dir'][-1] wind_dir = avg['wind_dir'][-1]
wind_dir_2m = avg['wind_dir_2m'][-1] wind_dir_2m = avg['wind_dir_2m'][-1]
rel_hum = avg['rel_hum'][-1]
solar_flux = avg['solar_flux'][-1]
precip = avg['precip'][-1]
accum_precip = avg['accum_precip'][-1]
# Converts from m/s to mph. # Converts from m/s to mph.
wind_speed = avg['wind_speed'][-1] * 2.23694 wind_speed = avg['wind_speed'][-1] * 2.23694
wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694 wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694
cur_gust = avg['cur_gust'][-1] * 2.23694 gust_1m = avg['gust_1m'][-1] * 2.23694
gust_10m = avg['gust_10m'][-1] * 2.23694 gust_10m = avg['gust_10m'][-1] * 2.23694
rel_hum = avg['rel_hum'][-1]
# Converts degrees Celsius to degrees Fahrenheit # Converts degrees Celsius to degrees Fahrenheit
air_temp = avg['air_temp'][-1] * 9. / 5. + 32. air_temp = avg['air_temp'][-1] * 9. / 5. + 32.
dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32.
# hpa to barometric pressure inches # hpa to barometric pressure inches
pressure = avg['pressure'][-1] * 0.02952998016471232 pressure = avg['pressure'][-1] * 0.02952998016471232
# degrees Celsius to degrees Fahrenheit.
dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32.
solar_flux = avg['solar_flux'][-1]
precip = avg['precip'][-1]
accum_precip = avg['accum_precip'][-1]
return {'ID': wu_id, 'PASSWORD': wu_pw, 'dateutc': timestamp, 'winddir': wind_dir, 'winddir_avg2m': wind_dir_2m, return {'ID': wu_id, 'PASSWORD': wu_pw, 'dateutc': timestamp, 'winddir': wind_dir, 'winddir_avg2m': wind_dir_2m,
'windspeedmph': wind_speed, 'windspdmph_avg2m': wind_speed_2m, 'windgustmph': cur_gust, 'windspeedmph': wind_speed, 'windspdmph_avg2m': wind_speed_2m, 'windgustmph': gust_1m,
'windgustmph_10m': gust_10m, 'humidity': rel_hum, 'tempf': air_temp, 'baromin': pressure, 'windgustmph_10m': gust_10m, 'humidity': rel_hum, 'tempf': air_temp, 'baromin': pressure,
'dewptf': dewpoint, 'solarradiation': solar_flux, 'rainin': precip, 'dailyrainin': accum_precip} 'dewptf': dewpoint, 'solarradiation': solar_flux, 'rainin': precip, 'dailyrainin': accum_precip}
...@@ -247,14 +277,17 @@ def main(): ...@@ -247,14 +277,17 @@ def main():
influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug) influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
influx_gen = influxdb.grouper(influx_gen, args.bulk) influx_gen = influxdb.grouper(influx_gen, args.bulk)
updater = Updater() updater = Updater()
import time
t0 = time.time()
for record in influx_gen: for record in influx_gen:
lines = influxdb.frame_records(record, **station_tags) # lines = influxdb.frame_records(record, **station_tags)
influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) # influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
# Record is in a list of size 1, but want just the record. # Record is in a list of size 1, but want just the record.
avg = updater.rolling_average(record[0]) avg = updater.rolling_average(record[0])
# Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals. # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
if avg is not None: if avg is not None:
url = construct_url(get_url_data(avg, args.wu_id, wu_pw)) url = construct_url(get_url_data(avg, args.wu_id, wu_pw))
print(url)
if wu_pw and args.ldmp: if wu_pw and args.ldmp:
resp = requests.post(url) resp = requests.post(url)
if resp.status_code != 200: if resp.status_code != 200:
...@@ -263,7 +296,10 @@ def main(): ...@@ -263,7 +296,10 @@ def main():
if args.sleep_interval: if args.sleep_interval:
time.sleep(args.sleep_interval) time.sleep(args.sleep_interval)
t1 = time.time()
print(t1 - t0)
except (RuntimeError, ValueError, KeyError, requests.RequestException): except (RuntimeError, ValueError, KeyError, requests.RequestException):
raise
if hasattr(record_gen, 'close'): if hasattr(record_gen, 'close'):
record_gen.close() record_gen.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment