Skip to content
Snippets Groups Projects
Select Git revision
  • 52097b2b9fb27bf0c39df905edd58ca72cff3c14
  • master default protected
  • stationplot
  • v0.2.0
4 results

parser.py

Blame
  • parser.py 9.20 KiB
    """Code to parse level 00 data.
    
    Data Versions
    =============
    There have so far been 3 changes to the format of the raw ASCII data over the
    years.
    
    Version 0
    ---------
    Effective from inception to 2010-06-01T00:27:51Z.
    
    The original data format was a key value, space separated data format
    ``<key> <value>``. There were a total of 16 data values including `TIME`:
    `TIME, ACCURAIN, TEMP107_4, LI200X, TEMP107_1, RH41372, TEMP107_5, CS105,
    PAROSCI, WSPD05305, TEMP107_3, CS10162, RAIN380M, TEMP107_2, TEMP41372,
    WDIR05305`.
    
    XXX: Fill value in version 0 seems to be -99999.
    
    Version 1
    ---------
    Effective 2010-06-01T00:27:51Z to 2012-12-03T17:34:17Z.
    
    A CSV format file with a total of 28 values: station_id, year, doy, hhmm, sec,
    box_pressure, paro_air_temp_period, paro_pressure_period, paro_air_temp,
    pressure, paro_cal_sig, box_rh, box_air_temp, air_temp_2, air_temp_3,
    air_temp_4, wind_speed, wind_dir, rh_shield_freq, rh, air_temp_6_3m, dewpoint,
    rtd_shield_freq, air_temp, solar_flux, precip, accum_precip, altimeter.
    
    XXX: Fill value in version 1 seems to be -99999.
    
    Version 2
    ---------
    Effective 2012-12-03T17:34:17Z to present.
    
    Same as Version 1 with the addition of altimeter2 at the end. I'm not sure why
    we have 2 altimeter values but as far as I know altimeter2 is not used.
    
    XXX: Fill value in version 2 seems to be -99999.
    """
    
    import re
    import io
    import time
    import logging
    from datetime import datetime, timedelta
    
    from metobscommon.util.mytime import hhmm_to_offset
    from aosstower.schema import database
    
    # Module-level logger, named after this module.
    LOG = logging.getLogger(__name__)
    
    
    class LineParseError(Exception):
        """Raised when a line of frame data cannot be parsed."""

        @classmethod
        def raise_wrapped(cls, exception, msg=None):
            """Raise an instance of this class in place of ``exception``.

            The raised error carries the traceback of the exception that is
            currently being handled, as reported by ``sys.exc_info()``.

            :param exception: the original exception being wrapped
            :param msg: message for the new error; when empty or ``None`` the
                string form of ``exception`` is used instead
            :raises: always raises an instance of ``cls``
            """
            import sys
            _, _, active_tb = sys.exc_info()
            text = msg if msg else str(exception)
            raise cls(text).with_traceback(active_tb)
    
    
    def _make_frame(data, new_symbols=None, rename_timestamp=False):
        """Construct a frame dictionary from an iterable of ``(key, value)`` pairs.

        Keys ``'stamp'`` and ``'timestamp'`` are copied through without any
        conversion. Every other key is kept only when it is present in the
        schema ``database``; its value is then converted with
        ``database[key].type``. Keys that are not in ``database`` are dropped.

        :param data: iterable of ``(key, value)`` tuples
        :param new_symbols: optional sequence of replacement key names, indexed
            by the position of the pair within ``data``; positions past the end
            of the sequence keep their original key
        :param rename_timestamp: when true, a ``'stamp'`` or ``'timestamp'``
            value is stored under ``'stamp'``; otherwise under its original key
        :returns: dict mapping output names to converted values
        :raises LineParseError: if converting a value raises ``ValueError`` or
            ``TypeError``
        """
        frame = {}
        for idx, (key, value) in enumerate(data):
            if key in ['stamp', 'timestamp']:
                frame['stamp' if rename_timestamp else key] = value
                continue
            if key not in database:
                continue
            try:
                if new_symbols and len(new_symbols) > idx:
                    new_key = new_symbols[idx]
                else:
                    new_key = key
                frame[new_key] = database[key].type(value)
            except (ValueError, TypeError):
                # Interpolate the message here: exceptions do not apply
                # logging-style lazy %-formatting to their arguments.
                raise LineParseError("error converting '%s' using %s"
                                     % (value, database[key].type))
        return frame
    
    
    class ParserV0(object):
        """Parses Version 0 data lines.

        A version 0 line is a whitespace separated sequence of
        ``<key> <value>`` pairs that begins with the ``TIME`` key.
        """
        fill_value = -99999.

        # maps v0 names to names in schema db
        names = {'ACCURAIN': 'accum_precip',
                 'TEMP107_1': 'box_air_temp',
                 'TEMP107_2': 'air_temp_2',
                 'TEMP107_3': 'air_temp_3',
                 'TEMP107_4': 'air_temp_4',
                 'TEMP107_5': 'air_temp_5',
                 'LI200X': 'solar_flux',
                 'RH41372': 'rh',
                 'TEMP41372': 'air_temp',
                 'CS105': 'box_pressure',
                 'PAROSCI': 'pressure',
                 'WSPD05305': 'wind_speed',
                 'WDIR05305': 'wind_dir',
                 'CS10162': 'box_rh',
                 'RAIN380M': 'precip'}

        @staticmethod
        def maybe_mine(line):
            """Return True if ``line`` starts with ``TIME``."""
            return line.startswith('TIME')

        def make_frame(self, line):
            """Parse one version 0 line into a frame dictionary.

            The second token of the line is read as an integer Unix timestamp
            and stored under ``'stamp'`` as a naive UTC ``datetime``.

            :param line: raw text line made of 16 ``<key> <value>`` pairs
            :returns: frame built by ``_make_frame``
            :raises LineParseError: if the line does not split into exactly 32
                tokens, contains a key missing from ``names``, or carries a
                timestamp that cannot be converted
            """
            parts = line.split()
            if len(parts) != 32:
                raise LineParseError("Expected 32 components", line)
            raw_data = [('version', 0)]
            for k1, v1 in zip(parts[0::2], parts[1::2]):
                if k1 == 'TIME':
                    # handled separately below
                    continue
                if k1 in self.names:
                    raw_data.append((self.names[k1], v1))
                else:
                    raise LineParseError("Unexpected var: %s" % k1, line)
            try:
                unix_time = int(parts[1])
                raw_data.append(('stamp', datetime.utcfromtimestamp(unix_time)))
            except (ValueError, TypeError, OverflowError, OSError):
                # utcfromtimestamp raises OverflowError/OSError for values
                # outside the platform range; report those as parse errors too
                # so callers that only catch LineParseError keep working.
                raise LineParseError("Could not parse stamp", line)
            return _make_frame(raw_data)
    
    
    class ParserV1V2(object):
        """Parses Version 1 & 2 data lines.

        These are comma separated lines. Lines with 33 or 34 fields are also
        accepted and reported as versions 3 and 4 respectively; only the
        fields listed in ``names`` are passed on.
        """
        fill_value = -99999.

        names = ['station_id', 'year', 'doy', 'hhmm', 'sec', 'box_pressure',
                 'paro_air_temp_period', 'paro_pressure_period', 'paro_air_temp',
                 'pressure', 'paro_cal_sig', 'box_rh', 'box_air_temp',
                 'air_temp_2', 'air_temp_3', 'air_temp_4', 'wind_speed', 'wind_dir',
                 'rh_shield_freq', 'rh', 'air_temp_6_3m', 'dewpoint',
                 'rtd_shield_freq', 'air_temp', 'solar_flux', 'precip',
                 'accum_precip', 'altimeter']  # , 'altimeter2']
        # These are the new fields in the input files but unused by the rest of
        # the tower code. At the risk of breaking other pieces of software, these
        # are not included in the above list, but are documented here for future
        # reference.
        #
        # Altimeter2 (slightly different calculation, same units as Altimeter)
        # LW_in (W/m^2)
        # tempPyrg (Kelvin, temperature of pyrgeometer)
        # pyrgTP (W/m^2, raw reading from the pyrgeometer thermopile)
        # pyrgTC (W/m^2, temperature correction for the pyrgeometer)
        # TC_110 (degC, this is a second air temperature in the new aspirated radiation shield)

        @staticmethod
        def maybe_mine(line):
            """Return True if ``line`` starts like ``<digit>,<4 digits>,<1-3 digits>``."""
            # Raw string: '\d' in a plain literal is an invalid escape sequence.
            return re.search(r'^\d,\d{4},\d{1,3}', line) is not None

        @staticmethod
        def _get_stamp(parts):
            """Build the frame timestamp from the split fields of a line.

            The date comes from the year (``parts[1]``) and day of year
            (``parts[2]``). The offset returned by ``hhmm_to_offset`` for
            ``parts[3]`` plus the seconds in ``parts[4]`` is then added, after
            being rounded down to a multiple of 5 seconds.

            :param parts: sequence of field strings from one line
            :returns: ``datetime`` for the record
            """
            year = int(parts[1])
            doy = int(parts[2])
            dt = datetime.strptime('{:d}.{:03d}'.format(year, doy), '%Y.%j')
            secs = hhmm_to_offset(parts[3])
            secs += float(parts[4])
            # snap down to the previous 5 second boundary
            secs -= (secs % 5)
            dt += timedelta(seconds=secs)
            return dt

        def make_frame(self, line):
            """Parse one comma separated line into a frame dictionary.

            The version number is derived from the number of fields: 28, 29, 33
            and 34 fields map to versions 1, 2, 3 and 4. Fields are paired with
            ``names`` in order, so any fields beyond the listed names are
            ignored.

            :param line: raw comma separated text line
            :returns: frame built by ``_make_frame``
            :raises LineParseError: if the field count is not one of the
                accepted values or the timestamp cannot be built
            """
            parts = line.split(',')
            if len(parts) not in [28, 29, 33, 34]:
                raise LineParseError("Expected 28, 29, 33, or 34 parts", line)
            version = {28: 1, 29: 2, 33: 3, 34: 4}[len(parts)]
            raw_data = [('version', version)] + list(zip(self.names, parts))
            try:
                raw_data.append(('stamp', self._get_stamp(parts)))
            except (TypeError, ValueError):
                raise LineParseError("Could not parse timesamp", line)
            return _make_frame(raw_data)
    
    
    def read_frames(source, error_handler=lambda *a: None, tail=False):
        """Returns a generator for reading frames from `source`. Frames are
        checked line-by-line so frame line versions may be mixed.

        Blank lines and lines starting with ``#`` are skipped. Lines that fail
        to parse, or that no parser recognizes, are reported through
        ``error_handler`` and do not stop iteration.

        :param source: an object with a ``readlines`` attribute (used as is and
            left open), or anything else, which is passed to ``open()`` and
            closed again when the generator finishes or is closed
        :param error_handler: callable invoked as
            ``error_handler(number, line, exception)`` where ``number`` is the
            line index plus one. When not tailing the index counts every line
            of the source; when tailing it counts only the non-blank lines read
            after seeking to the end.
        :param tail: starting from the end of the source (if 'seek' method) read lines forever
        """
        opened_here = not hasattr(source, 'readlines')
        fptr = open(source) if opened_here else source
        try:
            if tail and hasattr(fptr, "seek"):
                LOG.debug("Seeking to end of frame source")
                fptr.seek(0, io.SEEK_END)

                def gen():
                    idx = 0
                    while True:
                        line = fptr.readline()
                        if not line.strip():
                            # nothing new yet, poll again shortly
                            time.sleep(0.1)
                            continue
                        yield idx, line
                        idx += 1
            else:
                def gen():
                    for idx, line in enumerate(fptr):
                        if not line.strip():
                            continue
                        yield idx, line

            for idx, line in gen():
                if line.startswith('#'):
                    continue
                for parser in [ParserV1V2(), ParserV0()]:
                    if parser.maybe_mine(line):
                        try:
                            yield parser.make_frame(line)
                        except LineParseError as err:
                            error_handler(idx + 1, line, err)
                        break

                # yes, I know a for/else is obscure, but in this case it does
                # exactly what I need, it only executes if `break` does not execute
                else:
                    error_handler(idx + 1, line, RuntimeError("no parser found", line))
        finally:
            # Only close what this function opened; caller-supplied file
            # objects remain the caller's responsibility.
            if opened_here:
                fptr.close()
    
    
    def loggernet_to_tower(rec_dict, symbol_names):
        """Build a frame with our standard naming from a loggernet record.

        The values of ``rec_dict`` are paired, in iteration order, with
        ``'timestamp'`` followed by ``ParserV1V2.names``. The resulting frame
        is keyed by the names in ``symbol_names`` at matching positions and the
        timestamp is stored under ``'stamp'``.

        :param rec_dict: mapping holding one loggernet record
        :param symbol_names: list of output names, one per ``ParserV1V2.names``
            entry
        :returns: frame built by ``_make_frame``
        """
        # the first value of the record is treated as the timestamp; the
        # remaining values line up with the traditional frame fields
        source_keys = ['timestamp'] + ParserV1V2.names
        target_keys = ['timestamp'] + symbol_names
        pairs = zip(source_keys, rec_dict.values())
        return _make_frame(pairs, target_keys, rename_timestamp=True)
    
    
    class LDMPGenerator(object):
        """Class to manage receiving records from Loggernet LDMP server."""

        def __init__(self, station_name, tables, symbol_names=ParserV1V2.names,
                     host='localhost', port=1024):
            """Create the receiver for ``host``/``port``; it is not started here.

            :param station_name: station name passed on to ``dict_records``
            :param tables: tables passed on to ``dict_records``
            :param symbol_names: output names handed to ``loggernet_to_tower``
                for every record
            :param host: host given to ``LDMPReceiver``
            :param port: port given to ``LDMPReceiver``
            """
            from metobscommon.archive.loggernet_receiver import LDMPReceiver
            self.station_name = station_name
            self.tables = tables
            self.symbol_names = symbol_names
            self.receiver = LDMPReceiver(host, port)

        def __iter__(self):
            """Start the receiver and return a generator of converted frames."""
            from metobscommon.archive.loggernet_receiver import dict_records
            self.receiver.start()
            # This should be generated OrderedDicts
            dict_rec_gen = dict_records(self.receiver, self.station_name,
                                        self.tables)
            return (loggernet_to_tower(x, self.symbol_names) for x in dict_rec_gen)

        def close(self):
            """Close the underlying receiver."""
            self.receiver.close()

        def __del__(self):
            """Last effort to kill the background thread if not done already."""
            # __init__ may have failed before the receiver was created, in
            # which case there is nothing to close.
            if not hasattr(self, 'receiver'):
                return
            try:
                self.close()
            except (ValueError, RuntimeError, IOError, OSError):
                pass