From d997cf2296c9635bb0d79c2f3fc759268945924e Mon Sep 17 00:00:00 2001 From: wroberts <wroberts4@wisc.edu> Date: Tue, 18 Jun 2019 12:57:41 -0500 Subject: [PATCH] Improve style and add warning if data fails to upload --- aosstower/level_00/influxdb.py | 191 ++++++++++++++++++--------------- 1 file changed, 104 insertions(+), 87 deletions(-) diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py index 8dc96ba..5bfcb78 100644 --- a/aosstower/level_00/influxdb.py +++ b/aosstower/level_00/influxdb.py @@ -15,6 +15,7 @@ from metobscommon.util import calc from metobscommon.util.nc import calculate_wind_gust import numpy as np import pandas as pd +import warnings LOG = logging.getLogger(__name__) # map station name to InfluxDB tags @@ -54,53 +55,30 @@ SYMBOLS = list(SYMBOL_CONVERSIONS.values()) class Updater(object): - """https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298""" + """Append weather record (taken as a dict) and do averages when enough data is ready. + + At least 12 minutes of data is required to do averaging: 10 minutes of wind gusts, + and wind gusts need 2 minutes of data to calculate. + + This class is created once at startup and calls rolling_average every time new data is available which tries to do + averaging every 5 minutes of data added. + """ def __init__(self): # Counter that returns rolling average every 5 minutes. independent of - # self.data since its length can be 144 == 12 minutes. + # self.data since self.data length can be 144 == 12 minutes. self.counter = 0 self.data = {} def rolling_average(self, record): - KNOTS_9 = calc.knots_to_mps(9.) - KNOTS_5 = calc.knots_to_mps(5.) self.counter += 1 - database = {k: schema.database_dict[k] for k in schema.met_vars} # Add new data to dict. for key in record: self.data.setdefault(key, []).append(record[key]) - # 4:50 and 4:45. http://metobs.ssec.wisc.edu/aoss/tower/cgi-bin/data_data.py?&separator=, - # &symbols=p:t:td:dir:spd:flux:accum_precip:rh&begin=-00:05:00&end=&interval= # If 5 minutes of data are ready, average current data in dict (up to 15 minutes until data is # dropped for the first time, then up to 12 minutes thereafter are held). 60 * 5 seconds = 5 minutes. if self.counter % 60 == 0: - # Appending to a DataFrame is slow. Instead add to a dict in chunks and pass it to the DataFrame. - frame = pd.DataFrame(self.data) - frame.set_index('timestamp', inplace=True) - frame.mask(frame == -99999., inplace=True) - frame.fillna(value=np.nan, inplace=True) - # Add wind direction components so we can average wind direction properly - frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], - frame['wind_dir']) - frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north']) - - if 'air_temp' in frame and 'rh' in frame and ('dewpoint' in database or 'dewpoint_mean' in database): - LOG.info("'dewpoint' is missing from the input file, will calculate " - "it from air temp and relative humidity") - frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh']) - - # 2 minute rolling average of 5 second data. - winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean() - frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] - frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) - # TODO: PEAK_DIR IS THE 5 SEC MAX FROM LAST MINUTE IF 5 KTS OVER LAST 2 MINUTE AVG. - # 1 minute rolling peaks - wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max() - # criteria for a fast wind to be considered a wind gust - gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\ - (wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5) - frame['cur_gust'] = wind_peak_1m.mask(~gust_mask) - frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) + # Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame. + frame = self._calculate_averages() # Keep data set within minimum window to improve speed. # Wind gusts looks at 10 minute intervals, including the first data point which needs 2 minutes of data # before it, totalling 12 minutes. Since data is sent every 5 minutes, at 15+ minutes we should @@ -111,11 +89,43 @@ class Updater(object): self.data[key] = val[-84:] self.counter -= 60 else: - # Make 10 minute gusts at or before 10 minutes nans because data is insufficient. + # Make 10 minute gusts before 12 minutes nans because data is insufficient. frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True) frame.fillna(np.nan, inplace=True) return frame + def _calculate_averages(self): + frame = pd.DataFrame(self.data) + KNOTS_9 = calc.knots_to_mps(9.) + KNOTS_5 = calc.knots_to_mps(5.) + database = {k: schema.database_dict[k] for k in schema.met_vars} + frame.set_index('timestamp', inplace=True) + frame.mask(frame == -99999., inplace=True) + frame.fillna(value=np.nan, inplace=True) + # Add wind direction components so we can average wind direction properly + frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], + frame['wind_dir']) + frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north']) + + if 'air_temp' in frame and 'rh' in frame and ('dewpoint' in database or 'dewpoint_mean' in database): + LOG.info("'dewpoint' is missing from the input file, will calculate " + "it from air temp and relative humidity") + frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh']) + + # 2 minute rolling average of 5 second data. + winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean() + frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] + frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) + # TODO: PEAK_DIR IS THE 5 SEC MAX FROM LAST MINUTE IF 5 KTS OVER LAST 2 MINUTE AVG. + # 1 minute rolling peaks + wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max() + # criteria for a fast wind to be considered a wind gust + gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\ + (wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5) + frame['cur_gust'] = wind_peak_1m.mask(~gust_mask) + frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) + return frame + def convert_to_influx_frame(record_gen, symbols, debug=False): for idx, record in enumerate(record_gen): @@ -127,6 +137,56 @@ def convert_to_influx_frame(record_gen, symbols, debug=False): yield record +def construct_url(data): + return ('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' + 'ID={ID}&' + 'PASSWORD={PASSWORD}&' + 'action=updateraw&' + 'dateutc={dateutc}&' + 'winddir={winddir}&' + 'winddir_avg2m={winddir_avg2m}&' + 'windspeedmph={windspeedmph}&' + 'windspdmph_avg2m={windspdmph_avg2m}&' + 'windgustmph={windgustmph}&' + 'windgustmph_10m={windgustmph_10m}&' + 'humidity={humidity}&' + 'tempf={tempf}&' + 'baromin={baromin}&' + 'dewptf={dewptf}&' + 'solarradiation={solarradiation}&' + 'rainin={rainin}&' + 'dailyrainin={dailyrainin}&' + 'softwaretype=SSEC-RIG').format(**data) + + +def get_url_data(avg, wu_id, wu_pw): + # Information on what paramaters that can be sent: + # https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298 + # For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute. + timestamp = avg.index[-1].round('1T').isoformat('+') + wind_dir = avg['wind_dir'][-1] + wind_dir_2m = avg['wind_dir_2m'][-1] + # Converts from m/s to mph. + wind_speed = avg['wind_speed'][-1] * 2.23694 + wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694 + cur_gust = avg['cur_gust'][-1] * 2.23694 + gust_10m = avg['gust_10m'][-1] * 2.23694 + rel_hum = avg['rel_hum'][-1] + # Converts degrees Celsius to degrees Fahrenheit + air_temp = avg['air_temp'][-1] * 9. / 5. + 32. + # hpa to barometric pressure inches + pressure = avg['pressure'][-1] * 0.02952998016471232 + # degrees Celcus to degrees Fahrenheit. + dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32. + solar_flux = avg['solar_flux'][-1] + precip = avg['precip'][-1] + accum_precip = avg['accum_precip'][-1] + return {'ID': wu_id, 'PASSWORD': wu_pw, 'dateutc': timestamp, 'winddir': wind_dir, 'winddir_avg2m': wind_dir_2m, + 'windspeedmph': wind_speed, 'windspdmph_avg2m': wind_speed_2m, 'windgustmph': cur_gust, + 'windgustmph_10m': gust_10m, 'humidity': rel_hum, 'tempf': air_temp, 'baromin': pressure, + 'dewptf': dewpoint, 'solarradiation': solar_flux, 'rainin': precip, 'dailyrainin': accum_precip} + + def main(): import argparse parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -190,61 +250,18 @@ def main(): influx_gen = influxdb.grouper(influx_gen, args.bulk) updater = Updater() for record in influx_gen: - # lines = influxdb.frame_records(record, **station_tags) - # influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) + lines = influxdb.frame_records(record, **station_tags) + influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) # Record is in a list of size 1, but want just the record. avg = updater.rolling_average(record[0]) - # Once every 5 minutes: 0 through 295 seconds inclusive. + # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals. if avg is not None: - # For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute. - timestamp = avg.index[-1].round('1T').isoformat('+') - wind_dir = avg['wind_dir'][-1] - wind_dir_2m = avg['wind_dir_2m'][-1] - # Converts from m/s to mph. - wind_speed = avg['wind_speed'][-1] * 2.23694 - wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694 - cur_gust = avg['cur_gust'][-1] * 2.23694 - gust_10m = avg['gust_10m'][-1] * 2.23694 - rel_hum = avg['rel_hum'][-1] - # Converts degrees Celsius to degrees Fahrenheit - air_temp = avg['air_temp'][-1] * 9. / 5. + 32. - # hpa to barometric pressure inches - pressure = avg['pressure'][-1] * 0.02952998016471232 - # degrees Celcus to degrees Fahrenheit. - dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32. - solar_flux = avg['solar_flux'][-1] - precip = avg['precip'][-1] - accum_precip = avg['accum_precip'][-1] - url = ('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' - 'ID={wu_id}&' - 'PASSWORD={wu_pw}&' - 'action=updateraw&' - 'dateutc={timestamp}&' - 'winddir={wind_dir}&' - 'winddir_avg2m={wind_dir_2m}&' - 'windspeedmph={wind_speed}&' - 'windspdmph_avg2m={wind_speed_2m}&' - 'windgustmph={cur_gust}&' - 'windgustmph_10m={gust_10m}&' - 'humidity={rel_hum}&' - 'tempf={air_temp}&' - 'baromin={pressure}&' - 'dewptf={dewpoint}&' - 'solarradiation={solar_flux}&' - 'rainin={precip}&' - 'dailyrainin={accum_precip}&' - 'softwaretype=SSEC-RIG').format(wu_id=args.wu_id, wu_pw=wu_pw, - timestamp=timestamp, wind_dir=wind_dir, wind_dir_2m=wind_dir_2m, - wind_speed=wind_speed, wind_speed_2m=wind_speed_2m, - cur_gust=cur_gust, gust_10m=gust_10m, rel_hum=rel_hum, - air_temp=air_temp, pressure=pressure, dewpoint=dewpoint, - solar_flux=solar_flux, precip=precip, accum_precip=accum_precip) - print(url) - # if wu_pw and args.ldmp: - # # TODO: CHECK FOR SUCCESS RESPONSE PING AFTER SENDING. - # resp = requests.post(url) - # if resp != 'success': - # raise ValueError('Data not received.') + url = construct_url(get_url_data(avg, args.wu_id, wu_pw)) + if wu_pw and args.ldmp: + resp = requests.post(url) + if resp.status_code != 200: + warnings.warn('Data failed to upload to {0} with status code {1}: {2}'.format( + url, resp.status_code, resp.text)) if args.sleep_interval: time.sleep(args.sleep_interval) -- GitLab