From 54cf52dcd469d4e40b6f0251afc59374a17a86c1 Mon Sep 17 00:00:00 2001 From: davidh-ssec <david.hoese@ssec.wisc.edu> Date: Thu, 14 Dec 2017 17:07:06 -0600 Subject: [PATCH] Add loggernet parsing to influxdb injector --- aosstower/level_00/influxdb.py | 16 ++++++++++++++-- aosstower/level_00/parser.py | 16 +++++++++++++++- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py index dbd9f76..e24ac29 100644 --- a/aosstower/level_00/influxdb.py +++ b/aosstower/level_00/influxdb.py @@ -16,6 +16,7 @@ LOG = logging.getLogger(__name__) STATIONS = { "AOSS Tower": {"inst": "tower", "site": "aoss"}, } +# parser symbols to influxdb symbols SYMBOL_CONVERSIONS = { 'stamp': 'timestamp', 'box_temp': 'box_temp', @@ -46,6 +47,7 @@ SYMBOL_CONVERSIONS = { } SYMBOLS = list(SYMBOL_CONVERSIONS.values()) + def main(): import argparse parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -53,6 +55,10 @@ def main(): parser.add_argument('-t', '--tail', action='store_true', help=('Tail file forever, not returning. This will start at the end ' 'of the file and insert any new data added after starting')) + parser.add_argument('--ldmp', action='store_true', + help='Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)') + parser.add_argument('--tables', nargs='*', default=['RIGTower'], + help="LoggerNet LDMP tables to read in") parser.add_argument("--host", default=influxdb.DB_HOST, help="Hostname of database connection") parser.add_argument("--port", default=influxdb.DB_PORT, @@ -74,10 +80,16 @@ def main(): LOG.info("source: %s", args.src) LOG.info("tail: %s", args.tail) station_tags = STATIONS[args.station] - src = open(args.src, "r") symbols = SYMBOL_CONVERSIONS - for idx, record in enumerate(read_frames(src, tail=args.tail)): + if args.ldmp: + from aosstower.level_00.parser import ldmp_generator + record_gen = ldmp_generator(args.src, args.tables) + else: + src = open(args.src, "r") + record_gen = read_frames(src, tail=args.tail) + + for idx, record in enumerate(record_gen): LOG.info("Inserting records for frame %d", idx) record = {symbols[k] or k: v for k, v in record.items() if k in symbols} lines = influxdb.frame_records(record, **station_tags) diff --git a/aosstower/level_00/parser.py b/aosstower/level_00/parser.py index 4dc22a1..17e8f11 100644 --- a/aosstower/level_00/parser.py +++ b/aosstower/level_00/parser.py @@ -145,7 +145,8 @@ class ParserV1V2(object): def maybe_mine(line): return re.search('^\d,\d{4},\d{1,3}', line) is not None - def _get_stamp(self, parts): + @staticmethod + def _get_stamp(parts): year = int(parts[1]) doy = int(parts[2]) dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j') @@ -212,3 +213,16 @@ def read_frames(source, error_handler=lambda *a: None, tail=False): # exactly what I need, it only executes if `break` does not execute else: error_handler(idx + 1, line, RuntimeError("no parser found", line)) + + +def loggernet_to_tower(rec_dict): + """Convert loggernet record dictionary to our standard naming""" + return _make_frame(zip(ParserV1V2.names, rec_dict.values())) + + +def ldmp_generator(station_name, tables): + from metobscommon.archive.loggernet_receiver import dict_records, ldmp_receiver + receiver = ldmp_receiver() + # This should be generated OrderedDicts + dict_rec_gen = dict_records(receiver, station_name, tables) + return (loggernet_to_tower(x) for x in dict_rec_gen) -- GitLab