Select Git revision
David Hoese authored
parser.py 9.20 KiB
"""Code to parse level 00 data.
Data Versions
=============
There have so far been 3 changes to the format of the raw ASCII data over the
years.
Version 0
---------
Effective from inception to 2010-06-01T00:27:51Z.
The original data format was a key value, space separated data format
``<key> <value>``. There were a total of 16 data values including `TIME`:
`TIME, ACCURAIN, TEMP107_4, LI200X, TEMP107_1, RH41372, TEMP107_5, CS105,
PAROSCI, WSPD05305, TEMP107_3, CS10162, RAIN380M, TEMP107_2, TEMP41372,
WDIR05305`.
XXX: Fill value in version 0 seems to be -99999.
Version 1
---------
Effective 2010-06-01T00:27:51Z to 2012-12-03T17:34:17Z.
A CSV format file with a total of 28 values: station_id, year, doy, hhmm, sec,
box_pressure, paro_air_temp_period, paro_pressure_period, paro_air_temp,
pressure, paro_cal_sig, box_rh, box_air_temp, air_temp_2, air_temp_3,
air_temp_4, wind_speed, wind_dir, rh_shield_freq, rh, air_temp_6_3m, dewpoint,
rtd_shield_freq, air_temp, solar_flux, precip, accum_precip, altimeter.
XXX: Fill value in version 1 seems to be -99999.
Version 2
---------
Effective 2012-12-03T17:34:17Z to present.
Same as Version 1 with the addition of altimeter2 at the end. I'm not sure why
we have 2 altimeter values but as far as I know altimeter2 is not used.
XXX: Fill value in version 2 seems to be -99999.
"""
import re
import io
import time
import logging
from datetime import datetime, timedelta
from metobscommon.util.mytime import hhmm_to_offset
from aosstower.schema import database
LOG = logging.getLogger(__name__)
class LineParseError(Exception):
    """Raised when a line of frame data cannot be parsed."""

    @classmethod
    def raise_wrapped(cls, exception, msg=None):
        """Raise an instance of this class standing in for `exception`.

        :param exception: the error being wrapped; its string form is used
            as the message when `msg` is empty or not given.
        :param msg: optional replacement message.
        :raises LineParseError: always (or the subclass it is called on).
        """
        import sys

        # Grab the traceback of the exception currently being handled, if
        # any, so the new error still points at the original failure site.
        active_tb = sys.exc_info()[2]
        if msg:
            text = msg
        else:
            text = str(exception)
        wrapped = cls(text)
        wrapped.__traceback__ = active_tb
        raise wrapped
def _make_frame(data, new_symbols=None, rename_timestamp=False):
    """Construct a frame dictionary from ``(key, value)`` pairs.

    Pairs whose key is ``'stamp'`` or ``'timestamp'`` are copied through
    without conversion. Every other pair is kept only if its key is present
    in the schema ``database``; its value is converted with the ``type``
    callable of the matching schema entry. Pairs with unknown keys are
    silently dropped.

    :param data: iterable of ``(key, value)`` tuples.
    :param new_symbols: optional sequence of output names indexed by the
        position of each pair in `data`. When it has an entry at that
        position the entry is used as the output key instead of the
        original key.
    :param rename_timestamp: when true, a ``'stamp'`` or ``'timestamp'``
        pair is always stored under the key ``'stamp'``.
    :returns: dict mapping output keys to converted values.
    :raises LineParseError: if a value cannot be converted.
    """
    frame = {}
    for idx, (key, value) in enumerate(data):
        if key in ('stamp', 'timestamp'):
            frame['stamp' if rename_timestamp else key] = value
            continue
        if key not in database:
            continue

        if new_symbols and len(new_symbols) > idx:
            new_key = new_symbols[idx]
        else:
            new_key = key

        converter = database[key].type
        try:
            frame[new_key] = converter(value)
        except (ValueError, TypeError):
            # Interpolate the message here: exceptions do not apply
            # %-formatting to their arguments the way logging calls do.
            raise LineParseError(
                "error converting '%s' using %s" % (value, converter))
    return frame
class ParserV0(object):
    """Parses Version 0 data lines.

    A version 0 line is a whitespace separated sequence of
    ``<key> <value>`` pairs that starts with the ``TIME`` pair.
    """

    fill_value = -99999.

    # maps v0 names to names in schema db
    names = {'ACCURAIN': 'accum_precip',
             'TEMP107_1': 'box_air_temp',
             'TEMP107_2': 'air_temp_2',
             'TEMP107_3': 'air_temp_3',
             'TEMP107_4': 'air_temp_4',
             'TEMP107_5': 'air_temp_5',
             'LI200X': 'solar_flux',
             'RH41372': 'rh',
             'TEMP41372': 'air_temp',
             'CS105': 'box_pressure',
             'PAROSCI': 'pressure',
             'WSPD05305': 'wind_speed',
             'WDIR05305': 'wind_dir',
             'CS10162': 'box_rh',
             'RAIN380M': 'precip'}

    @staticmethod
    def maybe_mine(line):
        """Return True if `line` starts with ``TIME``."""
        return line.startswith('TIME')

    def make_frame(self, line):
        """Build a frame dictionary from one version 0 line.

        :param line: raw text line made of 16 ``<key> <value>`` pairs.
        :returns: the frame produced by ``_make_frame``.
        :raises LineParseError: if the line does not have 32 tokens, holds
            an unknown key, or its timestamp cannot be converted.
        """
        parts = line.split()
        if len(parts) != 32:
            raise LineParseError("Expected 32 components", line)

        raw_data = [('version', 0)]
        for k1, v1 in zip(parts[0::2], parts[1::2]):
            if k1 == 'TIME':
                # handled separately below
                continue
            if k1 not in self.names:
                raise LineParseError("Unexpected var: %s" % k1, line)
            raw_data.append((self.names[k1], v1))

        # The second token is read as the Unix timestamp (seconds); lines
        # are expected to begin with ``TIME <seconds>``.
        try:
            stamp = datetime.utcfromtimestamp(int(parts[1]))
        except (ValueError, TypeError, OverflowError, OSError):
            # OverflowError/OSError are raised for timestamps outside the
            # platform's supported range; report them like any other bad
            # timestamp so one corrupt line does not abort the reader.
            raise LineParseError("Could not parse stamp", line)
        raw_data.append(('stamp', stamp))
        return _make_frame(raw_data)
class ParserV1V2(object):
    """Parses comma separated data lines (Version 1 and later).

    The number of comma separated parts selects the reported version:
    28 parts -> 1, 29 -> 2, 33 -> 3 and 34 -> 4. Only the first 28 values
    (see `names`) are given names; any additional values are ignored.
    """

    fill_value = -99999.

    names = ['station_id', 'year', 'doy', 'hhmm', 'sec', 'box_pressure',
             'paro_air_temp_period', 'paro_pressure_period', 'paro_air_temp',
             'pressure', 'paro_cal_sig', 'box_rh', 'box_air_temp',
             'air_temp_2', 'air_temp_3', 'air_temp_4', 'wind_speed', 'wind_dir',
             'rh_shield_freq', 'rh', 'air_temp_6_3m', 'dewpoint',
             'rtd_shield_freq', 'air_temp', 'solar_flux', 'precip',
             'accum_precip', 'altimeter']  # , 'altimeter2']
    # These are the new fields in the input files but unused by the rest of
    # the tower code. At the risk of breaking other pieces of software, these
    # are not included in the above list, but are documented here for future
    # reference.
    #
    # Altimeter2 (slightly different calculation, same units as Altimeter)
    # LW_in (W/m^2)
    # tempPyrg (Kelvin, temperature of pyrgeometer)
    # pyrgTP (W/m^2, raw reading from the pyrgeometer thermopile)
    # pyrgTC (W/m^2, temperature correction for the pyrgeometer)
    # TC_110 (degC, this is a second air temperature in the new aspirated radiation shield)

    # A line is ours if it starts with "<digit>,<4 digits>,<1-3 digits>".
    # Raw string so that ``\d`` is not treated as a string escape.
    _line_start = re.compile(r'^\d,\d{4},\d{1,3}')

    # number of comma separated parts -> data version
    _version_by_length = {28: 1, 29: 2, 33: 3, 34: 4}

    @staticmethod
    def maybe_mine(line):
        """Return True if the start of `line` looks like a CSV data line."""
        return ParserV1V2._line_start.search(line) is not None

    @staticmethod
    def _get_stamp(parts):
        """Build the line's datetime from its year, doy, hhmm and sec parts.

        :param parts: the comma separated parts of the line; indexes 1-4
            are read as year, day of year, hhmm and seconds.
        :returns: naive ``datetime`` floored to a multiple of 5 seconds.
        :raises ValueError: if a part cannot be converted to a number or
            the year/day-of-year pair is not a valid date.
        """
        year = int(parts[1])
        doy = int(parts[2])
        dt = datetime.strptime('{:d}.{:03d}'.format(year, doy), '%Y.%j')
        # NOTE(review): hhmm_to_offset presumably returns the offset in
        # seconds for an "hhmm" value -- confirm against metobscommon.
        secs = hhmm_to_offset(parts[3])
        secs += float(parts[4])
        # floor to the previous multiple of 5 seconds
        secs -= (secs % 5)
        return dt + timedelta(seconds=secs)

    def make_frame(self, line):
        """Build a frame dictionary from one comma separated line.

        :param line: raw text line with 28, 29, 33 or 34 parts.
        :returns: the frame produced by ``_make_frame``.
        :raises LineParseError: if the number of parts is unexpected or the
            timestamp cannot be parsed.
        """
        parts = line.split(',')
        version = self._version_by_length.get(len(parts))
        if version is None:
            raise LineParseError("Expected 28, 29, 33, or 34 parts", line)

        # zip stops at the shorter sequence, so parts beyond the named
        # 28 values are dropped here.
        raw_data = [('version', version)] + list(zip(self.names, parts))
        try:
            stamp = self._get_stamp(parts)
        except (TypeError, ValueError):
            raise LineParseError("Could not parse timestamp", line)
        raw_data.append(('stamp', stamp))
        return _make_frame(raw_data)
def _iter_existing_lines(fptr):
    """Yield ``(index, line)`` for each non-blank line of `fptr`.

    `index` is the 0-based position of the line among all lines, blank
    lines included.
    """
    for idx, line in enumerate(fptr):
        if line.strip():
            yield idx, line


def _iter_tailed_lines(fptr, poll_interval=0.1):
    """Yield ``(index, line)`` for non-blank lines read from `fptr`, forever.

    When nothing (or only a blank line) is read, sleep `poll_interval`
    seconds and try again. `index` counts the lines yielded so far.
    """
    idx = 0
    while True:
        line = fptr.readline()
        if not line.strip():
            time.sleep(poll_interval)
            continue
        yield idx, line
        idx += 1


def read_frames(source, error_handler=lambda *a: None, tail=False):
    """Returns a generator for reading frames from `source`. Frames are
    checked line-by-line so frame line versions may be mixed.

    Blank lines and lines starting with ``#`` are skipped.

    :param source: an object with a ``readlines`` attribute (used as is and
        left open) or anything accepted by ``open`` (opened here and closed
        when the generator finishes or is closed).
    :param error_handler: callable invoked as
        ``error_handler(line_number, line, exception)`` when a line cannot
        be parsed or no parser recognises it. ``line_number`` is the
        yielded line index plus one.
    :param tail: starting from the end of the source (if 'seek' method) read lines forever
    """
    owns_file = not hasattr(source, 'readlines')
    fptr = open(source) if owns_file else source
    try:
        if tail and hasattr(fptr, "seek"):
            LOG.debug("Seeking to end of frame source")
            fptr.seek(0, io.SEEK_END)
            lines = _iter_tailed_lines(fptr)
        else:
            lines = _iter_existing_lines(fptr)

        for idx, line in lines:
            if line.startswith('#'):
                continue
            for parser in (ParserV1V2(), ParserV0()):
                if not parser.maybe_mine(line):
                    continue
                try:
                    frame = parser.make_frame(line)
                except LineParseError as err:
                    error_handler(idx + 1, line, err)
                else:
                    yield frame
                # the first parser that claims the line is the only one tried
                break
            else:
                # for/else: only reached when no parser claimed the line
                error_handler(idx + 1, line, RuntimeError("no parser found", line))
    finally:
        # Only close what was opened here; caller-supplied streams stay open.
        if owns_file:
            fptr.close()
def loggernet_to_tower(rec_dict, symbol_names):
    """Translate a loggernet record dictionary into a standard tower frame.

    The values of `rec_dict` are paired, in order, with ``'timestamp'``
    followed by ``ParserV1V2.names``; the resulting frame uses
    `symbol_names` as output keys and stores the timestamp under
    ``'stamp'``.
    """
    # The record's first value is taken to be the timestamp, followed by
    # the values of the traditional frame.
    source_keys = ['timestamp'] + ParserV1V2.names
    target_keys = ['timestamp'] + symbol_names
    pairs = zip(source_keys, rec_dict.values())
    return _make_frame(pairs, target_keys, rename_timestamp=True)
class LDMPGenerator(object):
    """Class to manage receiving records from Loggernet LDMP server.

    Iterating over an instance starts the receiver and yields one tower
    frame (see ``loggernet_to_tower``) per received record.
    """

    def __init__(self, station_name, tables, symbol_names=ParserV1V2.names,
                 host='localhost', port=1024):
        """Create the receiver; nothing is started until iteration begins.

        :param station_name: passed through to ``dict_records``.
        :param tables: passed through to ``dict_records``.
        :param symbol_names: output names handed to ``loggernet_to_tower``.
        :param host: host given to ``LDMPReceiver``.
        :param port: port given to ``LDMPReceiver``.
        """
        from metobscommon.archive.loggernet_receiver import LDMPReceiver
        self.station_name = station_name
        self.tables = tables
        self.symbol_names = symbol_names
        self.receiver = LDMPReceiver(host, port)

    def __iter__(self):
        """Start the receiver and return a generator of tower frames."""
        from metobscommon.archive.loggernet_receiver import dict_records
        self.receiver.start()
        # This should be generated OrderedDicts
        dict_rec_gen = dict_records(self.receiver, self.station_name,
                                    self.tables)
        return (loggernet_to_tower(x, self.symbol_names) for x in dict_rec_gen)

    def close(self):
        """Close the receiver, if one was created."""
        # __init__ may have failed before `receiver` was assigned; closing
        # such an object (e.g. from __del__) must not raise AttributeError.
        receiver = getattr(self, 'receiver', None)
        if receiver is not None:
            receiver.close()

    def __del__(self):
        """Last effort to kill the background thread if not done already."""
        try:
            self.close()
        except (ValueError, RuntimeError, IOError, OSError):
            pass