From b67c3e5acdcd3fc6184dfdd516f051ce953f5455 Mon Sep 17 00:00:00 2001 From: Bruce <brucef@ssec.wisc.edu> Date: Tue, 27 May 2014 19:50:23 +0000 Subject: [PATCH] Good enough to create RRDs for entire archive --- aosstower/meta.py | 3 + aosstower/model.py | 140 +++++++++++++++------------------------ aosstower/record.py | 94 ++++++++++++++++++++++---- scripts/make_database.py | 30 ++++----- 4 files changed, 152 insertions(+), 115 deletions(-) create mode 100644 aosstower/meta.py mode change 100644 => 100755 scripts/make_database.py diff --git a/aosstower/meta.py b/aosstower/meta.py new file mode 100644 index 0000000..d9ef7af --- /dev/null +++ b/aosstower/meta.py @@ -0,0 +1,3 @@ + +# Data interval in seconds +DATA_INTERVAL = 5 diff --git a/aosstower/model.py b/aosstower/model.py index bdb51bc..b0a665d 100644 --- a/aosstower/model.py +++ b/aosstower/model.py @@ -6,24 +6,8 @@ from datetime import datetime, timedelta import rrdtool import numpy as np -from metobs.data import (wind_vector_degrees, to_unix_timestamp, - wind_vector_components) - - -def dewpoint(tempC, relhum): - """ - Algorithm from Tom Whittaker tempC is the temperature in degrees Celsius, - relhum is the relative humidity as a percentage. - - :param tempC: temperature in celsius - :param relhum: relative humidity as a percentage - """ - gasconst = 461.5 - latheat = 2500800.0 - - dp = (1.0 / (1.0 / (273.15 + tempC) - gasconst * np.log((0.0 + relhum) / 100) / (latheat - tempC * 2397.5))) - - return np.minimum(dp - 273.15, tempC) +from metobs.data import wind_vector_degrees, to_unix_timestamp +from aosstower import meta class ModelError(Exception): @@ -58,15 +42,45 @@ class WrapErrors(object): return cls +def initialize(filepath, start=None, days=365, data_interval=5): + """Create a new empty RRD database. + """ + assert not os.path.exists(filepath), "DB already exists" + start = start or (datetime.utcnow() - timedelta(days=days)) + # normalize start to data interval + secs = to_unix_timestamp(start) + secs -= secs % data_interval + + rrdtool.create(filepath, + '--start={}'.format(secs), + '--step={:d}'.format(data_interval), + 'DS:air_temp:GAUGE:10:-40:50', + 'DS:rh:GAUGE:10:0:100', + 'DS:dewpoint:GAUGE:10:0:100', + 'DS:wind_speed:GAUGE:10:0:100', + 'DS:winddir_north:GAUGE:10:-100:100', + 'DS:winddir_east:GAUGE:10:-100:100', + 'DS:pressure:GAUGE:10:0:1100', + 'DS:precip:GAUGE:10:0:100', + 'DS:accum_precip:GAUGE:10:0:100', + 'DS:solar_flux:GAUGE:10:0:1000', + 'DS:altimeter:GAUGE:10:0:100', + # native resolution + 'RRA:AVERAGE:0.5:1:6307200', + # 1 minute + 'RRA:AVERAGE:0.5:{:d}:525600'.format(60/data_interval), + # 5 minute + 'RRA:AVERAGE:0.5:{:d}:105120'.format(300/data_interval), + # 30 minute + 'RRA:AVERAGE:0.5:{:d}:17520'.format(1800/data_interval)) + + @WrapErrors(rrdtool.error) class RrdModel(object): """Model for storing the Level0 uncalibrated data for non-scientific purposes, such as web-widgets. """ - # data interval in seconds - DATA_INTERVAL = 5 - def __init__(self, filepath): self._filepath = filepath self._averages = tuple() @@ -99,71 +113,29 @@ class RrdModel(object): self._averages = tuple(sorted(averages)) return self._averages - def initialize(self, start=None, days=365): - """Create a new empty RRD database. - """ - assert not os.path.exists(self._filepath), "DB already exists" - start = start or (datetime.utcnow() - timedelta(days=days)) - # normalize start to data interval - secs = to_unix_timestamp(start) - secs -= secs % self.DATA_INTERVAL - - rrdtool.create(self._filepath, - '--start={}'.format(secs), - '--step={:d}'.format(self.DATA_INTERVAL), - 'DS:air_temp:GAUGE:10:-40:50', - 'DS:rh:GAUGE:10:0:100', - 'DS:dewpoint:GAUGE:10:0:100', - 'DS:wind_speed:GAUGE:10:0:100', - 'DS:winddir_north:GAUGE:10:-100:100', - 'DS:winddir_east:GAUGE:10:-100:100', - 'DS:pressure:GAUGE:10:0:1100', - 'DS:precip:GAUGE:10:0:100', - 'DS:accum_precip:GAUGE:10:0:100', - 'DS:solar_flux:GAUGE:10:0:1000', - 'DS:altimeter:GAUGE:10:0:100', - 'RRA:AVERAGE:0.5:1:6307200', - 'RRA:AVERAGE:0.5:{:d}:525600'.format(60/self.DATA_INTERVAL), - 'RRA:AVERAGE:0.5:{:d}:105120'.format(300/self.DATA_INTERVAL), - 'RRA:AVERAGE:0.5:{:d}:17520'.format(1800/self.DATA_INTERVAL)) - def _format_data(self, stamp, data): - """Format data for insert into RRD. + """Format data for insert into RRD returning a template string and data + line appropriate for arguments to rrdupdate. """ - values = ':'.join([str(data[k]) for k in self.datasets]) + validkeys = set(self.datasets).intersection(data.keys()) + if not validkeys: + raise ModelError("No valid data keys provided", data) + tmpl = ':'.join(validkeys) + values = ':'.join([str(data[k]) for k in validkeys]) values = '{:d}@{}'.format(to_unix_timestamp(stamp), values) - return values + return tmpl, values - def _record_to_data(self, record): - """Turn a tower record into database data. - """ - - expected_keys = set(self.datasets) - {'winddir_north', 'winddir_east'} - missing_keys = expected_keys - set(record.keys()) - if missing_keys: - raise ModelError("Missing datasets %s" % missing_keys) - - data = {} - wspd, wdir = (float(record.wind_speed), float(record.wind_dir)) - winds = wind_vector_components(wspd, wdir) - data['winddir_east'] = winds[0] - data['winddir_north'] = winds[1] - data['wind_speed'] = winds[2] - for name in self.datasets: - if name in record: - data[name] = record[name] - return data - - def add_record(self, record): - """Add a single record to the database. + def add_record(self, stamp, record): + """Add a single record to the database, where a record is a dict like + object with keys for each dataset. Additional keys are ignored. """ # Normalize to data interval - utime = to_unix_timestamp(record.get_stamp()) - stamp = datetime.utcfromtimestamp(utime - utime % self.DATA_INTERVAL) - data = self._record_to_data(record) - rrdtool.update(self._filepath, - '--template=%s' % ':'.join(self.datasets), - self._format_data(stamp, data)) + utime = to_unix_timestamp(stamp) + data_interval = min(self.averaging_intervals()) + stamp = datetime.utcfromtimestamp(utime - utime % data_interval) + + tmpl, data = self._format_data(stamp, dict(record)) + rrdtool.update(self._filepath, '--template=%s' % tmpl, data) def get_slice(self, start, end, names=None, average=5): """Get a slice of data from the database. @@ -202,13 +174,7 @@ class RrdModel(object): if name in columns: dst_data[:, dst_idx] = src_data[:, columns.index(name)] - # we compute dewpoint since it wasn't always available - if name == 'dewpoint': - temp = src_data[:, self.datasets.index('air_temp')].astype(np.float64) - rh = src_data[:, self.datasets.index('rh')].astype(np.float64) - dst_data[:, dst_idx] = dewpoint(temp, rh) - - # get the wind direction in degrees from the vector components + # recompose the wind direction if asked for elif name == 'wind_dir': east = src_data[:, self.datasets.index('winddir_east')].astype(np.float64) north = src_data[:, self.datasets.index('winddir_north')].astype(np.float64) diff --git a/aosstower/record.py b/aosstower/record.py index 13dc6e1..ca29d75 100644 --- a/aosstower/record.py +++ b/aosstower/record.py @@ -3,12 +3,16 @@ from datetime import datetime, timedelta import numpy -from metobs.data import hhmm_to_offset +from metobs import data as d -symbols = { - 'TIME': {'type': numpy.int32}, - 'ACCURAIN': {'type': numpy.float32}, -} + +# Tower elevation above surface in feet +STATION_ELEV = 325 + +VARS = {'air_temp', 'rh', 'dewpoint', + 'wind_speed', 'winddir_east', 'winddir_north', + 'pressure', 'precip', 'accum_precip', + 'solar_flux', 'altimeter'} class LineParseError(BaseException): @@ -44,6 +48,7 @@ def parse_v0_record(line): WDIR05305: Wind direction (degrees) """ parts = line.split() + # TODO: handle missing values if len(parts) != 32: msg = "Expected 32 line parts, got {:d}".format(len(parts)) raise LineParseError(msg) @@ -59,7 +64,67 @@ def parse_v0_record(line): return stamp, raw_data -class RecordV1(dict): +class Record(dict): + + @staticmethod + def create(line): + if line.startswith('TIME'): + record = _RecordV0(line) + else: + record = _RecordV1(line) + have = VARS.intersection(record.keys()) + missing = VARS - have + assert not missing, "Missing vars: %s" % missing + return record + + def add_winds(self): + east, north, spd = d.wind_vector_components(float(self['wind_speed']), + float(self['wind_dir'])) + + self['winddir_east'] = '%.3d' % east + self['winddir_north'] = '%.3d' % north + self['wind_speed'] = '%.3d' % spd + + def add_altimeter(self, elev=STATION_ELEV): + self['altimeter'] = '%.3d' % d.altimeter(float(self['pressure']), elev) + + def add_dewpoint(self): + self['dewpoint'] = '%.3d' % d.dewpoint(float(self['air_temp']), + float(self['rh'])) + + +class _RecordV0(Record): + # maps v0 names to newer var names + names = {'ACCURAIN': 'accum_precip', + 'TEMP107_1': 'box_air_temp', + 'TEMP107_2': 'air_temp_2', + 'TEMP107_3': 'air_temp_3', + 'TEMP107_4': 'air_temp_4', + 'LI200X': 'solar_flux', + 'RH41372': 'rh', + 'TEMP41372': 'air_temp', + 'CS105': 'box_pressure', + 'PAROSCI': 'pressure', + 'WSPD05305': 'wind_speed', + 'WDIR05305': 'wind_dir', + 'CS10162': 'box_rh', + 'RAIN380M': 'precip'} + + def __init__(self, line): + super(_RecordV0, self).__init__() + stamp, data = parse_v0_record(line) + for legacyname, value in data.items(): + if legacyname in self.names: + self[self.names[legacyname]] = float(value) + + self.add_winds() + self.add_dewpoint() + self.add_altimeter() + + self['stamp'] = stamp + + +class _RecordV1(Record): """ CSV (June 2 2012 to ...) ============================ @@ -101,21 +166,26 @@ class RecordV1(dict): 'accum_precip', 'altimeter'] def __init__(self, line): - super(self.__class__, self).__init__() + super(_RecordV1, self).__init__() parts = line.split(',') if len(parts) != 29: raise LineParseError("Expected 29 parts, got {:d}".format(len(parts))) self.update({k: v for k, v in zip(self.names, parts)}) - def __getattr__(self, name, default=None): - return self.get(name, default) + self.add_winds() + # we overwrite computed dewpoint and altimeter values with computed + # values, just to be consistent + self.add_dewpoint() + self.add_altimeter() + + self.set_stamp() - def get_stamp(self): + def set_stamp(self): year = int(self['year']) doy = int(self['doy']) dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j') - secs = hhmm_to_offset(self['hhmm']) + secs = d.hhmm_to_offset(self['hhmm']) secs += float(self['sec']) secs -= (secs % 5) dt += timedelta(seconds=secs) - return dt + self['stamp'] = dt diff --git a/scripts/make_database.py b/scripts/make_database.py old mode 100644 new mode 100755 index dc15656..eea1b98 --- a/scripts/make_database.py +++ b/scripts/make_database.py @@ -6,11 +6,12 @@ import logging from datetime import datetime from metobs.data import wind_vector_components -from aosstower.record import RecordV1, LineParseError -from aosstower.model import RrdModel, ModelError +from aosstower.record import Record, LineParseError +from aosstower import model as m LOG = logging + if __name__ == '__main__': import argparse @@ -22,8 +23,7 @@ if __name__ == '__main__': help='Reference start date for database (YYYY-MM-DD)') parser.add_argument('-d', '--db-days', type=int, default=365, help='Size of DB in days') - parser.add_argument('files', nargs='?', type=argparse.FileType('r'), - default=sys.stdin, + parser.add_argument('-i', dest='files', type=argparse.FileType('r'), help="List of time sorted input data files") args = parser.parse_args() @@ -31,10 +31,15 @@ if __name__ == '__main__': logging.basicConfig(level=logging.INFO) assert not os.path.exists(args.outdb) - rrd = RrdModel(args.outdb) - rrd.initialize(args.db_start, days=args.db_days) + m.initialize(args.outdb, args.db_start, days=args.db_days) + rrd = m.RrdModel(args.outdb) LOG.info("initilized %s", args.outdb) + if args.files is None: + LOG.info("files list not provided, reading from stdin") + LOG.info("Enter time ordered data files, one per line, ^D when done") + args.files = sys.stdin + for each in args.files.readlines(): fpath = each.strip() if not os.path.exists(fpath): @@ -46,20 +51,13 @@ if __name__ == '__main__': if not line.strip(): continue try: - record = RecordV1(line) + record = Record.create(line) except LineParseError as err: LOG.error(str(err)) continue - windspd = float(record['wind_speed']) - winddir = float(record['wind_dir']) - u_e, u_n, spd = wind_vector_components(windspd, winddir) - record['winddir_east'] = u_e - record['winddir_north'] = u_n - record['wind_speed'] = spd - try: - rrd.add_record(record) - except ModelError: + rrd.add_record(record['stamp'], record) + except m.ModelError: LOG.exception("Could not insert: %s", record) -- GitLab