Skip to content
Snippets Groups Projects
Verified Commit 54cf52dc authored by David Hoese's avatar David Hoese
Browse files

Add loggernet parsing to influxdb injector

parent eec24620
No related branches found
No related tags found
No related merge requests found
...@@ -16,6 +16,7 @@ LOG = logging.getLogger(__name__) ...@@ -16,6 +16,7 @@ LOG = logging.getLogger(__name__)
STATIONS = { STATIONS = {
"AOSS Tower": {"inst": "tower", "site": "aoss"}, "AOSS Tower": {"inst": "tower", "site": "aoss"},
} }
# parser symbols to influxdb symbols
SYMBOL_CONVERSIONS = { SYMBOL_CONVERSIONS = {
'stamp': 'timestamp', 'stamp': 'timestamp',
'box_temp': 'box_temp', 'box_temp': 'box_temp',
...@@ -46,6 +47,7 @@ SYMBOL_CONVERSIONS = { ...@@ -46,6 +47,7 @@ SYMBOL_CONVERSIONS = {
} }
SYMBOLS = list(SYMBOL_CONVERSIONS.values()) SYMBOLS = list(SYMBOL_CONVERSIONS.values())
def main(): def main():
import argparse import argparse
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
...@@ -53,6 +55,10 @@ def main(): ...@@ -53,6 +55,10 @@ def main():
parser.add_argument('-t', '--tail', action='store_true', parser.add_argument('-t', '--tail', action='store_true',
help=('Tail file forever, not returning. This will start at the end ' help=('Tail file forever, not returning. This will start at the end '
'of the file and insert any new data added after starting')) 'of the file and insert any new data added after starting'))
parser.add_argument('--ldmp', action='store_true',
help='Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)')
parser.add_argument('--tables', nargs='*', default=['RIGTower'],
help="LoggerNet LDMP tables to read in")
parser.add_argument("--host", default=influxdb.DB_HOST, parser.add_argument("--host", default=influxdb.DB_HOST,
help="Hostname of database connection") help="Hostname of database connection")
parser.add_argument("--port", default=influxdb.DB_PORT, parser.add_argument("--port", default=influxdb.DB_PORT,
...@@ -74,10 +80,16 @@ def main(): ...@@ -74,10 +80,16 @@ def main():
LOG.info("source: %s", args.src) LOG.info("source: %s", args.src)
LOG.info("tail: %s", args.tail) LOG.info("tail: %s", args.tail)
station_tags = STATIONS[args.station] station_tags = STATIONS[args.station]
src = open(args.src, "r")
symbols = SYMBOL_CONVERSIONS symbols = SYMBOL_CONVERSIONS
for idx, record in enumerate(read_frames(src, tail=args.tail)): if args.ldmp:
from aosstower.level_00.parser import ldmp_generator
record_gen = ldmp_generator(args.src, args.tables)
else:
src = open(args.src, "r")
record_gen = read_frames(src, tail=args.tail)
for idx, record in enumerate(record_gen):
LOG.info("Inserting records for frame %d", idx) LOG.info("Inserting records for frame %d", idx)
record = {symbols[k] or k: v for k, v in record.items() if k in symbols} record = {symbols[k] or k: v for k, v in record.items() if k in symbols}
lines = influxdb.frame_records(record, **station_tags) lines = influxdb.frame_records(record, **station_tags)
......
...@@ -145,7 +145,8 @@ class ParserV1V2(object): ...@@ -145,7 +145,8 @@ class ParserV1V2(object):
def maybe_mine(line): def maybe_mine(line):
return re.search('^\d,\d{4},\d{1,3}', line) is not None return re.search('^\d,\d{4},\d{1,3}', line) is not None
def _get_stamp(self, parts): @staticmethod
def _get_stamp(parts):
year = int(parts[1]) year = int(parts[1])
doy = int(parts[2]) doy = int(parts[2])
dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j') dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j')
...@@ -212,3 +213,16 @@ def read_frames(source, error_handler=lambda *a: None, tail=False): ...@@ -212,3 +213,16 @@ def read_frames(source, error_handler=lambda *a: None, tail=False):
# exactly what I need, it only executes if `break` does not execute # exactly what I need, it only executes if `break` does not execute
else: else:
error_handler(idx + 1, line, RuntimeError("no parser found", line)) error_handler(idx + 1, line, RuntimeError("no parser found", line))
def loggernet_to_tower(rec_dict):
"""Convert loggernet record dictionary to our standard naming"""
return _make_frame(zip(ParserV1V2.names, rec_dict.values()))
def ldmp_generator(station_name, tables):
from metobscommon.archive.loggernet_receiver import dict_records, ldmp_receiver
receiver = ldmp_receiver()
# This should be generated OrderedDicts
dict_rec_gen = dict_records(receiver, station_name, tables)
return (loggernet_to_tower(x) for x in dict_rec_gen)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment