Skip to content
Snippets Groups Projects
Commit d6386d31 authored by David Hoese's avatar David Hoese
Browse files

Merge branch 'wunderground' into 'master'

Add Wunderground Support

See merge request metobs/AossTower!1
parents 0efda301 46f06731
No related branches found
No related tags found
No related merge requests found
...@@ -8,8 +8,15 @@ import logging.handlers ...@@ -8,8 +8,15 @@ import logging.handlers
import time import time
import sys import sys
import requests import requests
from datetime import timedelta
from urllib.parse import urlencode
from metobscommon import influxdb from metobscommon import influxdb
from aosstower.level_00.parser import read_frames from aosstower.level_00.parser import read_frames
from metobscommon.util import calc
from metobscommon.util.nc import calculate_wind_gust
import numpy as np
import pandas as pd
import warnings
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# map station name to InfluxDB tags # map station name to InfluxDB tags
...@@ -48,6 +55,78 @@ SYMBOL_CONVERSIONS = { ...@@ -48,6 +55,78 @@ SYMBOL_CONVERSIONS = {
SYMBOLS = list(SYMBOL_CONVERSIONS.values()) SYMBOLS = list(SYMBOL_CONVERSIONS.values())
class Updater(object):
"""Append weather record (taken as a dict) and do averages when enough data is ready.
At least 12 minutes of data is required to do averaging for gust_10m: 10 minutes of wind gusts,
and wind gusts need 2 minutes of data to calculate.
This class is created once at startup and calls rolling_average every time new data is available which tries to do
averaging every submit_interval of data added.
"""
def __init__(self, data_interval=timedelta(seconds=5), submit_interval=timedelta(minutes=5)):
"""intervals are timedelta objects."""
self.data = {'timestamp': np.array([])}
self.data_interval = data_interval.total_seconds()
self.submit_interval = submit_interval.total_seconds()
def rolling_average(self, record):
# Keeps data within 12 minutes.
time_mask = self.data['timestamp'] > record['timestamp'] - timedelta(minutes=12)
# Appending to a DataFrame is slow. Instead, this adds to a np array in chunks and passes it to the DataFrame.
for key in record:
if self.data.get(key) is None:
self.data[key] = np.array([])
self.data[key] = np.append(self.data[key][time_mask], record[key])
# Gets the seconds from the start of the current year. This makes any interval equal to or smaller than a
# year an absolute interval (ie if the submit interval was 5 minutes, then data is submitted at 5, 10, 15,
# etc). For example: if "reference" used the start of the current minute and we wanted to submit anything in
# longer intervals than a minute, then the times that data is submitted would not be consistent. i.e. if the
# current minute was 13 and the submit interval was 5 minutes, then data would be submitted at 18, 23, 28,
# etc. If data collection went down and the next recorded current minute was 14, then data would be submitted
# at 19, 24, 29, etc (inconsistent).
reference = pd.datetime(record['timestamp'].year, 1, 1)
progress = (record['timestamp'] - reference).total_seconds() % self.submit_interval
# If data hits or will pass over a submit_interval interval, return data.
if progress == 0 or progress > self.submit_interval - self.data_interval:
return self._calculate_averages()
def _calculate_averages(self):
index = self.data.pop('timestamp')
frame = pd.DataFrame(self.data, index=index)
frame.mask(frame == -99999., inplace=True)
# Add wind direction components so we can average wind direction properly.
frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'],
frame['wind_dir'])
frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north'])
if 'air_temp' in frame and 'rh' in frame and ('dewpoint' in frame or 'dewpoint_mean' in frame):
LOG.info("'dewpoint' is missing from the input file, will calculate "
"it from air temp and relative humidity")
frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh'])
# https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases
# 2 minute rolling average.
winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T', closed='right').mean()
frame['wind_speed_2m'] = winds_frame_2m['wind_speed']
frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north'])
# Makes 2 minute averages nans if given less than 2 minutes of data.
if len(frame[frame.index > frame.index[-1] - timedelta(minutes=2)]) < 120 / self.data_interval:
frame['wind_speed_2m'].mask(frame['wind_speed_2m'] > -1., inplace=True)
frame['wind_dir_2m'].mask(frame['wind_dir_2m'] > -1., inplace=True)
# 1 minute rolling peaks
wind_peak_1m = frame['wind_speed'].rolling(window='1T', closed='right').max()
# criteria for a fast wind to be considered a wind gust
gust_mask = (winds_frame_2m['wind_speed'] >= calc.knots_to_mps(9.)) &\
(wind_peak_1m >= winds_frame_2m['wind_speed'] + calc.knots_to_mps(5.))
frame['gust_1m'] = wind_peak_1m.mask(~gust_mask)
frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed'])
# Makes 10 minute gusts before 12 minutes nans because data is insufficient.
if len(frame) < 720 / self.data_interval:
frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True)
return frame.fillna(value=np.nan)
def convert_to_influx_frame(record_gen, symbols, debug=False): def convert_to_influx_frame(record_gen, symbols, debug=False):
for idx, record in enumerate(record_gen): for idx, record in enumerate(record_gen):
LOG.info("Inserting records for frame %d", idx) LOG.info("Inserting records for frame %d", idx)
...@@ -58,6 +137,42 @@ def convert_to_influx_frame(record_gen, symbols, debug=False): ...@@ -58,6 +137,42 @@ def convert_to_influx_frame(record_gen, symbols, debug=False):
yield record yield record
def construct_url(data):
# Sends data as empty string to show that recording worked, but that the data is nonexistent.
for key, val in data.items():
if val is None or isinstance(val, float) and np.isnan(val):
data[key] = ''
# Makes url be "url escaped".
return 'http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' + urlencode(data)
def get_url_data(avg, wu_id, wu_pw):
# Information on what paramaters that can be sent:
# https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298
timestamp = avg.index[-1]
wind_dir = avg['wind_dir'][-1]
wind_dir_2m = avg['wind_dir_2m'][-1]
rel_hum = avg['rel_hum'][-1]
solar_flux = avg['solar_flux'][-1]
precip = avg['precip'][-1]
accum_precip = avg['accum_precip'][-1]
# Converts from m/s to mph.
wind_speed = avg['wind_speed'][-1] * 2.23694
wind_speed_2m = avg['wind_speed_2m'][-1] * 2.23694
gust_1m = avg['gust_1m'][-1] * 2.23694
gust_10m = avg['gust_10m'][-1] * 2.23694
# Converts degrees Celsius to degrees Fahrenheit
air_temp = avg['air_temp'][-1] * 9. / 5. + 32.
dewpoint = avg['dewpoint'][-1] * 9. / 5. + 32.
# hpa to barometric pressure inches
pressure = avg['pressure'][-1] * 0.02952998016471232
return {'ID': wu_id, 'PASSWORD': wu_pw, 'dateutc': timestamp, 'winddir': wind_dir, 'winddir_avg2m': wind_dir_2m,
'windspeedmph': wind_speed, 'windspdmph_avg2m': wind_speed_2m, 'windgustmph': gust_1m,
'windgustmph_10m': gust_10m, 'humidity': rel_hum, 'tempf': air_temp, 'baromin': pressure,
'dewptf': dewpoint, 'solarradiation': solar_flux, 'rainin': precip, 'dailyrainin': accum_precip,
'softwaretype': 'SSEC-RIG', 'action': 'updateraw'}
def main(): def main():
import argparse import argparse
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
...@@ -68,7 +183,8 @@ def main(): ...@@ -68,7 +183,8 @@ def main():
help=('Tail file forever, not returning. This will start at the end ' help=('Tail file forever, not returning. This will start at the end '
'of the file and insert any new data added after starting')) 'of the file and insert any new data added after starting'))
parser.add_argument('--ldmp', action='store_true', parser.add_argument('--ldmp', action='store_true',
help='Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)') help='Treat `src` file as a station name and read records from'
'LoggerNet LDMP server (port: 1024)')
parser.add_argument('--tables', nargs='*', default=['1'], parser.add_argument('--tables', nargs='*', default=['1'],
help="LoggerNet LDMP tables to read in") help="LoggerNet LDMP tables to read in")
parser.add_argument("--host", default=influxdb.DB_HOST, parser.add_argument("--host", default=influxdb.DB_HOST,
...@@ -83,12 +199,12 @@ def main(): ...@@ -83,12 +199,12 @@ def main():
help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG') help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
parser.add_argument('--sleep-interval', type=float, parser.add_argument('--sleep-interval', type=float,
help="Seconds to wait between submitting each record") help="Seconds to wait between submitting each record")
# parser.add_argument('--weather-underground', action='store_true', parser.add_argument('--weather-underground', action='store_true',
# help="Send new records to wunderground.com") help="Send new records to wunderground.com")
# parser.add_argument('--wu-id', default='KWIMADIS52', parser.add_argument('--wu-id', default='KWIMADIS52',
# help='Weather underground station ID') help='Weather underground station ID')
# parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt', parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt',
# help='File containing the password for the weather underground upload') help='File containing the password for the weather underground upload')
parser.add_argument('--bulk', type=int, default=1, parser.add_argument('--bulk', type=int, default=1,
help="Number of records to buffer before sending to " help="Number of records to buffer before sending to "
"the database. For large inserts this should be " "the database. For large inserts this should be "
...@@ -105,9 +221,9 @@ def main(): ...@@ -105,9 +221,9 @@ def main():
station_tags = STATIONS[args.station] station_tags = STATIONS[args.station]
symbols = SYMBOL_CONVERSIONS symbols = SYMBOL_CONVERSIONS
# wu_pw = None wu_pw = None
# if args.weather_underground: if args.weather_underground:
# wu_pw = open(args.wu_password_file, 'r').read().strip() wu_pw = open(args.wu_password_file, 'r').read().strip()
if args.ldmp: if args.ldmp:
from aosstower.level_00.parser import LDMPGenerator from aosstower.level_00.parser import LDMPGenerator
...@@ -119,17 +235,21 @@ def main(): ...@@ -119,17 +235,21 @@ def main():
try: try:
influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug) influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
influx_gen = influxdb.grouper(influx_gen, args.bulk) influx_gen = influxdb.grouper(influx_gen, args.bulk)
updater = Updater()
for record in influx_gen: for record in influx_gen:
lines = influxdb.frame_records(record, **station_tags) lines = influxdb.frame_records(record, **station_tags)
influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
# Record is in a list of size 1, but want just the record.
avg = updater.rolling_average(record[0])
# Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals.
if avg is not None:
url = construct_url(get_url_data(avg, args.wu_id, wu_pw))
if wu_pw and args.ldmp:
resp = requests.post(url)
if resp.status_code != 200:
warnings.warn('Data failed to upload to {0} with status code {1}: {2}'.format(
url, resp.status_code, resp.text))
# if wu_pw and args.ldmp:
# stamp = record['timestamp'].isoformat('+')
# record_parameters = "dateutc={stamp}6&winddir={winddir:d}&windspeedmph={windspeed:0.1f}&windgustmph={gust}&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00".format(
# stamp=stamp,
# winddir=int(record['wind_dir']),
# )
# resp = requests.post('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?ID={wu_id}&PASSWORD={wu_pw}&action=updateraw&dateutc=2017-04-16+01:54:46&winddir=198&windspeedmph=14.7&windgustmph=21.7&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00&softwaretype=SSEC-RIG')
if args.sleep_interval: if args.sleep_interval:
time.sleep(args.sleep_interval) time.sleep(args.sleep_interval)
except (RuntimeError, ValueError, KeyError, requests.RequestException): except (RuntimeError, ValueError, KeyError, requests.RequestException):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment