Skip to content
Snippets Groups Projects
Commit c7094fc0 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Working to create RRD database.

parent 8b3ab15c
No related branches found
No related tags found
No related merge requests found
*.csv
\ No newline at end of file
from collections import OrderedDict
from datetime import datetime, timedelta
import rrdtool
import numpy as np
from .time import to_unix_timestamp
from .wind import mean_wind_vector_degrees
def dewpoint(tempC, relhum):
"""
Algorithm from Tom Whittaker tempC is the temperature in degrees Celsius,
relhum is the relative humidity as a percentage.
:param tempC: temperature in celsius
:param relhum: relative humidity as a percentage
"""
gasconst = 461.5
latheat = 2500800.0
dp = (1.0 / (1.0 / (273.15 + tempC)
- gasconst * np.log((0.0 + relhum) / 100) / (latheat - tempC * 2397.5)))
return np.minimum(dp - 273.15, tempC)
class RrdModel(object):
keys = ['air_temp', 'rh', 'dewpoint',
'wind_speed', 'winddir_north', 'winddir_east',
'pressure', 'precip', 'accum_precip', 'solar_flux',
'altimeter']
def __init__(self, filepath):
self._filepath = filepath
def initialize(self, start=None):
start = start or datetime.now() - timedelta(days=365)
secs = to_unix_timestamp(start)
rrdtool.create(self._filepath,
'--start={}'.format(secs),
'--step=5',
'DS:air_temp:GAUGE:10:-40:50',
'DS:rh:GAUGE:10:0:100',
'DS:dewpoint:GAUGE:10:0:100',
'DS:wind_speed:GAUGE:10:0:100',
'DS:winddir_north:GAUGE:10:-100:100',
'DS:winddir_east:GAUGE:10:-100:100',
'DS:pressure:GAUGE:10:0:1100',
'DS:precip:GAUGE:10:0:100',
'DS:accum_precip:GAUGE:10:0:100',
'DS:solar_flux:GAUGE:10:0:1000',
'DS:altimeter:GAUGE:10:0:100',
'RRA:AVERAGE:0.5:1:6307200',
'RRA:AVERAGE:0.5:12:525600',
'RRA:AVERAGE:0.5:60:105120',
'RRA:AVERAGE:0.5:360:17520')
def _print(self, record):
stamp = record.get_stamp()
values = ':'.join([str(record[k]) for k in self.keys])
values = '{:d}:{}'.format(to_unix_timestamp(stamp), values)
print values
def get_slice(self, start, end, names=None, average=5):
names = names or self.keys[:]
if isinstance(start, datetime):
start = to_unix_timestamp(start)
if isinstance(end, datetime):
end = to_unix_timestamp(end)
# normalize request times to averaging interval
start -= start % average
end -= end % average
range, columns, rawdata = rrdtool.fetch(self._filepath,
'AVERAGE',
'-r {:d}'.format(average),
'-s {:d}'.format(start),
'-e {:d}'.format(end))
src_data = np.array(rawdata)
dst_data = np.zeros((src_data.shape[0], len(names))) * float('nan')
# get only the columns we're interested in
for dst_idx, name in enumerate(names):
if name in columns:
dst_data[:,dst_idx] = src_data[:,columns.index(name)]
# we compute dewpoint since it wasn't always available
if name == 'dewpoint':
temp = src_data[:,self.keys.index('air_temp')].astype(np.float64)
rh = src_data[:,self.keys.index('rh')].astype(np.float64)
dst_data[:,dst_idx] = dewpoint(temp, rh)
# get the wind direction in degrees from the vector components
elif name == 'wind_dir':
east = src_data[:,self.keys.index('winddir_east')].astype(np.float64)
north = src_data[:,self.keys.index('winddir_north')].astype(np.float64)
dst_data[:,dst_idx] = mean_wind_vector_degrees(east, north)
times = np.array([np.arange(start, end + average, average)])
return np.concatenate((times.T, dst_data), axis=1)
# coding=utf-8
from datetime import datetime, timedelta
import numpy
from .time import hhmm_to_secs
symbols = {
'TIME': {'type': numpy.int32},
'ACCURAIN': {'type': numpy.float32},
}
class LineParseError(BaseException):
"""Error parsing line of record data.
"""
@classmethod
def raise_wrapped(cls, exception, msg=None):
import sys
traceback = sys.exc_info()[2]
msg = msg or str(exception)
raise cls(msg), None, traceback
def parse_v0_record(line):
"""
Key/Value (Before June 2 2012)
==============================
TIME: Seconds since Jan 1, 1970
ACCURAIN: Accumulated precipitation (mm)
TEMP107_4: Auxillary temperature*
LI200X: Solar Flux (w/m^2)
TEMP107_1: Box temperature*
RH41372: Relative humidity (%)
TEMP107_5: Auxillary temperature*
CS105: Box pressure*
PAROSCI: Pressure (hPa)
WSPD05305: Wind speed (m/s)
TEMP107_3: Axullary temperature*
CS10162: Box relative humidity*
RAIN380M: Precipitation (0.01in)
TEMP107_2: Outside box temperature*
TEMP41372: Air temperature (ºC)
WDIR05305: Wind direction (degrees)
"""
parts = line.split()
if len(parts) != 32:
msg = "Expected 32 line parts, got {:d}".format(len(parts))
raise LineParseError(msg)
raw_data = {k:v for k, v in zip(parts[0::2], parts[1::2])}
time_str = raw_data['TIME']
try:
unix_time = int(time_str)
except ValueError as err:
msg = "Could not parse unix time from {}".format(time_str)
LineParseError.raise_wrapped(err, msg)
stamp = datetime.utcfromtimestamp(unix_time)
return stamp, raw_data
class RecordV1(dict):
"""
CSV (June 2 2012 to ...)
============================
StationId*
Year
Day of year
HourMinute
Seconds
Box Presure*
ParoSci Air Temperature period*
ParoSci Pressure period*
ParoSci Air Temperature*
Pressure (hPa)
ParoSci Calc. Sig.*
Box relative humidity*
Box air temperature*
Auxillary Air Temp2*
Auxillary Air Temp3*
Auxillary Air Temp4*
Wind Speed (m/s)
Wind Direction (degrees)
RH Shield Freq.*
Relative Humidity (%)
Air Temperature 6.3m (ºC)
Dewpoint (ºC)
RTD Shield Freq.*
Air temperature (ºC)
Solar Flux (w/m^s)
Precipitation (.01in)
Acumulated Precip (mm) *reset at 0z
Altimeter (inHg)
"""
names = ['station_id', 'year', 'doy', 'hhmm', 'sec', 'box_pressure',
'paro_air_temp_period', 'paro_pressure_period', 'paro_air_temp',
'pressure', 'paro_cal_sig', 'box_rh', 'box_air_temp',
'air_temp_2', 'air_temp_3', 'air_temp_4', 'wind_speed', 'wind_dir',
'rh_shield_freq', 'rh', 'air_temp_6-3m', 'dewpoint',
'rtd_shield_freq', 'air_temp', 'solar_flux', 'precip',
'accum_precip', 'altimeter']
def __init__(self, line):
super(self.__class__, self).__init__()
parts = line.split(',')
if len(parts) != 29:
raise LineParseError("Expected 29 parts, got {:d}".format(len(parts)))
self.update({k: v for k, v in zip(self.names, parts)})
def __getattr__(self, name, default=None):
return self.get(name, default)
def get_stamp(self):
year = int(self['year'])
doy = int(self['doy'])
dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j')
secs = hhmm_to_secs(self['hhmm'])
secs += float(self['sec'])
secs -= (secs % 5)
dt += timedelta(seconds=secs)
return dt
from numpy import *
schema_v0 = {
'TIME': {'type': int32},
'ACCURAIN': {'type': float32,
'standard_name': 'air_temperature',
'name': 'accumulated_precipitation',
'description': 'Accumulated precipitation',
'units': 'mm'},
'TEMP107_4': {'type': float32,
'standard_name': 'air_temperature',
'name': 'box_temp',
'description': 'Auxiliary temperature',
'units': 'degC'},
'LI200X': {'type': float32,
'name': 'solar_flux',
'description': 'Solar flux',
'units': 'w*m^-2'},
'TEMP107_1': {'type': float32,
'standard_name': 'air_temperature',
'name': 'box_temp',
'description': 'Temp inside the data logger enclosure',
'units': 'degC'},
'RH41372': {'type': float32,
'standard_name': 'relative_humidity',
'name': 'rh',
'description': 'Relative humidity',
'units': '%'},
'TEMP107_5': {'type': float32,
'standard_name': 'air_temperature',
'description': 'Auxiliary temperature',
'name': 'air_temp5',
'units': 'degC'},
'CS105': {'type': float32,
'standard_name': 'air_pressure',
'name': 'box_pressure',
'description': 'Air pressure inside the data logger enclosure',
'units': 'hPa'},
'PAROSCI': {'type': float32,
'standard_name': 'air_pressure',
'name': 'pressure',
'description': 'Air pressure measured by ParoScientific sensor',
'units': 'hPa'},
'WSPD05305': {'type': float32,
'standard_name': 'wind_speed',
'name': 'wind_speed',
'description': 'Wind speed',
'units': 'm/s'},
'ACCURAIN': {'type': float32},
'ACCURAIN': {'type': float32},
'ACCURAIN': {'type': float32},
'ACCURAIN': {'type': float32},
'ACCURAIN': {'type': float32},
}
\ No newline at end of file
from datetime import datetime
def test_to_unix_timestamp():
from aosstower.time import to_unix_timestamp
assert to_unix_timestamp(datetime(1970, 1, 1)) == 0
def test_hhmm_to_secs():
from aosstower.time import hhmm_to_secs
assert hhmm_to_secs('2400') == 86400, "Can't handle > 23:59"
assert hhmm_to_secs('2401') == 86460, "Can't handle > 23:59"
assert hhmm_to_secs('0') == 0, "Can't handle short times"
assert hhmm_to_secs('001') == 60, "Can't handle leading 0"
import unittest
class MeanVectorAverageTests(unittest.TestCase):
def _fut(self, input_windspd, input_winddir):
from aosstower.wind import mean_wind_vector
return mean_wind_vector(input_windspd, input_winddir)
def test_scalar_input(self):
for wdir, wspd in [(0, 20), (90, 20), (180, 20), (270, 20)]:
output_speed, output_dir = self._fut(wspd, wdir)
self.assertEqual(wdir, output_dir)
self.assertEqual(wspd, 20)
def test_array_input(self):
for wdir, wspd in [([0, 90, 180, 270],[10, 10, 10, 10])]:
output_speed, output_winddir = self._fut(wspd, wdir)
self.assertEqual(wdir, output_winddir.tolist())
self.assertEqual(wspd, output_speed.tolist())
from calendar import timegm
from datetime import datetime, timedelta
def to_unix_timestamp(dt):
return int(timegm(dt.utctimetuple()))
def hhmm_to_secs(hhmm):
val = int(hhmm)
hhmm = '{:04d}'.format(int(hhmm))
return timedelta(hours=int(hhmm[0:2]),
minutes=int(hhmm[2:])).total_seconds()
import numpy as np
def mean_wind_vector_components(windspd, winddir):
"""Decompose scalar or list/array wind direction and speed data into the
corresponding horizontal and vertical direction components and speed
vector.
"""
dir_rad = np.deg2rad(winddir)
V_e = windspd * np.sin(dir_rad)
V_n = windspd * np.cos(dir_rad)
U_spd = (V_e**2 + V_n**2)**0.5
return V_e, V_n, U_spd
def mean_wind_vector_degrees(vector_east, vector_north):
"""Re-compose horizontal (east/west) and vertical (north/south) vector
components into wind direction in degrees.
"""
rads = np.arctan2(vector_east, vector_north)
winddir = np.rad2deg(rads)
if isinstance(winddir, np.ndarray):
winddir[np.less(winddir, 0)] += 360
elif winddir < 0:
winddir += 360
return winddir
def mean_wind_vector(windspd, winddir):
"""Compute the mean wind vector.
See: Campbell Scientific CR1000 Manual Section 7.8.5.2.2.
"""
vector_east, vector_north, vector_speed = \
mean_wind_vector_components(windspd, winddir)
return vector_speed, mean_wind_vector_degrees(vector_east, vector_north)
\ No newline at end of file
#!/usr/bin/env python
import sys
import glob
from datetime import datetime
from aosstower.record import RecordV1, LineParseError
from aosstower.model import RrdModel
from aosstower import wind
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('db')
parser.add_argument('path')
args = parser.parse_args()
model = RrdModel(args.db)
model.initialize(datetime(2013, 1, 1))
for filepath in glob.glob(args.path):
print >> sys.stderr, filepath
for line in open(filepath).readlines():
if not line.strip():
continue
try:
record = RecordV1(line)
except LineParseError as err:
continue
windspd = float(record['wind_speed'])
winddir = float(record['wind_dir'])
u_e, u_n, spd = wind.mean_wind_vector_components(windspd, winddir)
record['winddir_east'] = u_e
record['winddir_north'] = u_n
record['wind_speed'] = spd
try:
model._print(record)
except Exception as err:
raise
#!/usr/bin/env bash
rrdtool create aoss_tower.rrd \
--start=1356890400 \
--step=5 \
DS:air_temp:GAUGE:10:-40:50 \
DS:rh:GAUGE:10:0:100 \
DS:dewpoint:GAUGE:10:0:100 \
DS:wind_speed:GAUGE:10:0:100 \
DS:winddir_north:GAUGE:10:-100:100 \
DS:winddir_east:GAUGE:10:-100:100 \
DS:pressure:GAUGE:10:0:1100 \
DS:precip:GAUGE:10:0:100 \
DS:accum_precip:GAUGE:10:0:100 \
DS:solar_flux:GAUGE:10:0:1000 \
DS:altimeter:GAUGE:10:0:100 \
RRA:AVERAGE:0.5:1:6307200 \
RRA:AVERAGE:0.5:12:525600 \
RRA:AVERAGE:0.5:60:105120 \
RRA:AVERAGE:0.5:360:17520
\ No newline at end of file
#!/usr/bin/env python
import argparse
from datetime import datetime, timedelta
from aosstower.model import RrdModel
averages = {
3: 60,
6: 60,
12: 300,
18: 300,
24: 300
}
def main(db, outfile, vars, hours):
model = RrdModel(db)
average = averages[hours]
start = datetime(2013, 4, 1)
end = start + timedelta(hours=hours)
data = model.get_slice(start,
end,
names=vars,
average=average)
fptr = open(outfile, 'wt')
fptr.write('Time,' + ','.join(vars) + '\n')
for row in data:
dt = datetime.utcfromtimestamp(row[0])
fptr.write(dt.strftime('%Y/%m/%d %H:%M:%S'))
for val in row[1:]:
if val is None:
fptr.write(',NaN')
else:
fptr.write(',{}'.format(val))
fptr.write('\n')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('hours', type=int, default=3, choices=[3, 6, 12, 18, 24])
parser.add_argument('db')
parser.add_argument('outfile')
parser.add_argument('var', nargs='+')
args = parser.parse_args()
print args.var
main(args.db, args.outfile, args.var, args.hours)
......@@ -4,11 +4,10 @@ except ImportError:
from ez_setup import use_setuptools
use_setuptools()
from setuptools import setup, find_packages
import versiontools_support
setup(
name='AossTower',
version=':versiontools:aosstower',
version='0.1',
zip_safe=True,
description='UW AOSS Rooftop Instrument Group Met Tower',
url='http://metobs.ssec.wisc.edu',
......@@ -16,7 +15,7 @@ setup(
author_email='',
scripts=[],
install_requires=[
'metobs.db >= 0.4dev-r1724',
'python-rrdtool'
],
dependency_links=['http://larch.ssec.wisc.edu/cgi-bin/repos.cgi'],
packages=find_packages(exclude=['aosstower.tests']),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment