From 9d80d96812e73b1ce0e33f3905e7ad484fb6e007 Mon Sep 17 00:00:00 2001 From: davidh-ssec <david.hoese@ssec.wisc.edu> Date: Tue, 19 Apr 2016 14:42:07 -0500 Subject: [PATCH] Add initial influxdb work --- aosstower/l00/influxdb.py | 57 +++++++++++++++++++++++++++++++++++++++ aosstower/l00/parser.py | 35 +++++++++++++++++++----- setup.py | 1 - 3 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 aosstower/l00/influxdb.py diff --git a/aosstower/l00/influxdb.py b/aosstower/l00/influxdb.py new file mode 100644 index 0000000..50a1cb7 --- /dev/null +++ b/aosstower/l00/influxdb.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +"""Insert Tower data in to an InfluxDB for real time use. + +""" +import logging +import logging.handlers +import time +import sys +import requests +from metobscommon import influxdb +from aosstower.l00.parser import read_frames + +LOG = logging.getLogger(__name__) +# map station name to InfluxDB tags +STATIONS = { + "AOSS Tower": {"instrument": "tower", "site": "aoss"}, +} + + +def main(): + import argparse + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('--logfn', help='Log to rotating file') + parser.add_argument('-t', '--tail', action='store_true', + help=('Tail file forever, not returning. This will start at the end ' + 'of the file and insert any new data added after starting')) + parser.add_argument("--host", default=influxdb.DB_HOST, + help="Hostname of database connection") + parser.add_argument("--port", default=influxdb.DB_PORT, + help="Port of database connection") + parser.add_argument("--dbname", default=influxdb.DB_NAME, + help="Name of database to modify") + parser.add_argument('-s', '--station', dest='station', default='AOSS Tower', choices=STATIONS.keys(), + help='Name of station to use to determine symbols') + parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0, + help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG') + parser.add_argument('src', help='Level 0 raw data file') + + args = parser.parse_args() + levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] + logging.basicConfig(level=levels[min(3, args.verbosity)]) + + LOG.info("source: %s", args.src) + LOG.info("tail: %s", args.tail) + station_tags = STATIONS[args.station] + src = open(args.src, "r") + + for idx, record in enumerate(read_frames(src, tail=args.tail)): + LOG.info("Inserting records for frame %d", idx) + lines = influxdb.frame_records(record, **station_tags) + resp = requests.post( + 'http://{host}:{port:d}/write?db={dbname}'.format(host=args.host, port=args.port, dbname=args.dbname), + data='\n'.join(lines)) + resp.raise_for_status() + +if __name__ == "__main__": + sys.exit(main()) diff --git a/aosstower/l00/parser.py b/aosstower/l00/parser.py index b80067e..483c26f 100644 --- a/aosstower/l00/parser.py +++ b/aosstower/l00/parser.py @@ -34,6 +34,8 @@ we have 2 altimeter values but as far as I know altimeter2 is not used. """ import re +import io +import time import logging from datetime import datetime, timedelta @@ -51,7 +53,9 @@ class LineParseError(Exception): import sys traceback = sys.exc_info()[2] msg = msg or str(exception) - raise cls(msg), None, traceback + exc = cls(msg) + exc.__traceback__ = traceback + raise exc def _make_frame(data): @@ -148,7 +152,7 @@ class ParserV1V2(object): if len(parts) not in [28, 29]: raise LineParseError("Expected 28 or 29 parts", line) version = 1 if len(parts) == 28 else 2 - raw_data = [('version', version)] + zip(self.names, parts) + raw_data = [('version', version)] + list(zip(self.names, parts)) try: raw_data.append(('stamp', self._get_stamp(parts))) except (TypeError, ValueError): @@ -156,17 +160,36 @@ class ParserV1V2(object): return _make_frame(raw_data) -def read_frames(source, error_handler=lambda *a: None): +def read_frames(source, error_handler=lambda *a: None, tail=False): """Returns a generator for reading frames from `source`. Frames are checked line-by-line so frame line versions may be mixed. + + :param tail: starting from the end of the source (if 'seek' method) read lines forever """ if hasattr(source, 'readlines'): fptr = source else: fptr = open(source) - - for idx, line in enumerate(fptr.readlines()): - if not line.strip() or line.startswith('#'): + if tail and hasattr(fptr, "seek"): + LOG.debug("Seeking to end of frame source") + fptr.seek(0, io.SEEK_END) + def gen(): + idx = 0 + while True: + line = fptr.readline() + if not line.strip(): + time.sleep(0.1) + yield idx, line + idx += 1 + else: + def gen(): + for idx, line in enumerate(fptr): + if not line.strip(): + continue + yield idx, line + + for idx, line in gen(): + if line.startswith('#'): continue for parser in [ParserV1V2(), ParserV0()]: if parser.maybe_mine(line): diff --git a/setup.py b/setup.py index 54ee24a..800457c 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,6 @@ setup( url='http://metobs.ssec.wisc.edu', install_requires=[ 'numpy', - 'MetObsCommon>=0.1dev' ], dependency_links=['http://larch.ssec.wisc.edu/cgi-bin/repos.cgi'], packages=find_packages(exclude=['aosstower.tests']), -- GitLab