Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
A
AossTower
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Deploy
Model registry
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
MetObs
AossTower
Commits
d997cf22
Commit
d997cf22
authored
5 years ago
by
William Roberts
Browse files
Options
Downloads
Patches
Plain Diff
Improve style and add warning if data fails to upload
parent
d0af6318
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
aosstower/level_00/influxdb.py
+104
-87
104 additions, 87 deletions
aosstower/level_00/influxdb.py
with
104 additions
and
87 deletions
aosstower/level_00/influxdb.py
+
104
−
87
View file @
d997cf22
...
@@ -15,6 +15,7 @@ from metobscommon.util import calc
...
@@ -15,6 +15,7 @@ from metobscommon.util import calc
from
metobscommon.util.nc
import
calculate_wind_gust
from
metobscommon.util.nc
import
calculate_wind_gust
import
numpy
as
np
import
numpy
as
np
import
pandas
as
pd
import
pandas
as
pd
import
warnings
LOG
=
logging
.
getLogger
(
__name__
)
LOG
=
logging
.
getLogger
(
__name__
)
# map station name to InfluxDB tags
# map station name to InfluxDB tags
...
@@ -54,53 +55,30 @@ SYMBOLS = list(SYMBOL_CONVERSIONS.values())
...
@@ -54,53 +55,30 @@ SYMBOLS = list(SYMBOL_CONVERSIONS.values())
class
Updater
(
object
):
class
Updater
(
object
):
"""
https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298
"""
"""
Append weather record (taken as a dict) and do averages when enough data is ready.
At least 12 minutes of data is required to do averaging: 10 minutes of wind gusts,
and wind gusts need 2 minutes of data to calculate.
This class is created once at startup and calls rolling_average every time new data is available which tries to do
averaging every 5 minutes of data added.
"""
def
__init__
(
self
):
def
__init__
(
self
):
# Counter that returns rolling average every 5 minutes. independent of
# Counter that returns rolling average every 5 minutes. independent of
# self.data since
its
length can be 144 == 12 minutes.
# self.data since
self.data
length can be 144 == 12 minutes.
self
.
counter
=
0
self
.
counter
=
0
self
.
data
=
{}
self
.
data
=
{}
def
rolling_average
(
self
,
record
):
def
rolling_average
(
self
,
record
):
KNOTS_9
=
calc
.
knots_to_mps
(
9.
)
KNOTS_5
=
calc
.
knots_to_mps
(
5.
)
self
.
counter
+=
1
self
.
counter
+=
1
database
=
{
k
:
schema
.
database_dict
[
k
]
for
k
in
schema
.
met_vars
}
# Add new data to dict.
# Add new data to dict.
for
key
in
record
:
for
key
in
record
:
self
.
data
.
setdefault
(
key
,
[]).
append
(
record
[
key
])
self
.
data
.
setdefault
(
key
,
[]).
append
(
record
[
key
])
# 4:50 and 4:45. http://metobs.ssec.wisc.edu/aoss/tower/cgi-bin/data_data.py?&separator=,
# &symbols=p:t:td:dir:spd:flux:accum_precip:rh&begin=-00:05:00&end=&interval=
# If 5 minutes of data are ready, average current data in dict (up to 15 minutes until data is
# If 5 minutes of data are ready, average current data in dict (up to 15 minutes until data is
# dropped for the first time, then up to 12 minutes thereafter are held). 60 * 5 seconds = 5 minutes.
# dropped for the first time, then up to 12 minutes thereafter are held). 60 * 5 seconds = 5 minutes.
if
self
.
counter
%
60
==
0
:
if
self
.
counter
%
60
==
0
:
# Appending to a DataFrame is slow. Instead add to a dict in chunks and pass it to the DataFrame.
# Appending to a DataFrame is slow. Instead, this adds to a dict in chunks and passes it to the DataFrame.
frame
=
pd
.
DataFrame
(
self
.
data
)
frame
=
self
.
_calculate_averages
()
frame
.
set_index
(
'
timestamp
'
,
inplace
=
True
)
frame
.
mask
(
frame
==
-
99999.
,
inplace
=
True
)
frame
.
fillna
(
value
=
np
.
nan
,
inplace
=
True
)
# Add wind direction components so we can average wind direction properly
frame
[
'
wind_east
'
],
frame
[
'
wind_north
'
],
_
=
calc
.
wind_vector_components
(
frame
[
'
wind_speed
'
],
frame
[
'
wind_dir
'
])
frame
[
'
wind_dir
'
]
=
calc
.
wind_vector_degrees
(
frame
[
'
wind_east
'
],
frame
[
'
wind_north
'
])
if
'
air_temp
'
in
frame
and
'
rh
'
in
frame
and
(
'
dewpoint
'
in
database
or
'
dewpoint_mean
'
in
database
):
LOG
.
info
(
"'
dewpoint
'
is missing from the input file, will calculate
"
"
it from air temp and relative humidity
"
)
frame
[
'
dewpoint
'
]
=
calc
.
dewpoint
(
frame
[
'
air_temp
'
],
frame
[
'
rh
'
])
# 2 minute rolling average of 5 second data.
winds_frame_2m
=
frame
[[
'
wind_speed
'
,
'
wind_east
'
,
'
wind_north
'
]].
rolling
(
'
2T
'
).
mean
()
frame
[
'
wind_speed_2m
'
]
=
winds_frame_2m
[
'
wind_speed
'
]
frame
[
'
wind_dir_2m
'
]
=
calc
.
wind_vector_degrees
(
winds_frame_2m
[
'
wind_east
'
],
winds_frame_2m
[
'
wind_north
'
])
# TODO: PEAK_DIR IS THE 5 SEC MAX FROM LAST MINUTE IF 5 KTS OVER LAST 2 MINUTE AVG.
# 1 minute rolling peaks
wind_peak_1m
=
frame
[
'
wind_speed
'
].
rolling
(
window
=
'
1T
'
,
center
=
False
).
max
()
# criteria for a fast wind to be considered a wind gust
gust_mask
=
(
winds_frame_2m
[
'
wind_speed
'
]
>=
KNOTS_9
)
&
\
(
wind_peak_1m
>=
winds_frame_2m
[
'
wind_speed
'
]
+
KNOTS_5
)
frame
[
'
cur_gust
'
]
=
wind_peak_1m
.
mask
(
~
gust_mask
)
frame
[
'
gust_10m
'
]
=
calculate_wind_gust
(
frame
[
'
wind_speed
'
],
winds_frame_2m
[
'
wind_speed
'
])
# Keep data set within minimum window to improve speed.
# Keep data set within minimum window to improve speed.
# Wind gusts looks at 10 minute intervals, including the first data point which needs 2 minutes of data
# Wind gusts looks at 10 minute intervals, including the first data point which needs 2 minutes of data
# before it, totalling 12 minutes. Since data is sent every 5 minutes, at 15+ minutes we should
# before it, totalling 12 minutes. Since data is sent every 5 minutes, at 15+ minutes we should
...
@@ -111,11 +89,43 @@ class Updater(object):
...
@@ -111,11 +89,43 @@ class Updater(object):
self
.
data
[
key
]
=
val
[
-
84
:]
self
.
data
[
key
]
=
val
[
-
84
:]
self
.
counter
-=
60
self
.
counter
-=
60
else
:
else
:
# Make 10 minute gusts
at or
before 1
0
minutes nans because data is insufficient.
# Make 10 minute gusts before 1
2
minutes nans because data is insufficient.
frame
[
'
gust_10m
'
].
mask
(
frame
[
'
gust_10m
'
]
>
-
1.
,
inplace
=
True
)
frame
[
'
gust_10m
'
].
mask
(
frame
[
'
gust_10m
'
]
>
-
1.
,
inplace
=
True
)
frame
.
fillna
(
np
.
nan
,
inplace
=
True
)
frame
.
fillna
(
np
.
nan
,
inplace
=
True
)
return
frame
return
frame
def
_calculate_averages
(
self
):
frame
=
pd
.
DataFrame
(
self
.
data
)
KNOTS_9
=
calc
.
knots_to_mps
(
9.
)
KNOTS_5
=
calc
.
knots_to_mps
(
5.
)
database
=
{
k
:
schema
.
database_dict
[
k
]
for
k
in
schema
.
met_vars
}
frame
.
set_index
(
'
timestamp
'
,
inplace
=
True
)
frame
.
mask
(
frame
==
-
99999.
,
inplace
=
True
)
frame
.
fillna
(
value
=
np
.
nan
,
inplace
=
True
)
# Add wind direction components so we can average wind direction properly
frame
[
'
wind_east
'
],
frame
[
'
wind_north
'
],
_
=
calc
.
wind_vector_components
(
frame
[
'
wind_speed
'
],
frame
[
'
wind_dir
'
])
frame
[
'
wind_dir
'
]
=
calc
.
wind_vector_degrees
(
frame
[
'
wind_east
'
],
frame
[
'
wind_north
'
])
if
'
air_temp
'
in
frame
and
'
rh
'
in
frame
and
(
'
dewpoint
'
in
database
or
'
dewpoint_mean
'
in
database
):
LOG
.
info
(
"'
dewpoint
'
is missing from the input file, will calculate
"
"
it from air temp and relative humidity
"
)
frame
[
'
dewpoint
'
]
=
calc
.
dewpoint
(
frame
[
'
air_temp
'
],
frame
[
'
rh
'
])
# 2 minute rolling average of 5 second data.
winds_frame_2m
=
frame
[[
'
wind_speed
'
,
'
wind_east
'
,
'
wind_north
'
]].
rolling
(
'
2T
'
).
mean
()
frame
[
'
wind_speed_2m
'
]
=
winds_frame_2m
[
'
wind_speed
'
]
frame
[
'
wind_dir_2m
'
]
=
calc
.
wind_vector_degrees
(
winds_frame_2m
[
'
wind_east
'
],
winds_frame_2m
[
'
wind_north
'
])
# TODO: PEAK_DIR IS THE 5 SEC MAX FROM LAST MINUTE IF 5 KTS OVER LAST 2 MINUTE AVG.
# 1 minute rolling peaks
wind_peak_1m
=
frame
[
'
wind_speed
'
].
rolling
(
window
=
'
1T
'
,
center
=
False
).
max
()
# criteria for a fast wind to be considered a wind gust
gust_mask
=
(
winds_frame_2m
[
'
wind_speed
'
]
>=
KNOTS_9
)
&
\
(
wind_peak_1m
>=
winds_frame_2m
[
'
wind_speed
'
]
+
KNOTS_5
)
frame
[
'
cur_gust
'
]
=
wind_peak_1m
.
mask
(
~
gust_mask
)
frame
[
'
gust_10m
'
]
=
calculate_wind_gust
(
frame
[
'
wind_speed
'
],
winds_frame_2m
[
'
wind_speed
'
])
return
frame
def
convert_to_influx_frame
(
record_gen
,
symbols
,
debug
=
False
):
def
convert_to_influx_frame
(
record_gen
,
symbols
,
debug
=
False
):
for
idx
,
record
in
enumerate
(
record_gen
):
for
idx
,
record
in
enumerate
(
record_gen
):
...
@@ -127,6 +137,56 @@ def convert_to_influx_frame(record_gen, symbols, debug=False):
...
@@ -127,6 +137,56 @@ def convert_to_influx_frame(record_gen, symbols, debug=False):
yield
record
yield
record
def
construct_url
(
data
):
return
(
'
http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?
'
'
ID={ID}&
'
'
PASSWORD={PASSWORD}&
'
'
action=updateraw&
'
'
dateutc={dateutc}&
'
'
winddir={winddir}&
'
'
winddir_avg2m={winddir_avg2m}&
'
'
windspeedmph={windspeedmph}&
'
'
windspdmph_avg2m={windspdmph_avg2m}&
'
'
windgustmph={windgustmph}&
'
'
windgustmph_10m={windgustmph_10m}&
'
'
humidity={humidity}&
'
'
tempf={tempf}&
'
'
baromin={baromin}&
'
'
dewptf={dewptf}&
'
'
solarradiation={solarradiation}&
'
'
rainin={rainin}&
'
'
dailyrainin={dailyrainin}&
'
'
softwaretype=SSEC-RIG
'
).
format
(
**
data
)
def
get_url_data
(
avg
,
wu_id
,
wu_pw
):
# Information on what paramaters that can be sent:
# https://feedback.weather.com/customer/en/portal/articles/2924682-pws-upload-protocol?b_id=17298
# For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute.
timestamp
=
avg
.
index
[
-
1
].
round
(
'
1T
'
).
isoformat
(
'
+
'
)
wind_dir
=
avg
[
'
wind_dir
'
][
-
1
]
wind_dir_2m
=
avg
[
'
wind_dir_2m
'
][
-
1
]
# Converts from m/s to mph.
wind_speed
=
avg
[
'
wind_speed
'
][
-
1
]
*
2.23694
wind_speed_2m
=
avg
[
'
wind_speed_2m
'
][
-
1
]
*
2.23694
cur_gust
=
avg
[
'
cur_gust
'
][
-
1
]
*
2.23694
gust_10m
=
avg
[
'
gust_10m
'
][
-
1
]
*
2.23694
rel_hum
=
avg
[
'
rel_hum
'
][
-
1
]
# Converts degrees Celsius to degrees Fahrenheit
air_temp
=
avg
[
'
air_temp
'
][
-
1
]
*
9.
/
5.
+
32.
# hpa to barometric pressure inches
pressure
=
avg
[
'
pressure
'
][
-
1
]
*
0.02952998016471232
# degrees Celcus to degrees Fahrenheit.
dewpoint
=
avg
[
'
dewpoint
'
][
-
1
]
*
9.
/
5.
+
32.
solar_flux
=
avg
[
'
solar_flux
'
][
-
1
]
precip
=
avg
[
'
precip
'
][
-
1
]
accum_precip
=
avg
[
'
accum_precip
'
][
-
1
]
return
{
'
ID
'
:
wu_id
,
'
PASSWORD
'
:
wu_pw
,
'
dateutc
'
:
timestamp
,
'
winddir
'
:
wind_dir
,
'
winddir_avg2m
'
:
wind_dir_2m
,
'
windspeedmph
'
:
wind_speed
,
'
windspdmph_avg2m
'
:
wind_speed_2m
,
'
windgustmph
'
:
cur_gust
,
'
windgustmph_10m
'
:
gust_10m
,
'
humidity
'
:
rel_hum
,
'
tempf
'
:
air_temp
,
'
baromin
'
:
pressure
,
'
dewptf
'
:
dewpoint
,
'
solarradiation
'
:
solar_flux
,
'
rainin
'
:
precip
,
'
dailyrainin
'
:
accum_precip
}
def
main
():
def
main
():
import
argparse
import
argparse
parser
=
argparse
.
ArgumentParser
(
description
=
__doc__
,
formatter_class
=
argparse
.
ArgumentDefaultsHelpFormatter
)
parser
=
argparse
.
ArgumentParser
(
description
=
__doc__
,
formatter_class
=
argparse
.
ArgumentDefaultsHelpFormatter
)
...
@@ -190,61 +250,18 @@ def main():
...
@@ -190,61 +250,18 @@ def main():
influx_gen
=
influxdb
.
grouper
(
influx_gen
,
args
.
bulk
)
influx_gen
=
influxdb
.
grouper
(
influx_gen
,
args
.
bulk
)
updater
=
Updater
()
updater
=
Updater
()
for
record
in
influx_gen
:
for
record
in
influx_gen
:
#
lines = influxdb.frame_records(record, **station_tags)
lines
=
influxdb
.
frame_records
(
record
,
**
station_tags
)
#
influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
influxdb
.
insert
(
lines
,
host
=
args
.
host
,
port
=
args
.
port
,
dbname
=
args
.
dbname
)
# Record is in a list of size 1, but want just the record.
# Record is in a list of size 1, but want just the record.
avg
=
updater
.
rolling_average
(
record
[
0
])
avg
=
updater
.
rolling_average
(
record
[
0
])
# Once every 5 minutes: 0 through 295 seconds inclusive.
# Once every 5 minutes: 0 through 295 seconds inclusive
in 5 second intervals
.
if
avg
is
not
None
:
if
avg
is
not
None
:
# For timestamp, want YYYY-MM-DD+hh:mm:ss of last dataset that was averaged, rounded up to nearest minute.
url
=
construct_url
(
get_url_data
(
avg
,
args
.
wu_id
,
wu_pw
))
timestamp
=
avg
.
index
[
-
1
].
round
(
'
1T
'
).
isoformat
(
'
+
'
)
if
wu_pw
and
args
.
ldmp
:
wind_dir
=
avg
[
'
wind_dir
'
][
-
1
]
resp
=
requests
.
post
(
url
)
wind_dir_2m
=
avg
[
'
wind_dir_2m
'
][
-
1
]
if
resp
.
status_code
!=
200
:
# Converts from m/s to mph.
warnings
.
warn
(
'
Data failed to upload to {0} with status code {1}: {2}
'
.
format
(
wind_speed
=
avg
[
'
wind_speed
'
][
-
1
]
*
2.23694
url
,
resp
.
status_code
,
resp
.
text
))
wind_speed_2m
=
avg
[
'
wind_speed_2m
'
][
-
1
]
*
2.23694
cur_gust
=
avg
[
'
cur_gust
'
][
-
1
]
*
2.23694
gust_10m
=
avg
[
'
gust_10m
'
][
-
1
]
*
2.23694
rel_hum
=
avg
[
'
rel_hum
'
][
-
1
]
# Converts degrees Celsius to degrees Fahrenheit
air_temp
=
avg
[
'
air_temp
'
][
-
1
]
*
9.
/
5.
+
32.
# hpa to barometric pressure inches
pressure
=
avg
[
'
pressure
'
][
-
1
]
*
0.02952998016471232
# degrees Celcus to degrees Fahrenheit.
dewpoint
=
avg
[
'
dewpoint
'
][
-
1
]
*
9.
/
5.
+
32.
solar_flux
=
avg
[
'
solar_flux
'
][
-
1
]
precip
=
avg
[
'
precip
'
][
-
1
]
accum_precip
=
avg
[
'
accum_precip
'
][
-
1
]
url
=
(
'
http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?
'
'
ID={wu_id}&
'
'
PASSWORD={wu_pw}&
'
'
action=updateraw&
'
'
dateutc={timestamp}&
'
'
winddir={wind_dir}&
'
'
winddir_avg2m={wind_dir_2m}&
'
'
windspeedmph={wind_speed}&
'
'
windspdmph_avg2m={wind_speed_2m}&
'
'
windgustmph={cur_gust}&
'
'
windgustmph_10m={gust_10m}&
'
'
humidity={rel_hum}&
'
'
tempf={air_temp}&
'
'
baromin={pressure}&
'
'
dewptf={dewpoint}&
'
'
solarradiation={solar_flux}&
'
'
rainin={precip}&
'
'
dailyrainin={accum_precip}&
'
'
softwaretype=SSEC-RIG
'
).
format
(
wu_id
=
args
.
wu_id
,
wu_pw
=
wu_pw
,
timestamp
=
timestamp
,
wind_dir
=
wind_dir
,
wind_dir_2m
=
wind_dir_2m
,
wind_speed
=
wind_speed
,
wind_speed_2m
=
wind_speed_2m
,
cur_gust
=
cur_gust
,
gust_10m
=
gust_10m
,
rel_hum
=
rel_hum
,
air_temp
=
air_temp
,
pressure
=
pressure
,
dewpoint
=
dewpoint
,
solar_flux
=
solar_flux
,
precip
=
precip
,
accum_precip
=
accum_precip
)
print
(
url
)
# if wu_pw and args.ldmp:
# # TODO: CHECK FOR SUCCESS RESPONSE PING AFTER SENDING.
# resp = requests.post(url)
# if resp != 'success':
# raise ValueError('Data not received.')
if
args
.
sleep_interval
:
if
args
.
sleep_interval
:
time
.
sleep
(
args
.
sleep_interval
)
time
.
sleep
(
args
.
sleep_interval
)
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment