Skip to content
Snippets Groups Projects
Commit bae47fc5 authored by William Roberts's avatar William Roberts
Browse files

Add wunderground support

parent 0efda301
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,11 @@ import sys
import requests
from metobscommon import influxdb
from aosstower.level_00.parser import read_frames
from aosstower import schema
from metobscommon.util import calc
from metobscommon.util.nc import calculate_wind_gust
import numpy as np
import pandas as pd
LOG = logging.getLogger(__name__)
# map station name to InfluxDB tags
......@@ -48,6 +53,70 @@ SYMBOL_CONVERSIONS = {
SYMBOLS = list(SYMBOL_CONVERSIONS.values())
class Updater(object):
"""https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298"""
def __init__(self):
# Counter that returns rolling average every 5 minutes. independent of
# self.data since its length can be 144 == 12 minutes.
self.counter = 0
self.data = {}
def rolling_average(self, record):
KNOTS_9 = calc.knots_to_mps(9.)
KNOTS_5 = calc.knots_to_mps(5.)
self.counter += 1
database = {k: schema.database_dict[k] for k in schema.met_vars}
# Add new data to dict.
for key in record:
self.data.setdefault(key, []).append(record[key])
# 4:50 and 4:45. http://metobs.ssec.wisc.edu/aoss/tower/cgi-bin/data_data.py?&separator=,
# &symbols=p:t:td:dir:spd:flux:accum_precip:rh&begin=-00:05:00&end=&interval=
# If 5 minutes of data are ready, average current data in dict (up to 15 minutes until data is
# dropped for the first time, then up to 12 minutes thereafter are held). 60 * 5 seconds = 5 minutes.
if self.counter % 60 == 0:
# Appending to a DataFrame is slow. Instead add to a dict in chunks and pass it to the DataFrame.
frame = pd.DataFrame(self.data)
frame.set_index('timestamp', inplace=True)
frame.mask(frame == -99999., inplace=True)
frame.fillna(value=np.nan, inplace=True)
# Add wind direction components so we can average wind direction properly
frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'],
frame['wind_dir'])
frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north'])
if 'air_temp' in frame and 'rh' in frame and ('dewpoint' in database or 'dewpoint_mean' in database):
LOG.info("'dewpoint' is missing from the input file, will calculate "
"it from air temp and relative humidity")
frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh'])
# 2 minute rolling average of 5 second data.
winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T').mean()
frame['wind_speed_2m'] = winds_frame_2m['wind_speed']
frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north'])
# TODO: PEAK_DIR IS THE 5 SEC MAX FROM LAST MINUTE IF 5 KTS OVER LAST 2 MINUTE AVG.
# 1 minute rolling peaks
wind_peak_1m = frame['wind_speed'].rolling(window='1T', center=False).max()
# criteria for a fast wind to be considered a wind gust
gust_mask = (winds_frame_2m['wind_speed'] >= KNOTS_9) &\
(wind_peak_1m >= winds_frame_2m['wind_speed'] + KNOTS_5)
frame['cur_gust'] = wind_peak_1m.mask(~gust_mask)
frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed'])
# Keep data set within minimum window to improve speed.
# Wind gusts looks at 10 minute intervals, including the first data point which needs 2 minutes of data
# before it, totalling 12 minutes. Since data is sent every 5 minutes, at 15+ minutes we should
# release old data. Data is in 5 second intervals, and 5 seconds * 180 = 15 minutes.
if self.counter == 180:
for key, val in self.data.items():
# Keep last 7 minutes since 5 + 7 = 12: 5 seconds * 84 = 7 minutes. Optimises performance.
self.data[key] = val[-84:]
self.counter -= 60
else:
# Make 10 minute gusts at or before 10 minutes nans because data is insufficient.
frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True)
frame.fillna(np.nan, inplace=True)
return frame
def convert_to_influx_frame(record_gen, symbols, debug=False):
for idx, record in enumerate(record_gen):
LOG.info("Inserting records for frame %d", idx)
......@@ -83,12 +152,12 @@ def main():
help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
parser.add_argument('--sleep-interval', type=float,
help="Seconds to wait between submitting each record")
# parser.add_argument('--weather-underground', action='store_true',
# help="Send new records to wunderground.com")
# parser.add_argument('--wu-id', default='KWIMADIS52',
# help='Weather underground station ID')
# parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt',
# help='File containing the password for the weather underground upload')
parser.add_argument('--weather-underground', action='store_true',
help="Send new records to wunderground.com")
parser.add_argument('--wu-id', default='KWIMADIS52',
help='Weather underground station ID')
parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt',
help='File containing the password for the weather underground upload')
parser.add_argument('--bulk', type=int, default=1,
help="Number of records to buffer before sending to "
"the database. For large inserts this should be "
......@@ -105,9 +174,9 @@ def main():
station_tags = STATIONS[args.station]
symbols = SYMBOL_CONVERSIONS
# wu_pw = None
# if args.weather_underground:
# wu_pw = open(args.wu_password_file, 'r').read().strip()
wu_pw = None
if args.weather_underground:
wu_pw = open(args.wu_password_file, 'r').read().strip()
if args.ldmp:
from aosstower.level_00.parser import LDMPGenerator
......@@ -116,25 +185,68 @@ def main():
src = open(args.src, "r")
record_gen = read_frames(src, tail=args.tail)
try:
influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
influx_gen = influxdb.grouper(influx_gen, args.bulk)
for record in influx_gen:
lines = influxdb.frame_records(record, **station_tags)
influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
influx_gen = influxdb.grouper(influx_gen, args.bulk)
updater = Updater()
for record in influx_gen:
# lines = influxdb.frame_records(record, **station_tags)
# influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
# Record is in a list of size 1, but want just the record.
avg = updater.rolling_average(record[0])
# Once every 5 minutes: 0 through 295 seconds inclusive.
if avg is not None:
# For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute.
timestamp = avg.index[-1].round('1T').isoformat('+')
wind_dir = avg['wind_dir'][-1]
wind_dir_2m = avg['wind_dir_2m'][-1]
# Converts from m/s to mph.
wind_speed = avg['wind_speed'][-1] * 2.23694
wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694
cur_gust = avg['cur_gust'][-1] * 2.23694
gust_10m = avg['gust_10m'][-1] * 2.23694
rel_hum = avg['rel_hum'][-1]
# Converts degrees Celsius to degrees Fahrenheit
air_temp = avg['air_temp'][-1] * 9. / 5. + 32.
# hpa to barometric pressure inches
pressure = avg['pressure'][-1] * 0.02952998016471232
# degrees Celcus to degrees Fahrenheit.
dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32.
solar_flux = avg['solar_flux'][-1]
precip = avg['precip'][-1]
accum_precip = avg['accum_precip'][-1]
url = ('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?'
'ID={wu_id}&'
'PASSWORD={wu_pw}&'
'action=updateraw&'
'dateutc={timestamp}&'
'winddir={wind_dir}&'
'winddir_avg2m={wind_dir_2m}&'
'windspeedmph={wind_speed}&'
'windspdmph_avg2m={wind_speed_2m}&'
'windgustmph={cur_gust}&'
'windgustmph_10m={gust_10m}&'
'humidity={rel_hum}&'
'tempf={air_temp}&'
'baromin={pressure}&'
'dewptf={dewpoint}&'
'solarradiation={solar_flux}&'
'rainin={precip}&'
'dailyrainin={accum_precip}&'
'softwaretype=SSEC-RIG').format(wu_id=args.wu_id, wu_pw=wu_pw,
timestamp=timestamp, wind_dir=wind_dir, wind_dir_2m=wind_dir_2m,
wind_speed=wind_speed, wind_speed_2m=wind_speed_2m,
cur_gust=cur_gust, gust_10m=gust_10m, rel_hum=rel_hum,
air_temp=air_temp, pressure=pressure, dewpoint=dewpoint,
solar_flux=solar_flux, precip=precip, accum_precip=accum_precip)
print(url)
# if wu_pw and args.ldmp:
# stamp = record['timestamp'].isoformat('+')
# record_parameters = "dateutc={stamp}6&winddir={winddir:d}&windspeedmph={windspeed:0.1f}&windgustmph={gust}&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00".format(
# stamp=stamp,
# winddir=int(record['wind_dir']),
# )
# resp = requests.post('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?ID={wu_id}&PASSWORD={wu_pw}&action=updateraw&dateutc=2017-04-16+01:54:46&winddir=198&windspeedmph=14.7&windgustmph=21.7&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00&softwaretype=SSEC-RIG')
if args.sleep_interval:
time.sleep(args.sleep_interval)
except (RuntimeError, ValueError, KeyError, requests.RequestException):
if hasattr(record_gen, 'close'):
record_gen.close()
# # TODO: CHECK FOR SUCCESS RESPONSE PING AFTER SENDING.
# resp = requests.post(url)
# if resp != 'success':
# raise ValueError('Data not received.')
if args.sleep_interval:
time.sleep(args.sleep_interval)
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment