From bae47fc5fed4705142b973be92d5843b56d003a4 Mon Sep 17 00:00:00 2001
From: wroberts <wroberts4@wisc.edu>
Date: Mon, 10 Jun 2019 09:21:22 -0500
Subject: [PATCH] Add wunderground support

---
 aosstower/level_00/influxdb.py | 166 +++++++++++++++++++++++++++------
 1 file changed, 139 insertions(+), 27 deletions(-)

diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py
index 8967b98..976c932 100644
--- a/aosstower/level_00/influxdb.py
+++ b/aosstower/level_00/influxdb.py
@@ -10,6 +10,11 @@ import sys
 import requests
 from metobscommon import influxdb
 from aosstower.level_00.parser import read_frames
+from aosstower import schema
+from metobscommon.util import calc
+from metobscommon.util.nc import calculate_wind_gust
+import numpy as np
+import pandas as pd
 
 LOG = logging.getLogger(__name__)
 # map station name to InfluxDB tags
@@ -48,6 +53,70 @@ SYMBOL_CONVERSIONS = {
 SYMBOLS = list(SYMBOL_CONVERSIONS.values())
 
 
+class Updater(object):
+    """https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298"""
+    def __init__(self):
+        # Counter that returns rolling average every 5 minutes. independent of
+        # self.data since its length can be 144 == 12 minutes.
+        self.counter = 0
+        self.data = {}
+
+    def rolling_average(self, record):
+        KNOTS_9 = calc.knots_to_mps(9.)
+        KNOTS_5 = calc.knots_to_mps(5.)
+        self.counter += 1
+        database = {k: schema.database_dict[k] for k in schema.met_vars}
+        # Add new data to dict.
+        for key in record:
+            self.data.setdefault(key, []).append(record[key])
+        # 4:50 and 4:45. http://metobs.ssec.wisc.edu/aoss/tower/cgi-bin/data_data.py?&separator=,
+        # &symbols=p:t:td:dir:spd:flux:accum_precip:rh&begin=-00:05:00&end=&interval=
+        # If 5 minutes of data are ready, average current data in dict (up to 15 minutes until data is
+        # dropped for the first time, then up to 12 minutes thereafter are held). 60 * 5 seconds = 5 minutes.
+        if self.counter % 60 == 0:
+            # Appending to a DataFrame is slow. Instead add to a dict in chunks and pass it to the DataFrame.
+            frame = pd.DataFrame(self.data)
+            frame.set_index('timestamp', inplace=True)
+            frame.mask(frame == -99999., inplace=True)
+            frame.fillna(value=np.nan, inplace=True)
+            # Add wind direction components so we can average wind direction properly
+            frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'],
+                                                                                     frame['wind_dir'])
+            frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north'])
+
+            if 'air_temp' in frame and 'rh' in frame and ('dewpoint' in database or 'dewpoint_mean' in database):
+                LOG.info("'dewpoint' is missing from the input file, will calculate "
+                         "it from air temp and relative humidity")
+                frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh'])
+
+            # 2 minute rolling average of 5 second data.
+            winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean()
+            frame['wind_speed_2m'] = winds_frame_2m['wind_speed']
+            frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north'])
+            # TODO: PEAK_DIR IS THE 5 SEC MAX FROM LAST MINUTE IF 5 KTS OVER LAST 2 MINUTE AVG.
+            # 1 minute rolling peaks
+            wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max()
+            # criteria for a fast wind to be considered a wind gust
+            gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\
+                        (wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5)
+            frame['cur_gust'] = wind_peak_1m.mask(~gust_mask)
+            frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed'])
+            # Keep data set within minimum window to improve speed.
+            # Wind gusts looks at 10 minute intervals, including the first data point which needs 2 minutes of data
+            # before it, totalling 12 minutes. Since data is sent every 5 minutes, at 15+ minutes we should
+            # release old data. Data is in 5 second intervals, and 5 seconds * 180 = 15 minutes.
+            if self.counter == 180:
+                for key, val in self.data.items():
+                    # Keep last 7 minutes since 5 + 7 = 12: 5 seconds * 84 = 7 minutes. Optimises performance.
+                    self.data[key] = val[-84:]
+                self.counter -= 60
+            else:
+                # Make 10 minute gusts at or before 10 minutes nans because data is insufficient.
+                frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True)
+            frame.fillna(np.nan, inplace=True)
+            return frame
+
+
 def convert_to_influx_frame(record_gen, symbols, debug=False):
     for idx, record in enumerate(record_gen):
         LOG.info("Inserting records for frame %d", idx)
@@ -83,12 +152,12 @@ def main():
                         help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
     parser.add_argument('--sleep-interval', type=float,
                         help="Seconds to wait between submitting each record")
-    # parser.add_argument('--weather-underground', action='store_true',
-    #                     help="Send new records to wunderground.com")
-    # parser.add_argument('--wu-id', default='KWIMADIS52',
-    #                     help='Weather underground station ID')
-    # parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt',
-    #                     help='File containing the password for the weather underground upload')
+    parser.add_argument('--weather-underground', action='store_true',
+                        help="Send new records to wunderground.com")
+    parser.add_argument('--wu-id', default='KWIMADIS52',
+                        help='Weather underground station ID')
+    parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt',
+                        help='File containing the password for the weather underground upload')
     parser.add_argument('--bulk', type=int, default=1,
                         help="Number of records to buffer before sending to "
                              "the database. For large inserts this should be "
@@ -105,9 +174,9 @@ def main():
     station_tags = STATIONS[args.station]
     symbols = SYMBOL_CONVERSIONS
 
-    # wu_pw = None
-    # if args.weather_underground:
-    #     wu_pw = open(args.wu_password_file, 'r').read().strip()
+    wu_pw = None
+    if args.weather_underground:
+        wu_pw = open(args.wu_password_file, 'r').read().strip()
 
     if args.ldmp:
         from aosstower.level_00.parser import LDMPGenerator
@@ -116,25 +185,68 @@ def main():
         src = open(args.src, "r")
         record_gen = read_frames(src, tail=args.tail)
 
-    try:
-        influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
-        influx_gen = influxdb.grouper(influx_gen, args.bulk)
-        for record in influx_gen:
-            lines = influxdb.frame_records(record, **station_tags)
-            influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
-
+    influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
+    influx_gen = influxdb.grouper(influx_gen, args.bulk)
+    updater = Updater()
+    for record in influx_gen:
+        # lines = influxdb.frame_records(record, **station_tags)
+        # influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
+        # Record is in a list of size 1, but want just the record.
+        avg = updater.rolling_average(record[0])
+        # Once every 5 minutes: 0 through 295 seconds inclusive.
+        if avg is not None:
+            # For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute.
+            timestamp = avg.index[-1].round('1T').isoformat('+')
+            wind_dir = avg['wind_dir'][-1]
+            wind_dir_2m = avg['wind_dir_2m'][-1]
+            # Converts from m/s to mph.
+            wind_speed = avg['wind_speed'][-1] * 2.23694
+            wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694
+            cur_gust = avg['cur_gust'][-1] * 2.23694
+            gust_10m = avg['gust_10m'][-1] * 2.23694
+            rel_hum = avg['rel_hum'][-1]
+            # Converts degrees Celsius to degrees Fahrenheit
+            air_temp = avg['air_temp'][-1] * 9. / 5. + 32.
+            # hpa to barometric pressure inches
+            pressure = avg['pressure'][-1] * 0.02952998016471232
+            # degrees Celcus to degrees Fahrenheit.
+            dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32.
+            solar_flux = avg['solar_flux'][-1]
+            precip = avg['precip'][-1]
+            accum_precip = avg['accum_precip'][-1]
+            url = ('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?'
+                   'ID={wu_id}&'
+                   'PASSWORD={wu_pw}&'
+                   'action=updateraw&'
+                   'dateutc={timestamp}&'
+                   'winddir={wind_dir}&'
+                   'winddir_avg2m={wind_dir_2m}&'
+                   'windspeedmph={wind_speed}&'
+                   'windspdmph_avg2m={wind_speed_2m}&'
+                   'windgustmph={cur_gust}&'
+                   'windgustmph_10m={gust_10m}&'
+                   'humidity={rel_hum}&'
+                   'tempf={air_temp}&'
+                   'baromin={pressure}&'
+                   'dewptf={dewpoint}&'
+                   'solarradiation={solar_flux}&'
+                   'rainin={precip}&'
+                   'dailyrainin={accum_precip}&'
+                   'softwaretype=SSEC-RIG').format(wu_id=args.wu_id, wu_pw=wu_pw,
+                                                   timestamp=timestamp, wind_dir=wind_dir, wind_dir_2m=wind_dir_2m,
+                                                   wind_speed=wind_speed, wind_speed_2m=wind_speed_2m,
+                                                   cur_gust=cur_gust, gust_10m=gust_10m, rel_hum=rel_hum,
+                                                   air_temp=air_temp, pressure=pressure, dewpoint=dewpoint,
+                                                   solar_flux=solar_flux, precip=precip, accum_precip=accum_precip)
+            print(url)
             # if wu_pw and args.ldmp:
-            #     stamp = record['timestamp'].isoformat('+')
-            #     record_parameters = "dateutc={stamp}6&winddir={winddir:d}&windspeedmph={windspeed:0.1f}&windgustmph={gust}&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00".format(
-            #         stamp=stamp,
-            #         winddir=int(record['wind_dir']),
-            #     )
-            #     resp = requests.post('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?ID={wu_id}&PASSWORD={wu_pw}&action=updateraw&dateutc=2017-04-16+01:54:46&winddir=198&windspeedmph=14.7&windgustmph=21.7&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00&softwaretype=SSEC-RIG')
-            if args.sleep_interval:
-                time.sleep(args.sleep_interval)
-    except (RuntimeError, ValueError, KeyError, requests.RequestException):
-        if hasattr(record_gen, 'close'):
-            record_gen.close()
+            #     # TODO: CHECK FOR SUCCESS RESPONSE PING AFTER SENDING.
+            #     resp = requests.post(url)
+            #     if resp != 'success':
+            #         raise ValueError('Data not received.')
+
+        if args.sleep_interval:
+            time.sleep(args.sleep_interval)
 
 
 if __name__ == "__main__":
-- 
GitLab