Newer
Older
"""Code to parse level 00 data.
Data Versions
=============
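The format of the raw ASCII data has changed several times over the years.
Versions 0 through 2 are described below. The version 1/2 parser also accepts
lines with 33 or 34 comma-separated fields (recorded as versions 3 and 4);
those formats are not yet documented here.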
Version 0
---------
Effective from inception to 2010-06-01T00:27:51Z.
The original data format was a space-separated sequence of key-value pairs,
``<key> <value>``. There were a total of 16 data values including `TIME`:
`TIME, ACCURAIN, TEMP107_4, LI200X, TEMP107_1, RH41372, TEMP107_5, CS105,
PAROSCI, WSPD05305, TEMP107_3, CS10162, RAIN380M, TEMP107_2, TEMP41372,
WDIR05305`.
XXX: Fill value in version 0 seems to be -99999.
Version 1
---------
Effective 2010-06-01T00:27:51Z to 2012-12-03T17:34:17Z.
A CSV format file with a total of 28 values: station_id, year, doy, hhmm, sec,
box_pressure, paro_air_temp_period, paro_pressure_period, paro_air_temp,
pressure, paro_cal_sig, box_rh, box_air_temp, air_temp_2, air_temp_3,
air_temp_4, wind_speed, wind_dir, rh_shield_freq, rh, air_temp_6_3m, dewpoint,
rtd_shield_freq, air_temp, solar_flux, precip, accum_precip, altimeter.
XXX: Fill value in version 1 seems to be -99999.
Version 2
---------
Effective 2012-12-03T17:34:17Z to present.
Same as Version 1 with the addition of altimeter2 at the end. I'm not sure why
we have two altimeter values, but as far as I know altimeter2 is not used (the
parser does not read it).
XXX: Fill value in version 2 seems to be -99999.
"""
import contextlib
import io
import logging
import re
import time
from datetime import datetime, timedelta, timezone

from metobscommon.util.mytime import hhmm_to_offset

from aosstower.schema import database
# Module-level logger (read_frames uses it to report seeking to the end of a source).
LOG = logging.getLogger(__name__)
class LineParseError(Exception):
    """Raised when a line of frame data cannot be parsed."""

    @classmethod
    def raise_wrapped(cls, exception, msg=None):
        """Re-raise ``exception`` as this error type, keeping the current traceback.

        The traceback of the exception currently being handled (if any) is
        attached to the new error before it is raised.

        :param exception: the original exception; its ``str()`` becomes the
            message when ``msg`` is not given (or is empty).
        :param msg: optional replacement message.
        :raises LineParseError: always (an instance of ``cls``).
        """
        import sys

        _, _, current_tb = sys.exc_info()
        raise cls(msg or str(exception)).with_traceback(current_tb)
def _make_frame(data, new_symbols=None, rename_timestamp=False):
    """Construct a frame dictionary from ``(key, value)`` pairs.

    :param data: iterable of ``(key, value)`` pairs. ``stamp``/``timestamp``
        values are stored unconverted; every other value is converted with the
        ``type`` of its entry in the schema ``database``.
    :param new_symbols: optional sequence of output names matched to ``data``
        by position; positions beyond its length keep the original key.
    :param rename_timestamp: if true, store ``timestamp``/``stamp`` values
        under the key ``stamp``.
    :returns: dict mapping output names to converted values.
    :raises LineParseError: if a value cannot be converted by its schema type.
    """
    frame = {}
    for idx, (key, value) in enumerate(data):
        if key in ("stamp", "timestamp"):
            frame["stamp" if rename_timestamp else key] = value
            continue
        if key not in database:
            # Keys without a schema entry are skipped rather than raising an
            # uncaught KeyError (callers only handle LineParseError).
            continue
        converter = database[key].type
        new_key = new_symbols[idx] if new_symbols and len(new_symbols) > idx else key
        try:
            frame[new_key] = converter(value)
        except (ValueError, TypeError) as e:
            msg = f"error converting '{value}' using {converter}"
            raise LineParseError(msg) from e
    return frame
class ParserV0:
    """Parses Version 0 data lines.

    Version 0 lines are whitespace-separated ``<key> <value>`` pairs. The
    ``TIME`` value is an integer Unix timestamp; every other key is renamed
    to a standard frame field via :attr:`names`.
    """

    fill_value = -99999.0
    # 16 key/value pairs (TIME plus the 15 keys in ``names``) -> 32 tokens.
    num_elements = 32
    # Raw version 0 key -> standard frame field name.
    names = {
        "ACCURAIN": "accum_precip",
        "TEMP107_1": "box_air_temp",
        "TEMP107_2": "air_temp_2",
        "TEMP107_3": "air_temp_3",
        "TEMP107_4": "air_temp_4",
        "TEMP107_5": "air_temp_5",
        "LI200X": "solar_flux",
        "RH41372": "rh",
        "TEMP41372": "air_temp",
        "CS105": "box_pressure",
        "PAROSCI": "pressure",
        "WSPD05305": "wind_speed",
        "WDIR05305": "wind_dir",
        "CS10162": "box_rh",
        "RAIN380M": "precip",
    }

    @staticmethod
    def maybe_mine(line):
        """Return True if ``line`` looks like a version 0 line (starts with ``TIME``)."""
        return line.startswith("TIME")

    def make_frame(self, line):
        """Parse one version 0 line into a frame dictionary.

        :param line: raw version 0 data line.
        :returns: frame dictionary built by ``_make_frame``.
        :raises LineParseError: if the line does not contain exactly
            ``num_elements`` tokens, contains an unknown key, or has a missing
            or invalid ``TIME`` value.
        """
        parts = line.split()
        if len(parts) != self.num_elements:
            msg = f"Expected {self.num_elements} components"
            raise LineParseError(msg, line)
        raw_data = [("version", 0)]
        time_str = None
        for key, value in zip(parts[0::2], parts[1::2], strict=False):
            if key == "TIME":
                # The timestamp is converted separately below.
                time_str = value
                continue
            if key not in self.names:
                raise LineParseError(f"Unexpected var: {key}", line)
            raw_data.append((self.names[key], value))
        try:
            unix_time = int(time_str)
            # Naive UTC datetime, equivalent to the deprecated utcfromtimestamp().
            stamp = datetime.fromtimestamp(unix_time, tz=timezone.utc).replace(tzinfo=None)
        except (ValueError, TypeError, OverflowError, OSError) as e:
            msg = "Could not parse stamp"
            raise LineParseError(msg, line) from e
        raw_data.append(("stamp", stamp))
        return _make_frame(raw_data)
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class ParserV1V2:
    """Parses Version 1 & 2 data lines (and the 33/34-field variants).

    Lines are comma-separated. The first five fields (station id, year, day
    of year, hhmm and seconds) are also used to build the frame timestamp.
    """

    fill_value = -99999.0
    # Field names in column order; columns beyond len(names) are ignored.
    names = [
        "station_id",
        "year",
        "doy",
        "hhmm",
        "sec",
        "box_pressure",
        "paro_air_temp_period",
        "paro_pressure_period",
        "paro_air_temp",
        "pressure",
        "paro_cal_sig",
        "box_rh",
        "box_air_temp",
        "air_temp_2",
        "air_temp_3",
        "air_temp_4",
        "wind_speed",
        "wind_dir",
        "rh_shield_freq",
        "rh",
        "air_temp_6_3m",
        "dewpoint",
        "rtd_shield_freq",
        "air_temp",
        "solar_flux",
        "precip",
        "accum_precip",
        "altimeter",
    ]  # , 'altimeter2']
    # These are the new fields in the input files but unused by the rest of
    # the tower code. At the risk of breaking other pieces of software, these
    # are not included in the above list, but are documented here for future
    # reference.
    #
    # Altimeter2 (slightly different calculation, same units as Altimeter)
    # tempPyrg (Kelvin, temperature of pyrgeometer)
    # pyrgTP (W/m^2, raw reading from the pyrgeometer thermopile)
    # pyrgTC (W/m^2, temperature correction for the pyrgeometer)
    # TC_110 (degC, this is a second air temperature in the new aspirated radiation shield)

    # Number of comma-separated fields -> data format version.
    _versions_by_length = {28: 1, 29: 2, 33: 3, 34: 4}

    @staticmethod
    def maybe_mine(line):
        """Return True if ``line`` starts with ``<digit>,<4 digits>,<1-3 digits>``.

        That prefix is the station id, year and day of year.
        """
        return re.search(r"^\d,\d{4},\d{1,3}", line) is not None

    @staticmethod
    def _get_stamp(parts):
        """Build the frame timestamp from the year/doy/hhmm/sec fields.

        :param parts: comma-split fields of a data line.
        :returns: naive ``datetime`` for the start of day ``doy`` of ``year``
            plus the ``hhmm_to_offset`` offset and the ``sec`` field, both
            treated as seconds.
        :raises ValueError: if the year/day/second fields are not numeric or
            the date is invalid.
        """
        year = int(parts[1])
        doy = int(parts[2])
        day_start = datetime.strptime(f"{year:d}.{doy:03d}", "%Y.%j")
        secs = hhmm_to_offset(parts[3])
        secs += float(parts[4])
        return day_start + timedelta(seconds=secs)

    def make_frame(self, line):
        """Parse one version 1+ line into a frame dictionary.

        :param line: raw comma-separated data line.
        :returns: frame dictionary built by ``_make_frame``.
        :raises LineParseError: if the field count is not 28, 29, 33 or 34,
            or the timestamp fields cannot be parsed.
        """
        parts = line.split(",")
        version = self._versions_by_length.get(len(parts))
        if version is None:
            msg = "Expected 28, 29, 33, or 34 parts"
            raise LineParseError(msg, line)
        raw_data = [("version", version), *zip(self.names, parts, strict=False)]
        try:
            raw_data.append(("stamp", self._get_stamp(parts)))
        except (TypeError, ValueError) as e:
            msg = "Could not parse timestamp"
            raise LineParseError(msg, line) from e
        return _make_frame(raw_data)
def read_frames(source, error_handler=None, tail=False):
    """Generate frames from `source`.

    Frames are checked line-by-line so frame line versions may be mixed.

    :param source: path to a level 00 file, or an already-open file-like
        object (anything with a ``readlines`` attribute).
    :param error_handler: callable ``(line_number, line, exception)`` invoked
        for lines that fail to parse; defaults to ignoring them.
    :param tail: starting from the end of the source (if 'seek' method) read lines forever
    :returns: generator of frame dictionaries.
    """
    if error_handler is None:
        error_handler = _default_error_handler
    # Only close the file if it was opened here; caller-supplied objects stay open.
    opened_here = not hasattr(source, "readlines")
    fptr = open(source) if opened_here else source  # noqa: SIM115, PTH123
    try:
        if tail and hasattr(fptr, "seek"):
            LOG.debug("Seeking to end of frame source")
            fptr.seek(0, io.SEEK_END)
            lines = _tail_lines(fptr)
        else:
            lines = _nonblank_lines(fptr)
        yield from _parse_lines(lines, error_handler)
    finally:
        if opened_here:
            fptr.close()


def _tail_lines(fptr):
    """Yield ``(index, line)`` for non-blank lines forever, polling for new data.

    ``index`` counts the non-blank lines yielded so far.
    """
    idx = 0
    while True:
        line = fptr.readline()
        if not line.strip():
            # Nothing new yet (or a blank line); wait before polling again.
            time.sleep(0.1)
            continue
        yield idx, line
        idx += 1


def _nonblank_lines(fptr):
    """Yield ``(index, line)`` for each non-blank line; ``index`` counts all lines."""
    for idx, line in enumerate(fptr):
        if line.strip():
            yield idx, line
def _parse_lines(line_generator, error_handler):
    """Parse ``(index, line)`` pairs into frames.

    Lines starting with ``#`` are skipped. Each remaining line is offered to
    the version 1/2 parser first, then the version 0 parser; the first parser
    whose ``maybe_mine`` accepts the line parses it.

    :param line_generator: iterable of ``(index, line)`` pairs with a
        zero-based ``index``.
    :param error_handler: called as ``error_handler(index + 1, line, exc)``
        when a line fails to parse or when no parser accepts it.
    :returns: generator of frame dictionaries.
    """
    # The parsers keep no per-instance state, so build them once, not per line.
    parsers = (ParserV1V2(), ParserV0())
    for idx, line in line_generator:
        if line.startswith("#"):
            continue
        for parser in parsers:
            if parser.maybe_mine(line):
                try:
                    frame = parser.make_frame(line)
                except LineParseError as err:
                    error_handler(idx + 1, line, err)
                else:
                    # Yield outside the try so errors thrown into the
                    # generator by the consumer are not mistaken for parse errors.
                    yield frame
                break
        # for/else: this branch runs only when no parser claimed the line
        # (the inner loop finished without ``break``).
        else:
            error_handler(idx + 1, line, RuntimeError("no parser found", line))
def _default_error_handler(*_):
return None
def loggernet_to_tower(rec_dict, symbol_names):
    """Convert a LoggerNet record dictionary to our standard naming.

    :param rec_dict: one LoggerNet record. Its values are taken by position
        (presumably an ordered dict — confirm with the record source).
    :param symbol_names: output field names, matched by position to
        ``ParserV1V2.names``.
    :returns: frame dictionary built by ``_make_frame``, with the timestamp
        stored under ``stamp``.
    """
    # The record's first value is paired with "timestamp"; the remaining
    # values follow the traditional frame layout of ParserV1V2.names.
    source_names = ["timestamp"] + list(ParserV1V2.names)
    target_names = ["timestamp"] + list(symbol_names)
    pairs = zip(source_names, rec_dict.values(), strict=False)
    return _make_frame(pairs, target_names, rename_timestamp=True)
class LDMPGenerator:
"""Class to manage receiving records from Loggernet LDMP server."""
def __init__(  # noqa: PLR0913
    self,
    station_name,
    tables,
    symbol_names=ParserV1V2.names,
    host="localhost",
    port=1024,
):
    """Store the record selection and create (but do not start) the LDMP receiver.

    :param station_name: station name later passed to ``dict_records``.
    :param tables: table selection later passed to ``dict_records``.
    :param symbol_names: output field names used by ``loggernet_to_tower``,
        matched by position to ``ParserV1V2.names`` (the default).
    :param host: host of the LoggerNet LDMP server.
    :param port: port of the LoggerNet LDMP server.
    """
    from metobscommon.archive.loggernet_receiver import LDMPReceiver

    self.station_name, self.tables = station_name, tables
    self.symbol_names = symbol_names
    # The receiver is only started later, in __iter__.
    self.receiver = LDMPReceiver(host, port)
def __iter__(self):
    """Start the receiver and return an iterator of tower frames.

    Records come from ``dict_records`` for this generator's station and
    tables. Each one is converted with ``loggernet_to_tower`` using
    ``self.symbol_names``. The receiver is started as soon as ``iter()`` is
    called, not when the first frame is requested.
    """
    from metobscommon.archive.loggernet_receiver import dict_records

    self.receiver.start()
    # Records are expected to be OrderedDicts: loggernet_to_tower maps their
    # values by position.
    records = dict_records(self.receiver, self.station_name, self.tables)
    return (loggernet_to_tower(record, self.symbol_names) for record in records)
def close(self):
    """Close the underlying LDMP receiver."""
    receiver = self.receiver
    receiver.close()
def __del__(self):
"""Last effort to kill the background thread if not done already."""
with contextlib.suppress(ValueError, RuntimeError, OSError):