Skip to content
Snippets Groups Projects
Unverified Commit 9d80d968 authored by David Hoese's avatar David Hoese
Browse files

Add initial influxdb work

parent 482f282a
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
"""Insert Tower data in to an InfluxDB for real time use.
"""
import logging
import logging.handlers
import time
import sys
import requests
from metobscommon import influxdb
from aosstower.l00.parser import read_frames
LOG = logging.getLogger(__name__)
# map station name to InfluxDB tags
STATIONS = {
"AOSS Tower": {"instrument": "tower", "site": "aoss"},
}
def main():
import argparse
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--logfn', help='Log to rotating file')
parser.add_argument('-t', '--tail', action='store_true',
help=('Tail file forever, not returning. This will start at the end '
'of the file and insert any new data added after starting'))
parser.add_argument("--host", default=influxdb.DB_HOST,
help="Hostname of database connection")
parser.add_argument("--port", default=influxdb.DB_PORT,
help="Port of database connection")
parser.add_argument("--dbname", default=influxdb.DB_NAME,
help="Name of database to modify")
parser.add_argument('-s', '--station', dest='station', default='AOSS Tower', choices=STATIONS.keys(),
help='Name of station to use to determine symbols')
parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0,
help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
parser.add_argument('src', help='Level 0 raw data file')
args = parser.parse_args()
levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
logging.basicConfig(level=levels[min(3, args.verbosity)])
LOG.info("source: %s", args.src)
LOG.info("tail: %s", args.tail)
station_tags = STATIONS[args.station]
src = open(args.src, "r")
for idx, record in enumerate(read_frames(src, tail=args.tail)):
LOG.info("Inserting records for frame %d", idx)
lines = influxdb.frame_records(record, **station_tags)
resp = requests.post(
'http://{host}:{port:d}/write?db={dbname}'.format(host=args.host, port=args.port, dbname=args.dbname),
data='\n'.join(lines))
resp.raise_for_status()
if __name__ == "__main__":
sys.exit(main())
...@@ -34,6 +34,8 @@ we have 2 altimeter values but as far as I know altimeter2 is not used. ...@@ -34,6 +34,8 @@ we have 2 altimeter values but as far as I know altimeter2 is not used.
""" """
import re import re
import io
import time
import logging import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
...@@ -51,7 +53,9 @@ class LineParseError(Exception): ...@@ -51,7 +53,9 @@ class LineParseError(Exception):
import sys import sys
traceback = sys.exc_info()[2] traceback = sys.exc_info()[2]
msg = msg or str(exception) msg = msg or str(exception)
raise cls(msg), None, traceback exc = cls(msg)
exc.__traceback__ = traceback
raise exc
def _make_frame(data): def _make_frame(data):
...@@ -148,7 +152,7 @@ class ParserV1V2(object): ...@@ -148,7 +152,7 @@ class ParserV1V2(object):
if len(parts) not in [28, 29]: if len(parts) not in [28, 29]:
raise LineParseError("Expected 28 or 29 parts", line) raise LineParseError("Expected 28 or 29 parts", line)
version = 1 if len(parts) == 28 else 2 version = 1 if len(parts) == 28 else 2
raw_data = [('version', version)] + zip(self.names, parts) raw_data = [('version', version)] + list(zip(self.names, parts))
try: try:
raw_data.append(('stamp', self._get_stamp(parts))) raw_data.append(('stamp', self._get_stamp(parts)))
except (TypeError, ValueError): except (TypeError, ValueError):
...@@ -156,17 +160,36 @@ class ParserV1V2(object): ...@@ -156,17 +160,36 @@ class ParserV1V2(object):
return _make_frame(raw_data) return _make_frame(raw_data)
def read_frames(source, error_handler=lambda *a: None): def read_frames(source, error_handler=lambda *a: None, tail=False):
"""Returns a generator for reading frames from `source`. Frames are """Returns a generator for reading frames from `source`. Frames are
checked line-by-line so frame line versions may be mixed. checked line-by-line so frame line versions may be mixed.
:param tail: starting from the end of the source (if 'seek' method) read lines forever
""" """
if hasattr(source, 'readlines'): if hasattr(source, 'readlines'):
fptr = source fptr = source
else: else:
fptr = open(source) fptr = open(source)
if tail and hasattr(fptr, "seek"):
for idx, line in enumerate(fptr.readlines()): LOG.debug("Seeking to end of frame source")
if not line.strip() or line.startswith('#'): fptr.seek(0, io.SEEK_END)
def gen():
idx = 0
while True:
line = fptr.readline()
if not line.strip():
time.sleep(0.1)
yield idx, line
idx += 1
else:
def gen():
for idx, line in enumerate(fptr):
if not line.strip():
continue
yield idx, line
for idx, line in gen():
if line.startswith('#'):
continue continue
for parser in [ParserV1V2(), ParserV0()]: for parser in [ParserV1V2(), ParserV0()]:
if parser.maybe_mine(line): if parser.maybe_mine(line):
......
...@@ -7,7 +7,6 @@ setup( ...@@ -7,7 +7,6 @@ setup(
url='http://metobs.ssec.wisc.edu', url='http://metobs.ssec.wisc.edu',
install_requires=[ install_requires=[
'numpy', 'numpy',
'MetObsCommon>=0.1dev'
], ],
dependency_links=['http://larch.ssec.wisc.edu/cgi-bin/repos.cgi'], dependency_links=['http://larch.ssec.wisc.edu/cgi-bin/repos.cgi'],
packages=find_packages(exclude=['aosstower.tests']), packages=find_packages(exclude=['aosstower.tests']),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment