Skip to content
Snippets Groups Projects
Commit 810f3f13 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Model working? Needs tests.

parent 73bbc660
No related branches found
No related tags found
No related merge requests found
import os
import re
import sys
from datetime import datetime, timedelta
import rrdtool
import numpy as np
from .time import to_unix_timestamp
from .wind import wind_vector_degrees
from metobs.data import (wind_vector_degrees, to_unix_timestamp,
wind_vector_components)
def dewpoint(tempC, relhum):
......@@ -24,35 +26,79 @@ def dewpoint(tempC, relhum):
return np.minimum(dp - 273.15, tempC)
class RrdModel(object):
class ModelError(Exception):
"""Base class for model errors.
"""
class WrapErrors(object):
"""Class wrapper to catch exceptions and properly re-raise them such that
the only exceptions to propagate are `ModelError`s. Essentially, this
prevents anyone from having to import rrdtool lib.
"""
def __init__(self, *exceptions):
self.exceptions = exceptions
def __call__(self, cls):
def _wrap(fcn):
def wrapped(*args, **kwargs):
try:
return fcn(*args, **kwargs)
except self.exceptions as err:
traceback = sys.exc_info()[2]
raise ModelError, str(err), traceback
return wrapped
for name in dir(cls):
value = getattr(cls, name)
if not name.startswith('_') and hasattr(value, '__call__'):
setattr(cls, name, _wrap(value))
keys = ('air_temp', 'rh', 'dewpoint',
'wind_speed', 'winddir_north', 'winddir_east',
'pressure', 'precip', 'accum_precip', 'solar_flux',
'altimeter')
return cls
@WrapErrors(rrdtool.error)
class RrdModel(object):
"""Model for storing the Level0 uncalibrated data for non-scientific
purposes, such as web-widgets.
"""
def __init__(self, filepath):
self._filepath = filepath
self._averages = tuple()
self._datasets = None
def datasets(self):
"""Get dataset names available in the database.
"""
if self._datasets is None:
datasets = set()
info = rrdtool.info(self._filepath)
for key in info.keys():
match = re.match('^ds\[(.*)\]', key)
if not match:
continue
datasets.add(match.groups()[0])
self._datasets = tuple(sorted(datasets))
return self._datasets
@property
def averaging_intervals(self):
"""Lazy load averaging intervals from database.
"""
if not self._averages:
averages = []
averages = set()
info = rrdtool.info(self._filepath)
for key in info.keys():
if key.startswith('rra') and key.endswith('pdp_per_row'):
averages.append(int(info[key]*info['step']))
averages.sort()
self._averages = tuple(averages)
averages.add(int(info[key] * info['step']))
self._averages = tuple(sorted(averages))
return self._averages
def initialize(self, start=None):
assert not os.path.exists(self._filepath)
start = start or datetime.now() - timedelta(days=365)
"""Create a new empty RRD database.
"""
assert not os.path.exists(self._filepath), "DB already exists"
start = start or (datetime.utcnow() - timedelta(days=365))
secs = to_unix_timestamp(start)
rrdtool.create(self._filepath,
'--start={}'.format(secs),
......@@ -68,29 +114,58 @@ class RrdModel(object):
'DS:accum_precip:GAUGE:10:0:100',
'DS:solar_flux:GAUGE:10:0:1000',
'DS:altimeter:GAUGE:10:0:100',
'RRA:AVERAGE:0.5:1:6307200',
'RRA:LAST:0.5:1:6307200',
'RRA:AVERAGE:0.5:12:525600',
'RRA:AVERAGE:0.5:60:105120',
'RRA:AVERAGE:0.5:360:17520')
def _print(self, record):
stamp = record.get_stamp()
values = ':'.join([str(record[k]) for k in self.keys])
values = '{:d}:{}'.format(to_unix_timestamp(stamp), values)
print values
def _format_data(self, stamp, data):
"""Format data for insert into RRD.
"""
values = ':'.join([str(data[k]) for k in self.datasets()])
values = '{:d}@{}'.format(to_unix_timestamp(stamp), values)
return values
def _record_to_data(self, record):
"""Turn a tower record into database data.
"""
expected_keys = set(self.datasets()) - {'winddir_north', 'winddir_east'}
missing_keys = expected_keys - set(record.keys())
if missing_keys:
raise ModelError("Missing datasets %s" % missing_keys)
data = {}
wspd, wdir = (float(record.wind_speed), float(record.wind_dir))
winds = wind_vector_components(wspd, wdir)
data['winddir_east'] = winds[0]
data['winddir_north'] = winds[1]
data['wind_speed'] = winds[2]
for name in self.datasets():
if name in record:
data[name] = record[name]
return data
def add_record(self, record):
pass
"""Add a single record to the database.
"""
stamp = record.get_stamp()
data = self._record_to_data(record)
rrdtool.update(self._filepath,
'--template=%s' % ':'.join(self.datasets()),
self._format_data(stamp, data))
def get_slice(self, start, end, names=None, average=5):
"""Get a slice of data from the database.
:param start: Start time as datetime
:param end: Inclusive end time as datetime
:param names: Names to query for, defaults to all available, see ``keys``
:param names: Names to query for, defaults to all available, see ``datasets``
:param average: Averaging interval supported by the database, see ``averaging_intervals``.
"""
names = names or self.keys[:]
if average not in self.averaging_intervals():
raise ValueError("Invalid average:%d", average)
names = names or self.datasets[:]
if isinstance(start, datetime):
start = to_unix_timestamp(start)
......@@ -101,6 +176,7 @@ class RrdModel(object):
start -= start % average
end -= end % average
# we always get all the data, no matter what was requested
range, columns, rawdata = rrdtool.fetch(self._filepath,
'AVERAGE',
'-r {:d}'.format(average),
......@@ -108,25 +184,26 @@ class RrdModel(object):
'-e {:d}'.format(end))
src_data = np.array(rawdata)
# NaN filled matrix of shape big enough for the request names
dst_data = np.zeros((src_data.shape[0], len(names))) * float('nan')
# get only the columns we're interested in
for dst_idx, name in enumerate(names):
if name in columns:
dst_data[:,dst_idx] = src_data[:,columns.index(name)]
dst_data[:, dst_idx] = src_data[:, columns.index(name)]
# we compute dewpoint since it wasn't always available
if name == 'dewpoint':
temp = src_data[:,self.keys.index('air_temp')].astype(np.float64)
rh = src_data[:,self.keys.index('rh')].astype(np.float64)
dst_data[:,dst_idx] = dewpoint(temp, rh)
temp = src_data[:, self.datasets.index('air_temp')].astype(np.float64)
rh = src_data[:, self.datasets.index('rh')].astype(np.float64)
dst_data[:, dst_idx] = dewpoint(temp, rh)
# get the wind direction in degrees from the vector components
elif name == 'wind_dir':
east = src_data[:,self.keys.index('winddir_east')].astype(np.float64)
north = src_data[:,self.keys.index('winddir_north')].astype(np.float64)
dst_data[:,dst_idx] = wind_vector_degrees(east, north)
east = src_data[:, self.datasets.index('winddir_east')].astype(np.float64)
north = src_data[:, self.datasets.index('winddir_north')].astype(np.float64)
dst_data[:, dst_idx] = wind_vector_degrees(east, north)
# generate column of times for the req average interval
times = np.array([np.arange(start, end + average, average)])
return np.concatenate((times.T, dst_data), axis=1)
......@@ -3,7 +3,7 @@ from datetime import datetime, timedelta
import numpy
from .time import hhmm_to_offset
from metobs.data import hhmm_to_offset
symbols = {
'TIME': {'type': numpy.int32},
......
from datetime import datetime
def test_to_unix_timestamp():
from aosstower.time import to_unix_timestamp
assert to_unix_timestamp(datetime(1970, 1, 1)) == 0
def test_hhmm_to_secs():
from aosstower.time import hhmm_to_offset
assert hhmm_to_offset('2400') == 86400, "Can't handle > 23:59"
assert hhmm_to_offset('2401') == 86460, "Can't handle > 23:59"
assert hhmm_to_offset('0') == 0, "Can't handle short times"
assert hhmm_to_offset('001') == 60, "Can't handle leading 0"
import unittest
class MeanWindVectorTests(unittest.TestCase):
def _fut(self, winddir, windspd=None):
from aosstower.wind import mean_wind_vector
windspd = windspd or [1]*len(winddir)
return mean_wind_vector(windspd, winddir)[0]
def test_spanning_0_degrees(self):
winddir = self._fut([315, 45])
self.assertAlmostEqual(winddir, 0)
def test_spanning_cardinal_directions(self):
self.assertAlmostEqual(self._fut([45, 135]), 90)
self.assertAlmostEqual(self._fut([135, 225]), 180)
self.assertAlmostEqual(self._fut([225, 315]), 270)
self.assertAlmostEqual(self._fut([315, 45]), 0)
def test_all_zeros(self):
self.assertAlmostEqual(self._fut([0, 0]), 0)
def test_zero_windspd(self):
self.assertAlmostEqual(self._fut([0, 0], windspd=[0, 0]), 0)
def test_45s(self):
self.assertAlmostEqual(self._fut([0, 90]), 45)
self.assertAlmostEqual(self._fut([90, 180]), 135)
self.assertAlmostEqual(self._fut([180, 270]), 225)
self.assertAlmostEqual(self._fut([270, 0]), 315)
from calendar import timegm
from datetime import datetime, timedelta
def to_unix_timestamp(dt):
return int(timegm(dt.utctimetuple()))
def hhmm_to_offset(hhmm):
"""Convert a string time, possibly with missing hours and minutes, to an
offset of seconds.
"""
hhmm = '{:04d}'.format(int(hhmm))
return timedelta(hours=int(hhmm[0:2]),
minutes=int(hhmm[2:])).total_seconds()
"""
See: Campbell Scientific CR1000 Manual Section 7.8.5.2.2.
"""
import numpy as np
def wind_vector_components(windspd, winddir):
"""Decompose scalar or list/array wind direction and speed data into the
corresponding horizontal and vertical direction components and speed
vector.
Inputs can be scalar or arrays.
"""
dir_rad = np.deg2rad(winddir)
spd_arr = np.array(windspd)
V_e = spd_arr * np.sin(dir_rad)
V_n = spd_arr * np.cos(dir_rad)
U_spd = np.sqrt(pow(V_e, 2) + pow(V_n, 2))
return V_e, V_n, U_spd
def wind_vector_degrees(vector_east, vector_north):
"""Re-compose horizontal (east/west) and vertical (north/south) vector
components into wind direction in degrees.
Inputs can be scalar or arrays.
"""
rads = np.arctan2(vector_east, vector_north)
winddir = np.rad2deg(rads)
if isinstance(winddir, np.ndarray):
winddir[np.less(winddir, 0)] += 360
elif winddir < 0:
winddir += 360
return winddir % 360
def mean_wind_vector(windspd, winddir):
V_e, V_n, V_spd = wind_vector_components(windspd, winddir)
avg_dir = wind_vector_degrees(np.mean(V_e), np.mean(V_n))
return avg_dir, np.mean(V_spd)
#!/usr/bin/env python
import sys
import os
import glob
import logging
from datetime import datetime
from metobs.data import wind_vector_components
from aosstower.record import RecordV1, LineParseError
from aosstower.model import RrdModel
from aosstower import wind
from aosstower.model import RrdModel, ModelError
LOG = logging
if __name__ == '__main__':
......@@ -18,27 +19,32 @@ if __name__ == '__main__':
args = parser.parse_args()
model = RrdModel(args.db)
model.initialize(datetime(2013, 1, 1))
logging.basicConfig(level=logging.INFO)
rrd = RrdModel(args.db)
if not os.path.exists(args.db):
LOG.info("initilizing %s", rrd)
rrd.initialize(datetime(2013, 1, 1))
for filepath in glob.glob(args.path):
print >> sys.stderr, filepath
LOG.info("adding %s", filepath)
for line in open(filepath).readlines():
if not line.strip():
continue
try:
record = RecordV1(line)
except LineParseError as err:
LOG.error(str(err))
continue
windspd = float(record['wind_speed'])
winddir = float(record['wind_dir'])
u_e, u_n, spd = wind.wind_vector_components(windspd, winddir)
u_e, u_n, spd = wind_vector_components(windspd, winddir)
record['winddir_east'] = u_e
record['winddir_north'] = u_n
record['wind_speed'] = spd
try:
model._print(record)
except Exception as err:
raise
rrd.add_record(record)
except ModelError:
LOG.exception("Error with record %s" % record)
......@@ -16,4 +16,4 @@ rrdtool create aoss_tower.rrd \
RRA:AVERAGE:0.5:1:6307200 \
RRA:AVERAGE:0.5:12:525600 \
RRA:AVERAGE:0.5:60:105120 \
RRA:AVERAGE:0.5:360:17520
\ No newline at end of file
RRA:AVERAGE:0.5:360:17520
......@@ -8,19 +8,14 @@ except ImportError:
setup(
name='AossTower',
version='0.1',
zip_safe=True,
description='UW AOSS Rooftop Instrument Group Met Tower',
url='http://metobs.ssec.wisc.edu',
author='',
author_email='',
scripts=[],
install_requires=[
'python-rrdtool'
'python-rrdtool',
'numpy',
'metobs.data>=0.4a'
],
dependency_links=['http://larch.ssec.wisc.edu/cgi-bin/repos.cgi'],
packages=find_packages(exclude=['aosstower.tests']),
include_package_data=True,
package_data = {},
test_suite='aosstower.tests',
entry_points={},
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment