Skip to content
Snippets Groups Projects
Verified Commit fd17fdca authored by David Hoese's avatar David Hoese
Browse files

Fix mytime tests and other small fixes

parent d9a7ea12
No related branches found
No related tags found
No related merge requests found
import re
import sys
from datetime import datetime
import rrdtool
import numpy as np
from zope.interface import implementer
from metobscommon.util.calc import wind_vector_degrees
from metobscommon.util.mytime import to_epoch
from metobscommon import interface
class ModelError(Exception):
"""Base class for model errors.
"""
class WrapErrors(object):
"""Class wrapper to catch exceptions and properly re-raise them such that
the only exceptions to propagate are `ModelError`s. Essentially, this
prevents anyone from having to import rrdtool lib.
"""
def __init__(self, *exceptions):
self.exceptions = exceptions
def __call__(self, cls):
def _wrap(fcn):
def wrapped(*args, **kwargs):
try:
return fcn(*args, **kwargs)
except self.exceptions as err:
traceback = sys.exc_info()[2]
raise ModelError, str(err), traceback
wrapped.__doc__ = fcn.__doc__
return wrapped
for name in dir(cls):
value = getattr(cls, name)
if not name.startswith('_') and hasattr(value, '__call__'):
setattr(cls, name, _wrap(value))
return cls
@WrapErrors(rrdtool.error)
@implementer(interface.IModel)
class RrdModel(object):
"""Model for storing the Level0 uncalibrated data for non-scientific
purposes, such as web-widgets.
"""
def __init__(self, filepath):
self._filepath = filepath
self._averages = tuple()
self._datasets = None
@property
def datasets(self):
"""Get dataset names available in the database.
"""
if self._datasets is None:
datasets = set()
info = rrdtool.info(self._filepath)
for key in info.keys():
match = re.match('^ds\[(.*)\]', key)
if not match:
continue
datasets.add(match.groups()[0])
self._datasets = tuple(sorted(datasets))
return self._datasets
def averaging_intervals(self):
"""Lazy load averaging intervals from database.
"""
if not self._averages:
averages = set()
info = rrdtool.info(self._filepath)
for key in info.keys():
if key.startswith('rra') and key.endswith('pdp_per_row'):
averages.add(int(info[key] * info['step']))
self._averages = tuple(sorted(averages))
return self._averages
def _format_data(self, stamp, data):
"""Format data for insert into RRD returning a template string and data
line appropriate for arguments to rrdupdate.
"""
validkeys = set(self.datasets).intersection(data.keys())
if not validkeys:
raise ModelError("No valid data keys provided", data)
tmpl = ':'.join(validkeys)
values = ':'.join([str(data[k]) for k in validkeys])
values = '{:d}@{}'.format(to_epoch(stamp), values)
return tmpl, values
def add_record(self, stamp, record):
"""Add a single record to the database, where a record is a dict like
object with keys for each dataset. Additional keys are ignored.
"""
# Normalize to data interval
utime = to_epoch(stamp)
data_interval = min(self.averaging_intervals())
stamp = datetime.utcfromtimestamp(utime - utime % data_interval)
tmpl, data = self._format_data(stamp, dict(record))
rrdtool.update(self._filepath, '--template=%s' % tmpl, data)
def get_slice(self, start, end, names=None, average=None):
"""Get a slice of data from the database.
:param start: Start time as datetime
:param end: Inclusive end time as datetime
:param names: Names to query for, defaults to all available, see ``datasets``
:param average: Averaging interval supported by the database, see ``averaging_intervals``.
"""
average = average or 5
if average not in self.averaging_intervals():
raise ValueError("Invalid average:%d", average)
names = names or self.datasets[:]
if isinstance(start, datetime):
start = to_epoch(start)
if isinstance(end, datetime):
end = to_epoch(end)
# normalize request times to averaging interval
start -= start % average
end -= end % average
# we always get all the data, no matter what was requested
range, columns, rawdata = rrdtool.fetch(self._filepath,
'AVERAGE',
'-r {:d}'.format(average),
'-s {:d}'.format(start),
'-e {:d}'.format(end))
src_data = np.array(rawdata)
# NaN filled matrix of shape big enough for the request names
dst_data = np.zeros((src_data.shape[0], len(names))) * float('nan')
# get only the columns we're interested in
for dst_idx, name in enumerate(names):
if name in columns:
dst_data[:, dst_idx] = src_data[:, columns.index(name)]
# recompose the wind direction if asked for
elif name == 'wind_dir':
east = src_data[:, self.datasets.index('winddir_east')].astype(np.float64)
north = src_data[:, self.datasets.index('winddir_north')].astype(np.float64)
dst_data[:, dst_idx] = wind_vector_degrees(east, north)
# generate column of times for the req average interval
times = np.array([np.arange(start, end + average, average)])
return np.concatenate((times.T, dst_data), axis=1)
import unittest
from datetime import datetime
class RrdModelTests(unittest.TestCase):
def setUp(self):
from metobscommon.util import TemporaryDirectory
self.tmpdir = TemporaryDirectory(chdir=True)
self.addCleanup(self.tmpdir.cleanup)
self.dbname = "database.rrd"
self.create_testdb()
def create_testdb(self):
import rrdtool
rrdtool.create(self.dbname,
# start at epoch so we can insert whatever
"--start=0",
"--step=5",
"DS:var:GAUGE:10:U:U",
"RRA:AVERAGE:0.5:1:1000",
"RRA:AVERAGE:0.5:12:1000",
"RRA:AVERAGE:0.5:60:1000",
)
def _cut(self):
from metobscommon.model import RrdModel
return RrdModel(self.dbname)
def test_datasets(self):
model = self._cut()
self.assertEqual(model.datasets, ('var',))
def test_averaging_intervals(self):
model = self._cut()
self.assertEqual(model.averaging_intervals(), (5, 60, 300))
def test_invalid_dataset_causes_error(self):
from metobscommon.model import ModelError
model = self._cut()
model.averaging_intervals()
with self.assertRaises(ModelError):
model.add_record(datetime.now(), {"Idontexist": 999})
......@@ -5,7 +5,7 @@ def is_utc(d):
return d.tzinfo != None and d.tzinfo.utcoffset(d).seconds == 0
def test_parse_stamp():
from metobs import mytime
from metobscommon.util import mytime
s = '1970-01-01 00:00:00'
d = mytime.parse_stamp(s)
assert is_utc(d)
......@@ -26,27 +26,27 @@ def test_parse_stamp():
assert d.hour == 0 and d.minute == 0 and d.second == 0
def test_utc_now():
from metobs import mytime
from metobscommon.util import mytime
assert is_utc(mytime.utc_now())
def test_set_tz():
from metobs import mytime
from metobscommon.util import mytime
assert is_utc(mytime.set_tz(datetime.now()))
def test_seconds_to_datetime():
from metobs import mytime
from metobscommon.util import mytime
d = mytime.seconds_to_datetime(0)
assert is_utc(d)
assert d.year == 1970 and d.day == 1 and d.month == 1
assert d.hour == 0 and d.minute == 0 and d.second == 0
def test_datetime_to_epoch():
from metobs import mytime
from metobscommon.util import mytime
d = datetime(1970,1,1)
assert mytime.datetime_to_epoch(d) == 0
def test_parse_interval():
from metobs import mytime
from metobscommon.util import mytime
s = '00:00:00'
i = mytime.parse_interval(s)
assert i == 0
......
......@@ -6,8 +6,6 @@ setup(
description='MetObs Common Libraries',
url='http://metobs.ssec.wisc.edu',
install_requires=[
# 'python-rrdtool',
# 'zope.interface'
"sh",
],
dependency_links=['http://larch.ssec.wisc.edu/cgi-bin/repos.cgi'],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment