Skip to content
Snippets Groups Projects
parser.py 9.64 KiB
Newer Older
"""Code to parse level 00 data.

Data Versions
=============
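The format of the raw ASCII data has changed several times over the years; the
known versions are described below. The parser also accepts CSV lines with 33
or 34 fields, which it labels versions 3 and 4; those formats are not yet
documented here.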

Version 0
---------
Effective from inception to 2010-06-01T00:27:51Z.

The original data format was a space-separated sequence of key-value pairs,
``<key> <value>``. There were a total of 16 data values including `TIME`:
`TIME, ACCURAIN, TEMP107_4, LI200X, TEMP107_1, RH41372, TEMP107_5, CS105,
PAROSCI, WSPD05305, TEMP107_3, CS10162, RAIN380M, TEMP107_2, TEMP41372,
WDIR05305`.

XXX: Fill value in version 0 seems to be -99999.

Version 1
---------
Effective 2010-06-01T00:27:51Z to 2012-12-03T17:34:17Z.

A CSV format file with a total of 28 values: station_id, year, doy, hhmm, sec,
box_pressure, paro_air_temp_period, paro_pressure_period, paro_air_temp,
pressure, paro_cal_sig, box_rh, box_air_temp, air_temp_2, air_temp_3,
air_temp_4, wind_speed, wind_dir, rh_shield_freq, rh, air_temp_6_3m, dewpoint,
rtd_shield_freq, air_temp, solar_flux, precip, accum_precip, altimeter.

XXX: Fill value in version 1 seems to be -99999.

Version 2
---------
Effective 2012-12-03T17:34:17Z to present.

Same as Version 1 with the addition of altimeter2 at the end. It is unclear why
there are two altimeter values, but as far as is known altimeter2 is not used.

XXX: Fill value in version 2 seems to be -99999.
"""
import contextlib
import io
import logging
import re
import time
from datetime import datetime, timedelta, timezone

from metobscommon.util.mytime import hhmm_to_offset

from aosstower.schema import database

# Module-level logger named after this module.
LOG = logging.getLogger(name=__name__)


class LineParseError(Exception):
    """Error parsing line of frame data."""

    @classmethod
    def raise_wrapped(cls, exception, msg=None):
        """Raise a new ``cls`` error in place of ``exception``.

        When called while an exception is being handled, the traceback of that
        exception is attached to the new error. ``exception`` is always
        recorded as the new error's ``__cause__``.

        :param exception: the original exception being wrapped.
        :param msg: message for the new error; defaults to ``str(exception)``.
        :raises LineParseError: always (an instance of ``cls``).
        """
        import sys

        msg = msg or str(exception)
        # sys.exc_info()[2] is None outside an except block; with_traceback(None) is harmless.
        raise cls(msg).with_traceback(sys.exc_info()[2]) from exception
def _make_frame(data, new_symbols=None, rename_timestamp=False):
    """Construct a frame from a list of tuples."""
Bruce Flynn's avatar
Bruce Flynn committed
    frame = {}
    for idx, (key, value) in enumerate(data):
        if key in ["stamp", "timestamp"]:
            frame["stamp" if rename_timestamp else key] = value
Bruce Flynn's avatar
Bruce Flynn committed
            continue
        if key in database:
                new_key = new_symbols[idx] if new_symbols and len(new_symbols) > idx else key
                frame[new_key] = database[key].type(value)
            except (ValueError, TypeError) as e:
                msg = f"error converting '{value}' using {database[key].type}"
                raise LineParseError(msg) from e
class ParserV0:
    """Parses Version 0 data lines.

    A version 0 line is a whitespace-separated sequence of ``<key> <value>``
    pairs that starts with the ``TIME`` key, whose value is an integer unix
    timestamp.
    """

    fill_value = -99999.0

    # 16 ``<key> <value>`` pairs (including TIME) -> 32 whitespace-separated tokens
    num_elements = 32

    # maps v0 names to names in schema db
    names = {
        "ACCURAIN": "accum_precip",
        "TEMP107_1": "box_air_temp",
        "TEMP107_2": "air_temp_2",
        "TEMP107_3": "air_temp_3",
        "TEMP107_4": "air_temp_4",
        "TEMP107_5": "air_temp_5",
        "LI200X": "solar_flux",
        "RH41372": "rh",
        "TEMP41372": "air_temp",
        "CS105": "box_pressure",
        "PAROSCI": "pressure",
        "WSPD05305": "wind_speed",
        "WDIR05305": "wind_dir",
        "CS10162": "box_rh",
        "RAIN380M": "precip",
    }

    @staticmethod
    def maybe_mine(line):
        """Return True if ``line`` looks like a version 0 record."""
        return line.startswith("TIME")

    def make_frame(self, line):
        """Parse a version 0 line into a frame dictionary.

        :param line: raw line of version 0 data.
        :returns: frame built by ``_make_frame`` from ``version`` (0), the
            renamed data values and ``stamp`` (a naive UTC datetime).
        :raises LineParseError: on a wrong token count, an unknown key, or an
            unparseable ``TIME`` value.
        """
        parts = line.split()
        if len(parts) != self.num_elements:
            msg = f"Expected {self.num_elements} components"
            raise LineParseError(msg, line)
        raw_data = [("version", 0)]
        for key, value in zip(parts[0::2], parts[1::2], strict=True):
            if key == "TIME":
                # TIME becomes ``stamp`` below rather than a data value
                continue
            if key not in self.names:
                msg = f"Unexpected var: {key}"
                raise LineParseError(msg, line)
            raw_data.append((self.names[key], value))
        try:
            unix_time = int(parts[1])
            # Equivalent to the deprecated datetime.utcfromtimestamp(): naive UTC.
            stamp = datetime.fromtimestamp(unix_time, tz=timezone.utc).replace(tzinfo=None)
        except (ValueError, TypeError, OverflowError, OSError) as e:
            msg = "Could not parse stamp"
            raise LineParseError(msg, line) from e
        raw_data.append(("stamp", stamp))
        return _make_frame(raw_data)
class ParserV1V2:
    """Parses Version 1 & 2 data lines.

    These are comma-separated lines whose leading fields are the station id,
    the 4-digit year and the day of year. Lines with 33 or 34 fields are also
    accepted and labelled versions 3 and 4.
    """

    fill_value = -99999.0

    # Matches the leading ``station_id,year,doy`` fields of a record.
    _line_pattern = re.compile(r"^\d,\d{4},\d{1,3}")

    # Number of comma-separated fields -> data format version.
    _versions_by_length = {28: 1, 29: 2, 33: 3, 34: 4}

    names = [
        "station_id",
        "year",
        "doy",
        "hhmm",
        "sec",
        "box_pressure",
        "paro_air_temp_period",
        "paro_pressure_period",
        "paro_air_temp",
        "pressure",
        "paro_cal_sig",
        "box_rh",
        "box_air_temp",
        "air_temp_2",
        "air_temp_3",
        "air_temp_4",
        "wind_speed",
        "wind_dir",
        "rh_shield_freq",
        "rh",
        "air_temp_6_3m",
        "dewpoint",
        "rtd_shield_freq",
        "air_temp",
        "solar_flux",
        "precip",
        "accum_precip",
        "altimeter",
    ]
    # These are the new fields in the input files but unused by the rest of
    # the tower code. At the risk of breaking other pieces of software, these
    # are not included in the above list, but are documented here for future
    # reference.
    #
    # Altimeter2 (slightly different calculation, same units as Altimeter)
    # tempPyrg (Kelvin, temperature of pyrgeometer)
    # pyrgTP (W/m^2, raw reading from the pyrgeometer thermopile)
    # pyrgTC (W/m^2, temperature correction for the pyrgeometer)
    # TC_110 (degC, this is a second air temperature in the new aspirated radiation shield)

    @staticmethod
    def maybe_mine(line):
        """Return True if ``line`` starts like a version 1+ CSV record."""
        return ParserV1V2._line_pattern.search(line) is not None

    @staticmethod
    def _get_stamp(parts):
        """Build the naive record time from the year, day-of-year and hhmm fields."""
        year = int(parts[1])
        doy = int(parts[2])
        # strptime rejects day-of-year values outside 001-366 with ValueError
        dt = datetime.strptime(f"{year:d}.{doy:03d}", "%Y.%j")
        return dt + timedelta(seconds=hhmm_to_offset(parts[3]))

    def make_frame(self, line):
        """Parse a comma-separated line into a frame dictionary.

        :param line: raw CSV line.
        :returns: frame built by ``_make_frame`` from ``version``, the fields
            paired positionally with ``names`` (extra fields are ignored) and
            ``stamp``.
        :raises LineParseError: on an unsupported field count or a timestamp
            that cannot be parsed.
        """
        parts = line.split(",")
        version = self._versions_by_length.get(len(parts))
        if version is None:
            msg = "Expected 28, 29, 33, or 34 parts"
            raise LineParseError(msg, line)
        raw_data = [("version", version), *zip(self.names, parts, strict=False)]
        try:
            raw_data.append(("stamp", self._get_stamp(parts)))
        except (TypeError, ValueError) as e:
            msg = "Could not parse timestamp"
            raise LineParseError(msg, line) from e
        return _make_frame(raw_data)
def read_frames(source, error_handler=None, tail=False):
    """Generate frames from `source`.

    Frames are checked line-by-line so frame line versions may be mixed.
    Blank lines are skipped.

    :param source: an open file-like object (anything with ``readlines``) or a
        path to open. A file opened here is closed when this generator is
        exhausted or closed.
    :param error_handler: called as ``error_handler(position, line, error)``
        for lines that cannot be parsed (``position`` is 1-based); the default
        ignores them.
    :param tail: starting from the end of the source (if 'seek' method) read lines forever
    """
    if error_handler is None:
        error_handler = _default_error_handler
    # Only close the file at the end if we opened it ourselves.
    opened_here = not hasattr(source, "readlines")
    fptr = open(source) if opened_here else source  # noqa: SIM115, PTH123

    def follow():
        """Yield ``(index, line)`` for new non-blank lines forever, polling at EOF."""
        idx = 0
        while True:
            line = fptr.readline()
            if not line.strip():
                # EOF (or a blank line): wait briefly for more data
                time.sleep(0.1)
                continue
            yield idx, line
            idx += 1

    try:
        if tail and hasattr(fptr, "seek"):
            LOG.debug("Seeking to end of frame source")
            fptr.seek(0, io.SEEK_END)
            numbered_lines = follow()
        else:
            # The index counts every physical line, so reported positions match the source.
            numbered_lines = ((idx, line) for idx, line in enumerate(fptr) if line.strip())
        yield from _parse_lines(numbered_lines, error_handler)
    finally:
        if opened_here:
            fptr.close()


def _parse_lines(line_generator, error_handler):
    """Turn ``(index, line)`` pairs into frames.

    Lines beginning with ``#`` are ignored. Every other line is handed to the
    first parser whose ``maybe_mine`` accepts it. A ``LineParseError`` from
    that parser, or the absence of any accepting parser, is reported via
    ``error_handler(index + 1, line, error)`` and processing continues.
    """
    # Parsers hold no per-line state, so one instance of each is enough.
    candidates = (ParserV1V2(), ParserV0())
    for position, text in line_generator:
        if text.startswith("#"):
            continue
        chosen = next((p for p in candidates if p.maybe_mine(text)), None)
        if chosen is None:
            error_handler(position + 1, text, RuntimeError("no parser found", text))
            continue
        try:
            yield chosen.make_frame(text)
        except LineParseError as problem:
            error_handler(position + 1, text, problem)
def _default_error_handler(*_):
    return None


def loggernet_to_tower(rec_dict, symbol_names):
    """Convert loggernet record dictionary to our standard naming.

    The values of ``rec_dict`` are paired, in order, with ``"timestamp"``
    followed by ``ParserV1V2.names``. The converted fields are emitted under
    the matching entry of ``"timestamp"`` followed by ``symbol_names``, and the
    timestamp is stored under ``"stamp"``.

    :param rec_dict: mapping whose values are in record order; presumably an
        ordered dict produced by the LDMP receiver (TODO confirm).
    :param symbol_names: output names, positionally aligned with
        ``ParserV1V2.names``.
    :returns: frame dictionary built by ``_make_frame``.
    """
    # The first value of a record is assumed to be its timestamp, followed by
    # the traditional frame fields.
    old_symbols = ["timestamp", *ParserV1V2.names]
    new_symbols = ["timestamp", *symbol_names]
    return _make_frame(zip(old_symbols, rec_dict.values(), strict=False), new_symbols, rename_timestamp=True)


class LoggernetReceiver:
    """Class to manage receiving records from Loggernet LDMP server."""

    def __init__(  # noqa: PLR0913
        self,
        station_name,
        tables,
        symbol_names=ParserV1V2.names,
        host="localhost",
        port=1024,
    ):
        """Create (but do not start) an LDMP receiver.

        :param station_name: station whose records are requested.
        :param tables: tables to request records from.
        :param symbol_names: output field names passed to ``loggernet_to_tower``.
        :param host: LDMP server host passed to ``LDMPReceiver``.
        :param port: LDMP server port passed to ``LDMPReceiver``.
        """
        from metobscommon.archive.loggernet_receiver import LDMPReceiver

        self.station_name = station_name
        self.tables = tables
        self.symbol_names = symbol_names
        self.receiver = LDMPReceiver(host, port)

    def __iter__(self):
        """Start the receiver and return a lazy generator of converted frames."""
        from metobscommon.archive.loggernet_receiver import dict_records

        self.receiver.start()
        # dict_records is expected to yield ordered dicts; loggernet_to_tower
        # relies on the order of their values.
        dict_rec_gen = dict_records(self.receiver, self.station_name, self.tables)
        return (loggernet_to_tower(x, self.symbol_names) for x in dict_rec_gen)

    def close(self):
        """Close the underlying LDMP receiver."""
        self.receiver.close()

    def __del__(self):
        """Last effort to kill the background thread if not done already."""
        # AttributeError: __init__ may have failed before ``receiver`` was set.
        with contextlib.suppress(AttributeError, ValueError, RuntimeError, OSError):
            self.close()