Skip to content
Snippets Groups Projects
Commit b67c3e5a authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Good enough to create RRDs for entire archive

parent c3d3d7bf
No related branches found
No related tags found
No related merge requests found
# Data interval in seconds
DATA_INTERVAL = 5
......@@ -6,24 +6,8 @@ from datetime import datetime, timedelta
import rrdtool
import numpy as np
from metobs.data import (wind_vector_degrees, to_unix_timestamp,
wind_vector_components)
def dewpoint(tempC, relhum):
"""
Algorithm from Tom Whittaker tempC is the temperature in degrees Celsius,
relhum is the relative humidity as a percentage.
:param tempC: temperature in celsius
:param relhum: relative humidity as a percentage
"""
gasconst = 461.5
latheat = 2500800.0
dp = (1.0 / (1.0 / (273.15 + tempC) - gasconst * np.log((0.0 + relhum) / 100) / (latheat - tempC * 2397.5)))
return np.minimum(dp - 273.15, tempC)
from metobs.data import wind_vector_degrees, to_unix_timestamp
from aosstower import meta
class ModelError(Exception):
......@@ -58,15 +42,45 @@ class WrapErrors(object):
return cls
def initialize(filepath, start=None, days=365, data_interval=5):
"""Create a new empty RRD database.
"""
assert not os.path.exists(filepath), "DB already exists"
start = start or (datetime.utcnow() - timedelta(days=days))
# normalize start to data interval
secs = to_unix_timestamp(start)
secs -= secs % data_interval
rrdtool.create(filepath,
'--start={}'.format(secs),
'--step={:d}'.format(data_interval),
'DS:air_temp:GAUGE:10:-40:50',
'DS:rh:GAUGE:10:0:100',
'DS:dewpoint:GAUGE:10:0:100',
'DS:wind_speed:GAUGE:10:0:100',
'DS:winddir_north:GAUGE:10:-100:100',
'DS:winddir_east:GAUGE:10:-100:100',
'DS:pressure:GAUGE:10:0:1100',
'DS:precip:GAUGE:10:0:100',
'DS:accum_precip:GAUGE:10:0:100',
'DS:solar_flux:GAUGE:10:0:1000',
'DS:altimeter:GAUGE:10:0:100',
# native resolution
'RRA:AVERAGE:0.5:1:6307200',
# 1 minute
'RRA:AVERAGE:0.5:{:d}:525600'.format(60/data_interval),
# 5 minute
'RRA:AVERAGE:0.5:{:d}:105120'.format(300/data_interval),
# 30 minute
'RRA:AVERAGE:0.5:{:d}:17520'.format(1800/data_interval))
@WrapErrors(rrdtool.error)
class RrdModel(object):
"""Model for storing the Level0 uncalibrated data for non-scientific
purposes, such as web-widgets.
"""
# data interval in seconds
DATA_INTERVAL = 5
def __init__(self, filepath):
self._filepath = filepath
self._averages = tuple()
......@@ -99,71 +113,29 @@ class RrdModel(object):
self._averages = tuple(sorted(averages))
return self._averages
def initialize(self, start=None, days=365):
"""Create a new empty RRD database.
"""
assert not os.path.exists(self._filepath), "DB already exists"
start = start or (datetime.utcnow() - timedelta(days=days))
# normalize start to data interval
secs = to_unix_timestamp(start)
secs -= secs % self.DATA_INTERVAL
rrdtool.create(self._filepath,
'--start={}'.format(secs),
'--step={:d}'.format(self.DATA_INTERVAL),
'DS:air_temp:GAUGE:10:-40:50',
'DS:rh:GAUGE:10:0:100',
'DS:dewpoint:GAUGE:10:0:100',
'DS:wind_speed:GAUGE:10:0:100',
'DS:winddir_north:GAUGE:10:-100:100',
'DS:winddir_east:GAUGE:10:-100:100',
'DS:pressure:GAUGE:10:0:1100',
'DS:precip:GAUGE:10:0:100',
'DS:accum_precip:GAUGE:10:0:100',
'DS:solar_flux:GAUGE:10:0:1000',
'DS:altimeter:GAUGE:10:0:100',
'RRA:AVERAGE:0.5:1:6307200',
'RRA:AVERAGE:0.5:{:d}:525600'.format(60/self.DATA_INTERVAL),
'RRA:AVERAGE:0.5:{:d}:105120'.format(300/self.DATA_INTERVAL),
'RRA:AVERAGE:0.5:{:d}:17520'.format(1800/self.DATA_INTERVAL))
def _format_data(self, stamp, data):
"""Format data for insert into RRD.
"""Format data for insert into RRD returning a template string and data
line appropriate for arguments to rrdupdate.
"""
values = ':'.join([str(data[k]) for k in self.datasets])
validkeys = set(self.datasets).intersection(data.keys())
if not validkeys:
raise ModelError("No valid data keys provided", data)
tmpl = ':'.join(validkeys)
values = ':'.join([str(data[k]) for k in validkeys])
values = '{:d}@{}'.format(to_unix_timestamp(stamp), values)
return values
return tmpl, values
def _record_to_data(self, record):
"""Turn a tower record into database data.
"""
expected_keys = set(self.datasets) - {'winddir_north', 'winddir_east'}
missing_keys = expected_keys - set(record.keys())
if missing_keys:
raise ModelError("Missing datasets %s" % missing_keys)
data = {}
wspd, wdir = (float(record.wind_speed), float(record.wind_dir))
winds = wind_vector_components(wspd, wdir)
data['winddir_east'] = winds[0]
data['winddir_north'] = winds[1]
data['wind_speed'] = winds[2]
for name in self.datasets:
if name in record:
data[name] = record[name]
return data
def add_record(self, record):
"""Add a single record to the database.
def add_record(self, stamp, record):
"""Add a single record to the database, where a record is a dict like
object with keys for each dataset. Additional keys are ignored.
"""
# Normalize to data interval
utime = to_unix_timestamp(record.get_stamp())
stamp = datetime.utcfromtimestamp(utime - utime % self.DATA_INTERVAL)
data = self._record_to_data(record)
rrdtool.update(self._filepath,
'--template=%s' % ':'.join(self.datasets),
self._format_data(stamp, data))
utime = to_unix_timestamp(stamp)
data_interval = min(self.averaging_intervals())
stamp = datetime.utcfromtimestamp(utime - utime % data_interval)
tmpl, data = self._format_data(stamp, dict(record))
rrdtool.update(self._filepath, '--template=%s' % tmpl, data)
def get_slice(self, start, end, names=None, average=5):
"""Get a slice of data from the database.
......@@ -202,13 +174,7 @@ class RrdModel(object):
if name in columns:
dst_data[:, dst_idx] = src_data[:, columns.index(name)]
# we compute dewpoint since it wasn't always available
if name == 'dewpoint':
temp = src_data[:, self.datasets.index('air_temp')].astype(np.float64)
rh = src_data[:, self.datasets.index('rh')].astype(np.float64)
dst_data[:, dst_idx] = dewpoint(temp, rh)
# get the wind direction in degrees from the vector components
# recompose the wind direction if asked for
elif name == 'wind_dir':
east = src_data[:, self.datasets.index('winddir_east')].astype(np.float64)
north = src_data[:, self.datasets.index('winddir_north')].astype(np.float64)
......
......@@ -3,12 +3,16 @@ from datetime import datetime, timedelta
import numpy
from metobs.data import hhmm_to_offset
from metobs import data as d
symbols = {
'TIME': {'type': numpy.int32},
'ACCURAIN': {'type': numpy.float32},
}
# Tower elevation above surface in feet
STATION_ELEV = 325
VARS = {'air_temp', 'rh', 'dewpoint',
'wind_speed', 'winddir_east', 'winddir_north',
'pressure', 'precip', 'accum_precip',
'solar_flux', 'altimeter'}
class LineParseError(BaseException):
......@@ -44,6 +48,7 @@ def parse_v0_record(line):
WDIR05305: Wind direction (degrees)
"""
parts = line.split()
# TODO: handle missing values
if len(parts) != 32:
msg = "Expected 32 line parts, got {:d}".format(len(parts))
raise LineParseError(msg)
......@@ -59,7 +64,67 @@ def parse_v0_record(line):
return stamp, raw_data
class RecordV1(dict):
class Record(dict):
@staticmethod
def create(line):
if line.startswith('TIME'):
record = _RecordV0(line)
else:
record = _RecordV1(line)
have = VARS.intersection(record.keys())
missing = VARS - have
assert not missing, "Missing vars: %s" % missing
return record
def add_winds(self):
east, north, spd = d.wind_vector_components(float(self['wind_speed']),
float(self['wind_dir']))
self['winddir_east'] = '%.3d' % east
self['winddir_north'] = '%.3d' % north
self['wind_speed'] = '%.3d' % spd
def add_altimeter(self, elev=STATION_ELEV):
self['altimeter'] = '%.3d' % d.altimeter(float(self['pressure']), elev)
def add_dewpoint(self):
self['dewpoint'] = '%.3d' % d.dewpoint(float(self['air_temp']),
float(self['rh']))
class _RecordV0(Record):
# maps v0 names to newer var names
names = {'ACCURAIN': 'accum_precip',
'TEMP107_1': 'box_air_temp',
'TEMP107_2': 'air_temp_2',
'TEMP107_3': 'air_temp_3',
'TEMP107_4': 'air_temp_4',
'LI200X': 'solar_flux',
'RH41372': 'rh',
'TEMP41372': 'air_temp',
'CS105': 'box_pressure',
'PAROSCI': 'pressure',
'WSPD05305': 'wind_speed',
'WDIR05305': 'wind_dir',
'CS10162': 'box_rh',
'RAIN380M': 'precip'}
def __init__(self, line):
super(_RecordV0, self).__init__()
stamp, data = parse_v0_record(line)
for legacyname, value in data.items():
if legacyname in self.names:
self[self.names[legacyname]] = float(value)
self.add_winds()
self.add_dewpoint()
self.add_altimeter()
self['stamp'] = stamp
class _RecordV1(Record):
"""
CSV (June 2 2012 to ...)
============================
......@@ -101,21 +166,26 @@ class RecordV1(dict):
'accum_precip', 'altimeter']
def __init__(self, line):
super(self.__class__, self).__init__()
super(_RecordV1, self).__init__()
parts = line.split(',')
if len(parts) != 29:
raise LineParseError("Expected 29 parts, got {:d}".format(len(parts)))
self.update({k: v for k, v in zip(self.names, parts)})
def __getattr__(self, name, default=None):
return self.get(name, default)
self.add_winds()
# we overwrite computed dewpoint and altimeter values with computed
# values, just to be consistent
self.add_dewpoint()
self.add_altimeter()
self.set_stamp()
def get_stamp(self):
def set_stamp(self):
year = int(self['year'])
doy = int(self['doy'])
dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j')
secs = hhmm_to_offset(self['hhmm'])
secs = d.hhmm_to_offset(self['hhmm'])
secs += float(self['sec'])
secs -= (secs % 5)
dt += timedelta(seconds=secs)
return dt
self['stamp'] = dt
......@@ -6,11 +6,12 @@ import logging
from datetime import datetime
from metobs.data import wind_vector_components
from aosstower.record import RecordV1, LineParseError
from aosstower.model import RrdModel, ModelError
from aosstower.record import Record, LineParseError
from aosstower import model as m
LOG = logging
if __name__ == '__main__':
import argparse
......@@ -22,8 +23,7 @@ if __name__ == '__main__':
help='Reference start date for database (YYYY-MM-DD)')
parser.add_argument('-d', '--db-days', type=int, default=365,
help='Size of DB in days')
parser.add_argument('files', nargs='?', type=argparse.FileType('r'),
default=sys.stdin,
parser.add_argument('-i', dest='files', type=argparse.FileType('r'),
help="List of time sorted input data files")
args = parser.parse_args()
......@@ -31,10 +31,15 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
assert not os.path.exists(args.outdb)
rrd = RrdModel(args.outdb)
rrd.initialize(args.db_start, days=args.db_days)
m.initialize(args.outdb, args.db_start, days=args.db_days)
rrd = m.RrdModel(args.outdb)
LOG.info("initilized %s", args.outdb)
if args.files is None:
LOG.info("files list not provided, reading from stdin")
LOG.info("Enter time ordered data files, one per line, ^D when done")
args.files = sys.stdin
for each in args.files.readlines():
fpath = each.strip()
if not os.path.exists(fpath):
......@@ -46,20 +51,13 @@ if __name__ == '__main__':
if not line.strip():
continue
try:
record = RecordV1(line)
record = Record.create(line)
except LineParseError as err:
LOG.error(str(err))
continue
windspd = float(record['wind_speed'])
winddir = float(record['wind_dir'])
u_e, u_n, spd = wind_vector_components(windspd, winddir)
record['winddir_east'] = u_e
record['winddir_north'] = u_n
record['wind_speed'] = spd
try:
rrd.add_record(record)
except ModelError:
rrd.add_record(record['stamp'], record)
except m.ModelError:
LOG.exception("Could not insert: %s", record)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment