diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py index 2fb1689af0f3675313308f19266f8e3fb16b8892..d549427b3ca913b560f6c6a9992bb9e768432e6d 100644 --- a/aosstower/level_00/influxdb.py +++ b/aosstower/level_00/influxdb.py @@ -57,37 +57,40 @@ SYMBOLS = list(SYMBOL_CONVERSIONS.values()) class Updater(object): """Append weather record (taken as a dict) and do averages when enough data is ready. - At least 12 minutes of data is required to do averaging: 10 minutes of wind gusts, + At least 12 minutes of data is required to do averaging for gust_10m: 10 minutes of wind gusts, and wind gusts need 2 minutes of data to calculate. This class is created once at startup and calls rolling_average every time new data is available which tries to do averaging every 5 minutes of data added. """ - def __init__(self): + def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)): + """intervals are timedelta objects.""" self.data = {} self.start_time = None + self.data_interval = data_interval.total_seconds() + self.submit_interval = submit_interval.total_seconds() def rolling_average(self, record): - self.start_time = self.start_time if self.start_time else record['timestamp'] - time_interval = (record['timestamp'] - self.start_time).total_seconds() + # Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame. + time_mask = True + if self.data.get('timestamp') is not None: + time_mask = self.data['timestamp'] > record['timestamp'] - timedelta(minutes=12) for key in record: if self.data.get(key) is None: self.data[key] = np.array([]) - self.data[key] = np.append(self.data[key], record[key]) - # If 5 minutes of data are ready, average current data in dict. Holds up to 15 minutes. - # 60 * 5 seconds = 5 minutes. - if time_interval >= 300 or (record['timestamp'].minute % 5 == 0 and record['timestamp'].second == 0): - # Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame. - frame = self._calculate_averages() - frame.fillna(value=np.nan, inplace=True) - # Keep data set within minimum window to improve speed. - # Wind gusts looks at 10 minute intervals, including the first data point which needs 2 minutes of data - # before it, totalling 12 minutes. - time_mask = frame.index > frame.index[-1] - timedelta(minutes=12) - self.data = {key: val[time_mask] for key, val in self.data.items()} - self.start_time = self.data['timestamp'][-1] - if record['timestamp'].minute % 5 == 0 and record['timestamp'].second == 0: - return frame + self.data[key] = np.append(self.data[key][time_mask], record[key]) + + reference = pd.datetime(1, 1, 1) + if (record['timestamp'] - reference).total_seconds() % self.submit_interval == 0: + submit = True + elif (record['timestamp'] - reference).total_seconds() % self.submit_interval + self.data_interval > \ + self.submit_interval: + submit = True + else: + submit = False + # If data hits a submit_interval interval, return data. + if submit: + return self._calculate_averages() def _calculate_averages(self): KNOTS_9 = calc.knots_to_mps(9.) @@ -95,6 +98,7 @@ class Updater(object): frame = pd.DataFrame(self.data) frame.set_index('timestamp', inplace=True) frame.mask(frame == -99999., inplace=True) + full_frame = frame.asfreq('{0}S'.format(self.data_interval)) # Add wind direction components so we can average wind direction properly. frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir']) @@ -105,21 +109,28 @@ class Updater(object): "it from air temp and relative humidity") frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh']) + # https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases # 2 minute rolling average. - winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean() + winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T', closed='right').mean() frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) + frame_2 = frame[frame.index > frame.index[-1] - timedelta(minutes=2)] + full_2 = full_frame[full_frame.index > full_frame.index[-1] - timedelta(minutes=2)] + # Makes 2 minute averages nans if given less than 2 minutes of data. + if len(full_2) < 120 / self.data_interval or len(frame_2) != len(full_2): + frame['wind_speed_2m'].mask(frame['wind_speed_2m'] > -1., inplace=True) + frame['wind_dir_2m'].mask(frame['wind_dir_2m'] > -1., inplace=True) # 1 minute rolling peaks - wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max() + wind_peak_1m = frame['wind_speed'].rolling(window='1T', closed='right').max() # criteria for a fast wind to be considered a wind gust gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\ (wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5) frame['gust_1m'] = wind_peak_1m.mask(~gust_mask) frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) - # Make 10 minute gusts before 12 minutes nans because data is insufficient. - if (self.data['timestamp'][-1] - self.data['timestamp'][0]).total_seconds() < 720: + # Makes 10 minute gusts before 12 minutes nans because data is insufficient. + if len(full_frame) < 720 / self.data_interval or len(frame) != len(full_frame): frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True) - return frame + return frame.fillna(value=np.nan) def convert_to_influx_frame(record_gen, symbols, debug=False): @@ -245,7 +256,7 @@ def main(): try: influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug) influx_gen = influxdb.grouper(influx_gen, args.bulk) - updater = Updater() + updater = Updater(submit_interval=timedelta(minutes=30)) import time t0 = time.time() for record in influx_gen: