Skip to content
Snippets Groups Projects
Commit f80dce5e authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Re-organize again.

parent 09633f6c
No related branches found
No related tags found
No related merge requests found
# coding=utf-8
import re
import logging
from datetime import datetime, timedelta
import numpy
from collections import defaultdict
from metobs import data as d
from aosstower import station
from aosstower.schema import database
# Tower elevation above surface in feet
STATION_ELEV = 325
LOG = logging.getLogger(__name__)
VARS = {'air_temp', 'rh', 'dewpoint',
'wind_speed', 'winddir_east', 'winddir_north',
'pressure', 'precip', 'accum_precip',
'solar_flux', 'altimeter'}
class LineParseError(BaseException):
class LineParseError(Exception):
"""Error parsing line of record data.
"""
@classmethod
......@@ -25,58 +23,6 @@ class LineParseError(BaseException):
msg = msg or str(exception)
raise cls(msg), None, traceback
def parse_v0_record(line):
"""
Key/Value (Before June 2 2012)
==============================
TIME: Seconds since Jan 1, 1970
ACCURAIN: Accumulated precipitation (mm)
TEMP107_4: Auxillary temperature*
LI200X: Solar Flux (w/m^2)
TEMP107_1: Box temperature*
RH41372: Relative humidity (%)
TEMP107_5: Auxillary temperature*
CS105: Box pressure*
PAROSCI: Pressure (hPa)
WSPD05305: Wind speed (m/s)
TEMP107_3: Axullary temperature*
CS10162: Box relative humidity*
RAIN380M: Precipitation (0.01in)
TEMP107_2: Outside box temperature*
TEMP41372: Air temperature (ºC)
WDIR05305: Wind direction (degrees)
"""
parts = line.split()
# TODO: handle missing values
if len(parts) != 32:
msg = "Expected 32 line parts, got {:d}".format(len(parts))
raise LineParseError(msg)
raw_data = {k: v for k, v in zip(parts[0::2], parts[1::2])}
time_str = raw_data['TIME']
try:
unix_time = int(time_str)
except ValueError as err:
msg = "Could not parse unix time from {}".format(time_str)
LineParseError.raise_wrapped(err, msg)
else:
stamp = datetime.utcfromtimestamp(unix_time)
return stamp, raw_data
class Record(dict):
@staticmethod
def create(line):
if line.startswith('TIME'):
record = _RecordV0(line)
else:
record = _RecordV1(line)
have = VARS.intersection(record.keys())
missing = VARS - have
assert not missing, "Missing vars: %s" % missing
return record
def add_winds(self):
east, north, spd = d.wind_vector_components(float(self['wind_speed']),
float(self['wind_dir']))
......@@ -85,21 +31,32 @@ class Record(dict):
self['winddir_north'] = '%.3d' % north
self['wind_speed'] = '%.3d' % spd
def add_altimeter(self, elev=STATION_ELEV):
def add_altimeter(self, elev=station.ELEVATION):
self['altimeter'] = '%.3d' % d.altimeter(float(self['pressure']), elev)
def add_dewpoint(self):
self['dewpoint'] = '%.3d' % d.dewpoint(float(self['air_temp']),
self['dewpoint'] = '%.3d' % d.dewpoint(float(self['air_temp']),
float(self['rh']))
class _RecordV0(Record):
# maps v0 names to newer var names
def _make_record(data):
for key in data:
if key == 'stamp':
continue
if key in database:
data[key] = database[key].type(data[key])
return defaultdict(lambda: float('nan'), data)
class ParserV0(object):
# maps v0 names to names in schema db
names = {'ACCURAIN': 'accum_precip',
'TEMP107_1': 'box_air_temp',
'TEMP107_2': 'air_temp_2',
'TEMP107_3': 'air_temp_3',
'TEMP107_4': 'air_temp_4',
'TEMP107_5': 'air_temp_5',
'LI200X': 'solar_flux',
'RH41372': 'rh',
'TEMP41372': 'air_temp',
......@@ -110,82 +67,83 @@ class _RecordV0(Record):
'CS10162': 'box_rh',
'RAIN380M': 'precip'}
def __init__(self, line):
super(_RecordV0, self).__init__()
stamp, data = parse_v0_record(line)
for legacyname, value in data.items():
if legacyname in self.names:
self[self.names[legacyname]] = float(value)
self.add_winds()
self.add_dewpoint()
self.add_altimeter()
self['stamp'] = stamp
@staticmethod
def maybe_mine(line):
return line.startswith('TIME')
def parse(self, line):
parts = line.split()
if len(parts) != 32:
raise LineParseError("Expected 32 components", line)
raw_data = {}
for k1, v1 in {k: v for k, v in zip(parts[0::2], parts[1::2])}.items():
if k1 == 'TIME':
continue
if k1 in self.names:
raw_data[self.names[k1]] = v1
else:
raise LineParseError("Unexpected var: %s" % k1, line)
try:
time_str = parts[1]
unix_time = int(time_str)
raw_data['stamp'] = datetime.utcfromtimestamp(unix_time)
except (ValueError, TypeError):
raise LineParseError("Could not parse stamp", line)
return _make_record(raw_data)
class ParserV1(object):
class _RecordV1(Record):
"""
CSV (June 2 2012 to ...)
============================
StationId*
Year
Day of year
HourMinute
Seconds
Box Presure*
ParoSci Air Temperature period*
ParoSci Pressure period*
ParoSci Air Temperature*
Pressure (hPa)
ParoSci Calc. Sig.*
Box relative humidity*
Box air temperature*
Auxillary Air Temp2*
Auxillary Air Temp3*
Auxillary Air Temp4*
Wind Speed (m/s)
Wind Direction (degrees)
RH Shield Freq.*
Relative Humidity (%)
Air Temperature 6.3m (ºC)
Dewpoint (ºC)
RTD Shield Freq.*
Air temperature (ºC)
Solar Flux (w/m^s)
Precipitation (.01in)
Acumulated Precip (mm) *reset at 0z
Altimeter (inHg)
"""
names = ['station_id', 'year', 'doy', 'hhmm', 'sec', 'box_pressure',
'paro_air_temp_period', 'paro_pressure_period', 'paro_air_temp',
'pressure', 'paro_cal_sig', 'box_rh', 'box_air_temp',
'air_temp_2', 'air_temp_3', 'air_temp_4', 'wind_speed', 'wind_dir',
'rh_shield_freq', 'rh', 'air_temp_6-3m', 'dewpoint',
'rh_shield_freq', 'rh', 'air_temp_6_3m', 'dewpoint',
'rtd_shield_freq', 'air_temp', 'solar_flux', 'precip',
'accum_precip', 'altimeter']
def __init__(self, line):
super(_RecordV1, self).__init__()
parts = line.split(',')
if len(parts) != 29:
raise LineParseError("Expected 29 parts, got {:d}".format(len(parts)))
self.update({k: v for k, v in zip(self.names, parts)})
self.add_winds()
# we overwrite computed dewpoint and altimeter values with computed
# values, just to be consistent
self.add_dewpoint()
self.add_altimeter()
self.set_stamp()
def set_stamp(self):
year = int(self['year'])
doy = int(self['doy'])
@staticmethod
def maybe_mine(line):
return re.search('^\d,\d{4},\d{1,3}', line) is not None
def _get_stamp(self, data):
year = int(data['year'])
doy = int(data['doy'])
dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j')
secs = d.hhmm_to_offset(self['hhmm'])
secs += float(self['sec'])
secs = d.hhmm_to_offset(data['hhmm'])
secs += float(data['sec'])
secs -= (secs % 5)
dt += timedelta(seconds=secs)
self['stamp'] = dt
return dt
def parse(self, line):
parts = line.split(',')
if len(parts) != 29:
raise LineParseError("Expected 28 parts", line)
raw_data = {k: v for k, v in zip(self.names, parts)}
try:
raw_data['stamp'] = self._get_stamp(raw_data)
except (TypeError, ValueError):
raise LineParseError("Could not parse timesamp", line)
return _make_record(raw_data)
def read_records(source, error_handler=lambda *a: None):
"""Returns a generator for reading records from `source`. Records are
checked line-by-line so record line versions may be mixed.
"""
if hasattr(source, 'readlines'):
fptr = source
else:
fptr = open(source)
for idx, line in enumerate(fptr.readlines()):
for parser in [ParserV1(), ParserV0()]:
if parser.maybe_mine(line):
try:
yield parser.parse(line)
break # forces 'else' to execute
except LineParseError as err:
error_handler(idx + 1, line, err)
else:
error_handler(idx + 1, line, RuntimeError("no parser found", line))
"""The data model used for the MetObs widgets using tower data.
"""
import os
from datetime import datetime, timedelta
import rrdtool
from metobs.data import wind_vector_degrees, to_unix_timestamp
from metobs.data import to_unix_timestamp
# minimum set of records for the tower
VARS = {'air_temp', 'rh', 'dewpoint',
'wind_speed', 'winddir_east', 'winddir_north',
'pressure', 'precip', 'accum_precip',
'solar_flux', 'altimeter'}
def initialize(filepath, start=None, days=365, data_interval=5):
def initialize_rrd(filepath, start=None, days=365, data_interval=5):
"""Create a new empty RRD database.
"""
assert not os.path.exists(filepath), "DB already exists"
......
# Data interval in seconds
DATA_INTERVAL = 5
from numpy import *
from numpy import float32
from collections import namedtuple
schema_v0 = {
'TIME': {'type': int32},
'ACCURAIN': {'type': float32,
'standard_name': 'air_temperature',
'name': 'accumulated_precipitation',
'description': 'Accumulated precipitation',
'units': 'mm'},
'TEMP107_4': {'type': float32,
'standard_name': 'air_temperature',
'name': 'box_temp',
'description': 'Auxiliary temperature',
'units': 'degC'},
'LI200X': {'type': float32,
'name': 'solar_flux',
'description': 'Solar flux',
'units': 'w*m^-2'},
'TEMP107_1': {'type': float32,
'standard_name': 'air_temperature',
'name': 'box_temp',
'description': 'Temp inside the data logger enclosure',
'units': 'degC'},
'RH41372': {'type': float32,
'standard_name': 'relative_humidity',
'name': 'rh',
'description': 'Relative humidity',
'units': '%'},
'TEMP107_5': {'type': float32,
'standard_name': 'air_temperature',
'description': 'Auxiliary temperature',
'name': 'air_temp5',
'units': 'degC'},
'CS105': {'type': float32,
'standard_name': 'air_pressure',
'name': 'box_pressure',
'description': 'Air pressure inside the data logger enclosure',
'units': 'hPa'},
'PAROSCI': {'type': float32,
'standard_name': 'air_pressure',
'name': 'pressure',
'description': 'Air pressure measured by ParoScientific sensor',
'units': 'hPa'},
'WSPD05305': {'type': float32,
'standard_name': 'wind_speed',
'name': 'wind_speed',
'description': 'Wind speed',
'units': 'm/s'},
'ACCURAIN': {'type': float32},
'ACCURAIN': {'type': float32},
'ACCURAIN': {'type': float32},
'ACCURAIN': {'type': float32},
'ACCURAIN': {'type': float32},
Var = namedtuple('Var', ['type', 'standard_name', 'name', 'description', 'units'])
}
\ No newline at end of file
database = dict(
box_temp=Var(
float32,
'air_temperature',
'box_temp',
'Auxillary Temperature',
'degC'),
box_pressure=Var(
float32,
'air_pressure',
'box_pressure',
'Pressure inside the data logger enclosure',
'hPa'),
paro_air_temp_period=Var(
float32,
'',
'paro_air_temp_period',
'',
'1'),
paro_pressure_period=Var(
float32,
'',
'paro_pressure_period',
'',
'1'),
paro_air_temp=Var(
float32,
'air_temperature',
'paro_air_temp',
'',
'degC'),
pressure=Var(
float32,
'air_pressure',
'pressure',
'Air pressure as measured from the PAROSCI pressure sensor',
'hPa'),
paro_cal_sig=Var(
float32,
'',
'paro_cal_sig',
'',
''),
box_rh=Var(
float32,
'relative_humidity',
'box_rh',
'Relative humidity inside the data logger enclosure',
'%'),
box_air_temp=Var(
float32,
'air_temperature',
'box_air_temp',
'Air temperature inside the data logger enclosure',
'degC'),
air_temp_2=Var(
float32,
'air_temperature',
'air_temp_2',
'Auxillary air temperature',
'degC'),
air_temp_3=Var(
float32,
'air_temperature',
'air_temp_3',
'Auxillary air temperature',
'degC'),
air_temp_4=Var(
float32,
'air_temperature',
'air_temp_4',
'Auxillary air temperature',
'degC'),
air_temp_5=Var(
float32,
'air_temperature',
'air_temp_5',
'Auxillary air temperature',
'degC'),
wind_speed=Var(
float32,
'wind_speed',
'wind_speed',
'Wind speed',
'm*s^-1'),
wind_dir=Var(
float32,
'wind_direction',
'wind_dir',
'Wind direction',
'degrees'),
rh_shield_freq=Var(
float32,
'',
'rh_shield_freq',
'',
'hz'),
rh=Var(
float32,
'relative_humidity',
'rh',
'Relative humidity',
'%'),
air_temp_6_3m=Var(
float32,
'air_temperature',
'air_temp_6_3m',
'Air temperature 6.3m from tower base',
'degC'),
dewpiont=Var(
float32,
'dewpoint_temperature',
'dewpoint',
'Calculated dewpoint temperature',
'degC'),
rtd_shield_freq=Var(
float32,
'',
'rtd_shied_freq',
'',
''),
air_temp=Var(
float32,
'air_temperature',
'air_temp',
'Air temperature',
'degC'),
solar_flux=Var(
float32,
'solar_flux',
'solar_flux',
'Solar flux',
'w*m^-2'),
precip=Var(
float32,
'',
'precip',
'Precipitation',
'mm'),
accum_precip=Var(
float32,
'accumulated_precipitation',
'accum_precip',
'Precipitation accumulated since 0Z',
'mm'),
altimeter=Var(
float32,
'',
'altimeter',
'',
'inHg')
)
met_vars = {'air_temp', 'rh', 'solar_flux', 'pressure', 'precip', 'accum_precip',
'wind_speed', 'wind_dir'}
engr_vars = set(database.keys()) - met_vars
\ No newline at end of file
# Time between data samples in seconds
DATA_INTERVAL = 5
# station elevation in meters above the surface in feet
ELEVATION = 325
# Id of station from v1 records
ID = 1
import unittest
from datetime import datetime
class ParserV0Tests(unittest.TestCase):
line = ("TIME 0 ACCURAIN 0.000000 TEMP107_4 8.139600 "
"LI200X 0.066020 TEMP107_1 9.307800 RH41372 92.064000 "
"TEMP107_5 -99999.000000 CS105 970.100000 PAROSCI 971.428000 "
"WSPD05305 8.663000 TEMP107_3 8.368400 CS10162 65.653000 "
"RAIN380M 0.000000 TEMP107_2 8.287800 TEMP41372 8.202300 "
"WDIR05305 143.380000\n")
def _cut(self):
from aosstower.l00.parser import ParserV0
return ParserV0()
def test_maybe_mine(self):
parser = self._cut()
self.assertTrue(parser.maybe_mine(self.line))
bad_line = 'xx' + self.line
self.assertFalse(parser.maybe_mine(bad_line))
def test_record_format(self):
parser = self._cut()
record = parser.parse(self.line)
self.assertIn('stamp', record)
self.assertEqual(record['stamp'], datetime(1970, 1, 1))
class ParserV1Tests(unittest.TestCase):
line = ("1,1970,1,0000,0,976.59,5.8564,30.085,25.893,977.36,58732,"
"47.375,24.234,23.865,22.615,37.219,6.9222,67.398,145.2,45.581,"
"22.669,10.417,145.2,22.665,163.94,0,0,30.015,29.89\n")
def _cut(self):
from aosstower.l00.parser import ParserV1
return ParserV1()
def test_maybe_mine(self):
parser = self._cut()
self.assertTrue(parser.maybe_mine(self.line))
bad_line = 'xx,' + self.line.strip()
self.assertFalse(parser.maybe_mine(bad_line))
def test_record_format(self):
parser = self._cut()
record = parser.parse(self.line)
self.assertIn('stamp', record)
self.assertEqual(record['stamp'], datetime(1970, 1, 1))
#!/usr/bin/env python
import os
import sys
import glob
import logging
from datetime import datetime
from metobs.data import wind_vector_components
from metobscommon.model import RrdModel
from metobscommon.model import RrdModel, ModelError
from aosstower.record import Record, LineParseError
from aosstower.model import initialize
from aosstower.model import initialize_rrd, VARS
LOG = logging
......@@ -22,7 +20,7 @@ if __name__ == '__main__':
argdt = lambda v: datetime.strptime(v, '%Y-%m-%d')
parser.add_argument('-s', '--db-start', type=argdt, default=datetime.now(),
help='Reference start date for database (YYYY-MM-DD)')
parser.add_argument('-d', '--db-days', type=int, default=365,
parser.add_argument('-d', '--db-days', type=int, default=366,
help='Size of DB in days')
parser.add_argument('-i', dest='files', type=argparse.FileType('r'),
help="File containing list of time sorted input data files")
......@@ -31,10 +29,12 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
assert not os.path.exists(args.outdb)
initialize(args.outdb, args.db_start, days=args.db_days)
if os.path.exists(args.outdb):
os.remove(args.outdb)
LOG.info("initializing database at start=%s for %d days",
args.db_start, args.db_days)
initialize_rrd(args.outdb, args.db_start, days=args.db_days)
rrd = RrdModel(args.outdb)
LOG.info("initilized %s", args.outdb)
if args.files is None:
LOG.info("files list not provided, reading from stdin")
......@@ -59,6 +59,6 @@ if __name__ == '__main__':
try:
rrd.add_record(record['stamp'], record)
except m.ModelError:
except ModelError:
LOG.exception("Could not insert: %s", record)
#!/usr/bin/env python
import os
import glob
import logging
from datetime import datetime
from metobs.data import wind_vector_components
from aosstower.record import RecordV1, LineParseError
from aosstower.model import RrdModel, ModelError
LOG = logging
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('db')
parser.add_argument('path')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
rrd = RrdModel(args.db)
if not os.path.exists(args.db):
LOG.info("initilizing %s", rrd)
rrd.initialize(datetime(2013, 1, 1))
for filepath in glob.glob(args.path):
LOG.info("adding %s", filepath)
for line in open(filepath).readlines():
if not line.strip():
continue
try:
record = RecordV1(line)
except LineParseError as err:
LOG.error(str(err))
continue
windspd = float(record['wind_speed'])
winddir = float(record['wind_dir'])
u_e, u_n, spd = wind_vector_components(windspd, winddir)
record['winddir_east'] = u_e
record['winddir_north'] = u_n
record['wind_speed'] = spd
try:
rrd.add_record(record)
except ModelError:
LOG.exception("Error with record %s" % record)
#!/usr/bin/env bash
rrdtool create aoss_tower.rrd \
--start=1356890400 \
--step=5 \
DS:air_temp:GAUGE:10:-40:50 \
DS:rh:GAUGE:10:0:100 \
DS:dewpoint:GAUGE:10:0:100 \
DS:wind_speed:GAUGE:10:0:100 \
DS:winddir_north:GAUGE:10:-100:100 \
DS:winddir_east:GAUGE:10:-100:100 \
DS:pressure:GAUGE:10:0:1100 \
DS:precip:GAUGE:10:0:100 \
DS:accum_precip:GAUGE:10:0:100 \
DS:solar_flux:GAUGE:10:0:1000 \
DS:altimeter:GAUGE:10:0:100 \
RRA:AVERAGE:0.5:1:6307200 \
RRA:AVERAGE:0.5:12:525600 \
RRA:AVERAGE:0.5:60:105120 \
RRA:AVERAGE:0.5:360:17520
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment