diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py index 8967b98356ade393ac5f453a8f3299d8bb50d366..4dee9a4fd4b5fb50389517b0538a4862fc97922d 100644 --- a/aosstower/level_00/influxdb.py +++ b/aosstower/level_00/influxdb.py @@ -8,8 +8,15 @@ import logging.handlers import time import sys import requests +from datetime import timedelta +from urllib.parse import urlencode from metobscommon import influxdb from aosstower.level_00.parser import read_frames +from metobscommon.util import calc +from metobscommon.util.nc import calculate_wind_gust +import numpy as np +import pandas as pd +import warnings LOG = logging.getLogger(__name__) # map station name to InfluxDB tags @@ -48,6 +55,78 @@ SYMBOL_CONVERSIONS = { SYMBOLS = list(SYMBOL_CONVERSIONS.values()) +class Updater(object): + """Append weather record (taken as a dict) and do averages when enough data is ready. + + At least 12 minutes of data is required to do averaging for gust_10m: 10 minutes of wind gusts, + and wind gusts need 2 minutes of data to calculate. + + This class is created once at startup and calls rolling_average every time new data is available which tries to do + averaging every submit_interval of data added. + """ + def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)): + """intervals are timedelta objects.""" + self.data = {'timestamp': np.array([])} + self.data_interval = data_interval.total_seconds() + self.submit_interval = submit_interval.total_seconds() + + def rolling_average(self, record): + # Keeps data within 12 minutes. + time_mask = self.data['timestamp'] > record['timestamp'] - timedelta(minutes=12) + # Appending to a DataFrame is slow. Instead, this adds to a np array in chunks and passes it to the DataFrame. + for key in record: + if self.data.get(key) is None: + self.data[key] = np.array([]) + self.data[key] = np.append(self.data[key][time_mask], record[key]) + # Gets the seconds from the start of the current year. This makes any interval equal to or smaller than a + # year an absolute interval (ie if the submit interval was 5 minutes, then data is submitted at 5, 10, 15, + # etc). For example: if "reference" used the start of the current minute and we wanted to submit anything in + # longer intervals than a minute, then the times that data is submitted would not be consistent. i.e. if the + # current minute was 13 and the submit interval was 5 minutes, then data would be submitted at 18, 23, 28, + # etc. If data collection went down and the next recorded current minute was 14, then data would be submitted + # at 19, 24, 29, etc (inconsistent). + reference = pd.datetime(record['timestamp'].year, 1, 1) + progress = (record['timestamp'] - reference).total_seconds() % self.submit_interval + # If data hits or will pass over a submit_interval interval, return data. + if progress == 0 or progress > self.submit_interval - self.data_interval: + return self._calculate_averages() + + def _calculate_averages(self): + index = self.data.pop('timestamp') + frame = pd.DataFrame(self.data, index=index) + frame.mask(frame == -99999., inplace=True) + # Add wind direction components so we can average wind direction properly. + frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], + frame['wind_dir']) + frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north']) + + if 'air_temp' in frame and 'rh' in frame and ('dewpoint' in frame or 'dewpoint_mean' in frame): + LOG.info("'dewpoint' is missing from the input file, will calculate " + "it from air temp and relative humidity") + frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh']) + + # https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases + # 2 minute rolling average. + winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T', closed='right').mean() + frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] + frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) + # Makes 2 minute averages nans if given less than 2 minutes of data. + if len(frame[frame.index > frame.index[-1] - timedelta(minutes=2)]) < 120 / self.data_interval: + frame['wind_speed_2m'].mask(frame['wind_speed_2m'] > -1., inplace=True) + frame['wind_dir_2m'].mask(frame['wind_dir_2m'] > -1., inplace=True) + # 1 minute rolling peaks + wind_peak_1m = frame['wind_speed'].rolling(window='1T', closed='right').max() + # criteria for a fast wind to be considered a wind gust + gust_mask = (winds_frame_2m['wind_speed'] >= calc.knots_to_mps(9.)) &\ + (wind_peak_1m >= winds_frame_2m['wind_speed'] + calc.knots_to_mps(5.)) + frame['gust_1m'] = wind_peak_1m.mask(~gust_mask) + frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) + # Makes 10 minute gusts before 12 minutes nans because data is insufficient. + if len(frame) < 720 / self.data_interval: + frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True) + return frame.fillna(value=np.nan) + + def convert_to_influx_frame(record_gen, symbols, debug=False): for idx, record in enumerate(record_gen): LOG.info("Inserting records for frame %d", idx) @@ -58,6 +137,42 @@ def convert_to_influx_frame(record_gen, symbols, debug=False): yield record +def construct_url(data): + # Sends data as empty string to show that recording worked, but that the data is nonexistent. + for key, val in data.items(): + if val is None or isinstance(val, float) and np.isnan(val): + data[key] = '' + # Makes url be "url escaped". + return 'http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' + urlencode(data) + + +def get_url_data(avg, wu_id, wu_pw): + # Information on what paramaters that can be sent: + # https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298 + timestamp = avg.index[-1] + wind_dir = avg['wind_dir'][-1] + wind_dir_2m = avg['wind_dir_2m'][-1] + rel_hum = avg['rel_hum'][-1] + solar_flux = avg['solar_flux'][-1] + precip = avg['precip'][-1] + accum_precip = avg['accum_precip'][-1] + # Converts from m/s to mph. + wind_speed = avg['wind_speed'][-1] * 2.23694 + wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694 + gust_1m = avg['gust_1m'][-1] * 2.23694 + gust_10m = avg['gust_10m'][-1] * 2.23694 + # Converts degrees Celsius to degrees Fahrenheit + air_temp = avg['air_temp'][-1] * 9. / 5. + 32. + dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32. + # hpa to barometric pressure inches + pressure = avg['pressure'][-1] * 0.02952998016471232 + return {'ID': wu_id, 'PASSWORD': wu_pw, 'dateutc': timestamp, 'winddir': wind_dir, 'winddir_avg2m': wind_dir_2m, + 'windspeedmph': wind_speed, 'windspdmph_avg2m': wind_speed_2m, 'windgustmph': gust_1m, + 'windgustmph_10m': gust_10m, 'humidity': rel_hum, 'tempf': air_temp, 'baromin': pressure, + 'dewptf': dewpoint, 'solarradiation': solar_flux, 'rainin': precip, 'dailyrainin': accum_precip, + 'softwaretype': 'SSEC-RIG', 'action': 'updateraw'} + + def main(): import argparse parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -68,7 +183,8 @@ def main(): help=('Tail file forever, not returning. This will start at the end ' 'of the file and insert any new data added after starting')) parser.add_argument('--ldmp', action='store_true', - help='Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)') + help='Treat `src` file as a station name and read records from' + 'LoggerNet LDMP server (port: 1024)') parser.add_argument('--tables', nargs='*', default=['1'], help="LoggerNet LDMP tables to read in") parser.add_argument("--host", default=influxdb.DB_HOST, @@ -83,12 +199,12 @@ def main(): help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG') parser.add_argument('--sleep-interval', type=float, help="Seconds to wait between submitting each record") - # parser.add_argument('--weather-underground', action='store_true', - # help="Send new records to wunderground.com") - # parser.add_argument('--wu-id', default='KWIMADIS52', - # help='Weather underground station ID') - # parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt', - # help='File containing the password for the weather underground upload') + parser.add_argument('--weather-underground', action='store_true', + help="Send new records to wunderground.com") + parser.add_argument('--wu-id', default='KWIMADIS52', + help='Weather underground station ID') + parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt', + help='File containing the password for the weather underground upload') parser.add_argument('--bulk', type=int, default=1, help="Number of records to buffer before sending to " "the database. For large inserts this should be " @@ -105,9 +221,9 @@ def main(): station_tags = STATIONS[args.station] symbols = SYMBOL_CONVERSIONS - # wu_pw = None - # if args.weather_underground: - # wu_pw = open(args.wu_password_file, 'r').read().strip() + wu_pw = None + if args.weather_underground: + wu_pw = open(args.wu_password_file, 'r').read().strip() if args.ldmp: from aosstower.level_00.parser import LDMPGenerator @@ -119,17 +235,21 @@ def main(): try: influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug) influx_gen = influxdb.grouper(influx_gen, args.bulk) + updater = Updater() for record in influx_gen: lines = influxdb.frame_records(record, **station_tags) influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) + # Record is in a list of size 1, but want just the record. + avg = updater.rolling_average(record[0]) + # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals. + if avg is not None: + url = construct_url(get_url_data(avg, args.wu_id, wu_pw)) + if wu_pw and args.ldmp: + resp = requests.post(url) + if resp.status_code != 200: + warnings.warn('Data failed to upload to {0} with status code {1}: {2}'.format( + url, resp.status_code, resp.text)) - # if wu_pw and args.ldmp: - # stamp = record['timestamp'].isoformat('+') - # record_parameters = "dateutc={stamp}6&winddir={winddir:d}&windspeedmph={windspeed:0.1f}&windgustmph={gust}&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00".format( - # stamp=stamp, - # winddir=int(record['wind_dir']), - # ) - # resp = requests.post('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?ID={wu_id}&PASSWORD={wu_pw}&action=updateraw&dateutc=2017-04-16+01:54:46&winddir=198&windspeedmph=14.7&windgustmph=21.7&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00&softwaretype=SSEC-RIG') if args.sleep_interval: time.sleep(args.sleep_interval) except (RuntimeError, ValueError, KeyError, requests.RequestException):