From 1a5f5e3e628d6a1388d602500c96bca018915aee Mon Sep 17 00:00:00 2001
From: wroberts <wroberts4@wisc.edu>
Date: Wed, 7 Aug 2019 16:56:43 -0500
Subject: [PATCH] Begin making time interval more dynamic

---
 aosstower/level_00/influxdb.py | 112 ++++++++++++++++++++++-----------
 1 file changed, 74 insertions(+), 38 deletions(-)

diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py
index 92445ec..c03fb6a 100644
--- a/aosstower/level_00/influxdb.py
+++ b/aosstower/level_00/influxdb.py
@@ -15,6 +15,7 @@ from metobscommon.util.nc import calculate_wind_gust
 import numpy as np
 import pandas as pd
 import warnings
+from datetime import timedelta
 
 LOG = logging.getLogger(__name__)
 # map station name to InfluxDB tags
@@ -63,44 +64,69 @@ class Updater(object):
     averaging every 5 minutes of data added.
     """
     def __init__(self):
-        # Counter that returns rolling average every 5 minutes. independent of
-        # self.data since self.data length can be 144 == 12 minutes.
-        self.counter = 0
         self.data = {}
+        self.start_time = None
 
     def rolling_average(self, record):
-        self.counter += 1
-        # Add new data to dict.
-        for key in record:
-            self.data.setdefault(key, []).append(record[key])
-        # If 5 minutes of data are ready, average current data in dict (up to 15 minutes until data is
-        # dropped for the first time, then up to 12 minutes thereafter are held). 60 * 5 seconds = 5 minutes.
-        if self.counter % 60 == 0:
+        self.start_time = self.start_time if self.start_time else record['timestamp']
+        time_interval = (record['timestamp'] - self.start_time).total_seconds()
+        # Add in 5 minutes for cutoff
+        start = record['timestamp'] - timedelta(seconds=time_interval % 300)
+        end = self.start_time + timedelta(minutes=5)
+        if time_interval > 300:
+            for key in record:
+                if key == 'timestamp':
+                    self.data[key] = np.append(self.data[key], end)
+                else:
+                    self.data[key] = np.append(self.data[key], np.nan)
+        else:
+            for key in record:
+                if self.data.get(key) is None:
+                    self.data[key] = np.array([])
+                self.data[key] = np.append(self.data[key], record[key])
+        # If 5 minutes of data are ready, average current data in dict. Holds up to 15 minutes.
+        # 60 * 5 seconds = 5 minutes.
+        if time_interval >= 300:
             # Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame.
             frame = self._calculate_averages()
+            frame.fillna(value=np.nan, inplace=True)
+            print(frame['rel_hum'])
+            print(frame.resample('5T', closed='right').mean()['rel_hum'])
+            print(frame.asfreq('5T')['rel_hum'])
             # Keep data set within minimum window to improve speed.
             # Wind gusts looks at 10 minute intervals, including the first data point which needs 2 minutes of data
-            # before it, totalling 12 minutes. Since data is sent every 5 minutes, at 15+ minutes we should
-            # release old data. Data is in 5 second intervals, and 5 seconds * 180 = 15 minutes.
-            if self.counter == 180:
-                for key, val in self.data.items():
-                    # Keep last 7 minutes since 5 + 7 = 12: 5 seconds * 84 = 7 minutes. Optimises performance.
-                    self.data[key] = val[-84:]
-                self.counter -= 60
+            # before it, totalling 12 minutes.
+            test = frame.asfreq('5T')
+            if len(test.index) > 3:
+                new_frame = frame.copy()
+                new_frame['timestamp'] = list(new_frame.index)
+                data = new_frame.drop(new_frame.index[:new_frame.index.get_loc(test.index[-3])]).to_dict('list')
+                for key in self.data:
+                    self.data[key] = data[key]
+            if time_interval > 300:
+                for key in record:
+                    if key == 'timestamp':
+                        if end != start:
+                            self.data[key] = np.array([start])
+                    else:
+                        if end != start:
+                            self.data[key] = np.array([np.nan])
+                self.start_time = self.data['timestamp'][-1]
+                for key in record:
+                    if self.data.get(key) is None:
+                        self.data[key] = np.array([])
+                    self.data[key] = np.append(self.data[key], record[key])
             else:
-                # Make 10 minute gusts before 12 minutes nans because data is insufficient.
-                frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True)
-            frame.fillna(np.nan, inplace=True)
-            return frame
+                self.start_time = self.data['timestamp'][-1]
+            return test
 
     def _calculate_averages(self):
-        frame = pd.DataFrame(self.data)
         KNOTS_9 = calc.knots_to_mps(9.)
         KNOTS_5 = calc.knots_to_mps(5.)
+        frame = pd.DataFrame(self.data)
         frame.set_index('timestamp', inplace=True)
         frame.mask(frame == -99999., inplace=True)
-        frame.fillna(value=np.nan, inplace=True)
-        # Add wind direction components so we can average wind direction properly
+        # Add wind direction components so we can average wind direction properly.
         frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'],
                                                                                  frame['wind_dir'])
         frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north'])
@@ -110,18 +136,20 @@ class Updater(object):
                      "it from air temp and relative humidity")
             frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh'])
 
-        # 2 minute rolling average of 5 second data.
+        # 2 minute rolling average.
         winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean()
         frame['wind_speed_2m'] = winds_frame_2m['wind_speed']
         frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north'])
-        # TODO: PEAK_DIR IS THE 5 SEC MAX FROM LAST MINUTE IF 5 KTS OVER LAST 2 MINUTE AVG.
         # 1 minute rolling peaks
         wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max()
         # criteria for a fast wind to be considered a wind gust
         gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\
                     (wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5)
-        frame['cur_gust'] = wind_peak_1m.mask(~gust_mask)
+        frame['gust_1m'] = wind_peak_1m.mask(~gust_mask)
         frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed'])
+        # Make 10 minute gusts before 12 minutes nans because data is insufficient.
+        if (self.data['timestamp'][-1] - self.data['timestamp'][0]).total_seconds() < 720:
+            frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True)
         return frame
 
 
@@ -136,6 +164,9 @@ def convert_to_influx_frame(record_gen, symbols, debug=False):
 
 
 def construct_url(data):
+    for key, val in data.items():
+        if val is None or isinstance(val, float) and np.isnan(val):
+            data[key] = ''
     return ('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?'
             'ID={ID}&'
             'PASSWORD={PASSWORD}&'
@@ -161,26 +192,25 @@ def get_url_data(avg, wu_id, wu_pw):
     # Information on what paramaters that can be sent:
     # https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298
     # For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute.
-    timestamp = avg.index[-1].round('1T').isoformat('+')
+    timestamp = avg.index[-1].isoformat('+')
     wind_dir = avg['wind_dir'][-1]
     wind_dir_2m = avg['wind_dir_2m'][-1]
+    rel_hum = avg['rel_hum'][-1]
+    solar_flux = avg['solar_flux'][-1]
+    precip = avg['precip'][-1]
+    accum_precip = avg['accum_precip'][-1]
     # Converts from m/s to mph.
     wind_speed = avg['wind_speed'][-1] * 2.23694
     wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694
-    cur_gust = avg['cur_gust'][-1] * 2.23694
+    gust_1m = avg['gust_1m'][-1] * 2.23694
     gust_10m = avg['gust_10m'][-1] * 2.23694
-    rel_hum = avg['rel_hum'][-1]
     # Converts degrees Celsius to degrees Fahrenheit
     air_temp = avg['air_temp'][-1] * 9. / 5. + 32.
+    dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32.
     # hpa to barometric pressure inches
     pressure = avg['pressure'][-1] * 0.02952998016471232
-    # degrees Celcus to degrees Fahrenheit.
-    dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32.
-    solar_flux = avg['solar_flux'][-1]
-    precip = avg['precip'][-1]
-    accum_precip = avg['accum_precip'][-1]
     return {'ID': wu_id, 'PASSWORD': wu_pw, 'dateutc': timestamp, 'winddir': wind_dir, 'winddir_avg2m': wind_dir_2m,
-            'windspeedmph': wind_speed, 'windspdmph_avg2m': wind_speed_2m, 'windgustmph': cur_gust,
+            'windspeedmph': wind_speed, 'windspdmph_avg2m': wind_speed_2m, 'windgustmph': gust_1m,
             'windgustmph_10m': gust_10m, 'humidity': rel_hum, 'tempf': air_temp, 'baromin': pressure,
             'dewptf': dewpoint, 'solarradiation': solar_flux, 'rainin': precip, 'dailyrainin': accum_precip}
 
@@ -247,14 +277,17 @@ def main():
         influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
         influx_gen = influxdb.grouper(influx_gen, args.bulk)
         updater = Updater()
+        import time
+        t0 = time.time()
         for record in influx_gen:
-            lines = influxdb.frame_records(record, **station_tags)
-            influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
+            # lines = influxdb.frame_records(record, **station_tags)
+            # influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
             # Record is in a list of size 1, but want just the record.
             avg = updater.rolling_average(record[0])
             # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
             if avg is not None:
                 url = construct_url(get_url_data(avg, args.wu_id, wu_pw))
+                print(url)
                 if wu_pw and args.ldmp:
                     resp = requests.post(url)
                     if resp.status_code != 200:
@@ -263,7 +296,10 @@ def main():
 
             if args.sleep_interval:
                 time.sleep(args.sleep_interval)
+        t1 = time.time()
+        print(t1 - t0)
     except (RuntimeError, ValueError, KeyError, requests.RequestException):
+        raise
         if hasattr(record_gen, 'close'):
             record_gen.close()
 
-- 
GitLab