Skip to content
Snippets Groups Projects
Commit 0f5bd87b authored by William Roberts
Browse files

Begin making time interval more dynamic

parent 9749b866
No related branches found
No related tags found
No related merge requests found
...@@ -57,37 +57,40 @@ SYMBOLS = list(SYMBOL_CONVERSIONS.values()) ...@@ -57,37 +57,40 @@ SYMBOLS = list(SYMBOL_CONVERSIONS.values())
class Updater(object): class Updater(object):
"""Append weather record (taken as a dict) and do averages when enough data is ready. """Append weather record (taken as a dict) and do averages when enough data is ready.
At least 12 minutes of data is required to do averaging: 10 minutes of wind gusts, At least 12 minutes of data is required to do averaging for gust_10m: 10 minutes of wind gusts,
and wind gusts need 2 minutes of data to calculate. and wind gusts need 2 minutes of data to calculate.
This class is created once at startup and calls rolling_average every time new data is available which tries to do This class is created once at startup and calls rolling_average every time new data is available which tries to do
averaging every 5 minutes of data added. averaging every 5 minutes of data added.
""" """
def __init__(self): def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)):
"""intervals are timedelta objects."""
self.data = {} self.data = {}
self.start_time = None self.start_time = None
self.data_interval = data_interval.total_seconds()
self.submit_interval = submit_interval.total_seconds()
def rolling_average(self, record): def rolling_average(self, record):
self.start_time = self.start_time if self.start_time else record['timestamp'] # Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame.
time_interval = (record['timestamp'] - self.start_time).total_seconds() time_mask = True
if self.data.get('timestamp') is not None:
time_mask = self.data['timestamp'] > record['timestamp'] - timedelta(minutes=12)
for key in record: for key in record:
if self.data.get(key) is None: if self.data.get(key) is None:
self.data[key] = np.array([]) self.data[key] = np.array([])
self.data[key] = np.append(self.data[key], record[key]) self.data[key] = np.append(self.data[key][time_mask], record[key])
# If 5 minutes of data are ready, average current data in dict. Holds up to 15 minutes.
# 60 * 5 seconds = 5 minutes. reference = pd.datetime(1, 1, 1)
if time_interval >= 300 or (record['timestamp'].minute % 5 == 0 and record['timestamp'].second == 0): if (record['timestamp'] - reference).total_seconds() % self.submit_interval == 0:
# Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame. submit = True
frame = self._calculate_averages() elif (record['timestamp'] - reference).total_seconds() % self.submit_interval + self.data_interval > \
frame.fillna(value=np.nan, inplace=True) self.submit_interval:
# Keep data set within minimum window to improve speed. submit = True
# Wind gusts looks at 10 minute intervals, including the first data point which needs 2 minutes of data else:
# before it, totalling 12 minutes. submit = False
time_mask = frame.index > frame.index[-1] - timedelta(minutes=12) # If data hits a submit_interval interval, return data.
self.data = {key: val[time_mask] for key, val in self.data.items()} if submit:
self.start_time = self.data['timestamp'][-1] return self._calculate_averages()
if record['timestamp'].minute % 5 == 0 and record['timestamp'].second == 0:
return frame
def _calculate_averages(self): def _calculate_averages(self):
KNOTS_9 = calc.knots_to_mps(9.) KNOTS_9 = calc.knots_to_mps(9.)
...@@ -95,6 +98,7 @@ class Updater(object): ...@@ -95,6 +98,7 @@ class Updater(object):
frame = pd.DataFrame(self.data) frame = pd.DataFrame(self.data)
frame.set_index('timestamp', inplace=True) frame.set_index('timestamp', inplace=True)
frame.mask(frame == -99999., inplace=True) frame.mask(frame == -99999., inplace=True)
full_frame = frame.asfreq('{0}S'.format(self.data_interval))
# Add wind direction components so we can average wind direction properly. # Add wind direction components so we can average wind direction properly.
frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'],
frame['wind_dir']) frame['wind_dir'])
...@@ -105,21 +109,28 @@ class Updater(object): ...@@ -105,21 +109,28 @@ class Updater(object):
"it from air temp and relative humidity") "it from air temp and relative humidity")
frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh']) frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh'])
# https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases
# 2 minute rolling average. # 2 minute rolling average.
winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean() winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T', closed='right').mean()
frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] frame['wind_speed_2m'] = winds_frame_2m['wind_speed']
frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north'])
frame_2 = frame[frame.index > frame.index[-1] - timedelta(minutes=2)]
full_2 = full_frame[full_frame.index > full_frame.index[-1] - timedelta(minutes=2)]
# Makes 2 minute averages nans if given less than 2 minutes of data.
if len(full_2) < 120 / self.data_interval or len(frame_2) != len(full_2):
frame['wind_speed_2m'].mask(frame['wind_speed_2m'] > -1., inplace=True)
frame['wind_dir_2m'].mask(frame['wind_dir_2m'] > -1., inplace=True)
# 1 minute rolling peaks # 1 minute rolling peaks
wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max() wind_peak_1m = frame['wind_speed'].rolling(window='1T', closed='right').max()
# criteria for a fast wind to be considered a wind gust # criteria for a fast wind to be considered a wind gust
gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\ gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\
(wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5) (wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5)
frame['gust_1m'] = wind_peak_1m.mask(~gust_mask) frame['gust_1m'] = wind_peak_1m.mask(~gust_mask)
frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed'])
# Make 10 minute gusts before 12 minutes nans because data is insufficient. # Makes 10 minute gusts before 12 minutes nans because data is insufficient.
if (self.data['timestamp'][-1] - self.data['timestamp'][0]).total_seconds() < 720: if len(full_frame) < 720 / self.data_interval or len(frame) != len(full_frame):
frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True) frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True)
return frame return frame.fillna(value=np.nan)
def convert_to_influx_frame(record_gen, symbols, debug=False): def convert_to_influx_frame(record_gen, symbols, debug=False):
...@@ -245,7 +256,7 @@ def main(): ...@@ -245,7 +256,7 @@ def main():
try: try:
influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug) influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
influx_gen = influxdb.grouper(influx_gen, args.bulk) influx_gen = influxdb.grouper(influx_gen, args.bulk)
updater = Updater() updater = Updater(submit_interval=timedelta(minutes=30))
import time import time
t0 = time.time() t0 = time.time()
for record in influx_gen: for record in influx_gen:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment