Skip to content
Snippets Groups Projects
Commit d3ace430 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Fix bug in tests for V1V2 parser. Add frame module. Change terms from

records to frames.
parent c8f18830
No related branches found
No related tags found
No related merge requests found
from aosstower import station
class Frame(dict):
def __init__(self, width=station.DATA_INTERVAL):
self.width = width
def __getattr__(self, name, default=None):
return self.get(name, default)
......@@ -37,15 +37,14 @@ import logging
from datetime import datetime, timedelta
from metobs import data as d
from aosstower import station
from aosstower.schema import database
from aosstower.frame import Frame
LOG = logging.getLogger(__name__)
class LineParseError(Exception):
"""Error parsing line of record data.
"""Error parsing line of frame data.
"""
@classmethod
def raise_wrapped(cls, exception, msg=None):
......@@ -54,32 +53,25 @@ class LineParseError(Exception):
msg = msg or str(exception)
raise cls(msg), None, traceback
def add_winds(self):
east, north, spd = d.wind_vector_components(float(self['wind_speed']),
float(self['wind_dir']))
self['winddir_east'] = '%.3d' % east
self['winddir_north'] = '%.3d' % north
self['wind_speed'] = '%.3d' % spd
def add_altimeter(self, elev=station.ELEVATION):
self['altimeter'] = '%.3d' % d.altimeter(float(self['pressure']), elev)
def add_dewpoint(self):
self['dewpoint'] = '%.3d' % d.dewpoint(float(self['air_temp']),
float(self['rh']))
def _make_record(data):
for key in data:
def _make_frame(data):
"""Construct a frame from a list of tuples.
"""
frame = Frame()
for key, value in data:
if key == 'stamp':
continue
if key in database:
data[key] = database[key].type(data[key])
return data
frame[key] = value
elif key in database:
frame[key] = database[key].type(value)
else:
frame[key] = value
return frame
class ParserV0(object):
"""Parses Version 0 data lines.
"""
# maps v0 names to names in schema db
names = {'ACCURAIN': 'accum_precip',
......@@ -102,29 +94,30 @@ class ParserV0(object):
def maybe_mine(line):
return line.startswith('TIME')
def parse(self, line):
def make_frame(self, line):
parts = line.split()
if len(parts) != 32:
raise LineParseError("Expected 32 components", line)
raw_data = {}
for k1, v1 in {k: v for k, v in zip(parts[0::2], parts[1::2])}.items():
raw_data = [('version', 0)]
for k1, v1 in zip(parts[0::2], parts[1::2]):
if k1 == 'TIME':
continue
if k1 in self.names:
raw_data[self.names[k1]] = v1
raw_data.append((self.names[k1], v1))
else:
raise LineParseError("Unexpected var: %s" % k1, line)
raw_data['version'] = 0
try:
time_str = parts[1]
unix_time = int(time_str)
raw_data['stamp'] = datetime.utcfromtimestamp(unix_time)
raw_data.append(('stamp', datetime.utcfromtimestamp(unix_time)))
except (ValueError, TypeError):
raise LineParseError("Could not parse stamp", line)
return _make_record(raw_data)
return _make_frame(raw_data)
class ParserV1V2(object):
"""Parses Version 1 & 2 data lines.
"""
names = ['station_id', 'year', 'doy', 'hhmm', 'sec', 'box_pressure',
'paro_air_temp_period', 'paro_pressure_period', 'paro_air_temp',
......@@ -138,32 +131,32 @@ class ParserV1V2(object):
def maybe_mine(line):
return re.search('^\d,\d{4},\d{1,3}', line) is not None
def _get_stamp(self, data):
year = int(data['year'])
doy = int(data['doy'])
def _get_stamp(self, parts):
year = int(parts[1])
doy = int(parts[2])
dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j')
secs = d.hhmm_to_offset(data['hhmm'])
secs += float(data['sec'])
secs = d.hhmm_to_offset(parts[3])
secs += float(parts[4])
secs -= (secs % 5)
dt += timedelta(seconds=secs)
return dt
def parse(self, line):
def make_frame(self, line):
parts = line.split(',')
if len(parts) < 28:
raise LineParseError("Expected >= 28 parts", line)
raw_data = {k: v for k, v in zip(self.names, parts)}
raw_data['version'] = 1 if len(parts) == 28 else 2
if len(parts) not in [28, 29]:
raise LineParseError("Expected 28 or 29 parts", line)
version = 1 if len(parts) == 28 else 2
raw_data = [('version', version)] + zip(self.names, parts)
try:
raw_data['stamp'] = self._get_stamp(raw_data)
raw_data.append(('stamp', self._get_stamp(parts)))
except (TypeError, ValueError):
raise LineParseError("Could not parse timesamp", line)
return _make_record(raw_data)
return _make_frame(raw_data)
def read_records(source, error_handler=lambda *a: None):
"""Returns a generator for reading records from `source`. Records are
checked line-by-line so record line versions may be mixed.
def read_frames(source, error_handler=lambda *a: None):
"""Returns a generator for reading frames from `source`. Frames are
checked line-by-line so frame line versions may be mixed.
"""
if hasattr(source, 'readlines'):
fptr = source
......
......@@ -6,7 +6,8 @@ from datetime import datetime, timedelta
import rrdtool
from metobs.data import to_unix_timestamp
from metobs import data as d
from aosstower import station
# minimum set of records for the tower
......@@ -16,13 +17,31 @@ VARS = {'air_temp', 'rh', 'dewpoint',
'solar_flux', 'altimeter'}
def add_vector_winds(record):
east, north, spd = d.wind_vector_components(float(record['wind_speed']),
float(record['wind_dir']))
record['winddir_east'] = '%.3d' % east
record['winddir_north'] = '%.3d' % north
record['wind_speed'] = '%.3d' % spd
def add_altimeter(record, elev=station.ELEVATION):
record['altimeter'] = '%.3d' % d.altimeter(float(record['pressure']), elev)
def add_dewpoint(record):
record['dewpoint'] = '%.3d' % d.dewpoint(float(record['air_temp']),
float(record['rh']))
def initialize_rrd(filepath, start=None, days=365, data_interval=5):
"""Create a new empty RRD database.
"""
assert not os.path.exists(filepath), "DB already exists"
start = start or (datetime.utcnow() - timedelta(days=days))
# normalize start to data interval
secs = to_unix_timestamp(start)
secs = d.to_unix_timestamp(start)
secs -= secs % data_interval
rrdtool.create(filepath,
......
from datetime import timedelta
# Time between data samples in seconds
DATA_INTERVAL = 5
# Time between data samples in seconds
DATA_INTERVAL = timedelta(seconds=5)
# station elevation in meters above the surface in feet
ELEVATION = 325
# Id of station from v1 records
ID = 1
ID = 1
......@@ -25,7 +25,7 @@ class ParserV0Tests(unittest.TestCase):
def test_record_format(self):
parser = self._cut()
record = parser.parse(self.line)
record = parser.make_frame(self.line)
self.assertIn('stamp', record)
self.assertEqual(record['stamp'], datetime(1970, 1, 1))
......@@ -35,7 +35,7 @@ class ParserV1V2Tests(unittest.TestCase):
line = ("1,1970,1,0000,0,976.59,5.8564,30.085,25.893,977.36,58732,"
"47.375,24.234,23.865,22.615,37.219,6.9222,67.398,145.2,45.581,"
"22.669,10.417,145.2,22.665,163.94,0,0,30.015,29.89\n")
"22.669,10.417,145.2,22.665,163.94,0,0,30.015\n")
def _cut(self):
from aosstower.l00.parser import ParserV1V2
......@@ -51,7 +51,7 @@ class ParserV1V2Tests(unittest.TestCase):
def test_record_format(self):
parser = self._cut()
record = parser.parse(self.line)
record = parser.make_frame(self.line)
self.assertIn('stamp', record)
self.assertEqual(record['stamp'], datetime(1970, 1, 1))
......@@ -59,5 +59,5 @@ class ParserV1V2Tests(unittest.TestCase):
def test_record_supports_v1_and_v2(self):
parser = self._cut()
parser.parse(self.line)
parser.parse(self.line.strip() + ',999\n')
parser.make_frame(self.line)
parser.make_frame(self.line.strip() + ',999\n')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment