diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py index dbd9f762c9d68a696fe628b640d4aa04f7cf40dd..e24ac299b82bd59f03b53ee467b0a611a91b1cde 100644 --- a/aosstower/level_00/influxdb.py +++ b/aosstower/level_00/influxdb.py @@ -16,6 +16,7 @@ LOG = logging.getLogger(__name__) STATIONS = { "AOSS Tower": {"inst": "tower", "site": "aoss"}, } +# parser symbols to influxdb symbols SYMBOL_CONVERSIONS = { 'stamp': 'timestamp', 'box_temp': 'box_temp', @@ -46,6 +47,7 @@ SYMBOL_CONVERSIONS = { } SYMBOLS = list(SYMBOL_CONVERSIONS.values()) + def main(): import argparse parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -53,6 +55,10 @@ def main(): parser.add_argument('-t', '--tail', action='store_true', help=('Tail file forever, not returning. This will start at the end ' 'of the file and insert any new data added after starting')) + parser.add_argument('--ldmp', action='store_true', + help='Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)') + parser.add_argument('--tables', nargs='*', default=['RIGTower'], + help="LoggerNet LDMP tables to read in") parser.add_argument("--host", default=influxdb.DB_HOST, help="Hostname of database connection") parser.add_argument("--port", default=influxdb.DB_PORT, @@ -74,10 +80,16 @@ def main(): LOG.info("source: %s", args.src) LOG.info("tail: %s", args.tail) station_tags = STATIONS[args.station] - src = open(args.src, "r") symbols = SYMBOL_CONVERSIONS - for idx, record in enumerate(read_frames(src, tail=args.tail)): + if args.ldmp: + from aosstower.level_00.parser import ldmp_generator + record_gen = ldmp_generator(args.src, args.tables) + else: + src = open(args.src, "r") + record_gen = read_frames(src, tail=args.tail) + + for idx, record in enumerate(record_gen): LOG.info("Inserting records for frame %d", idx) record = {symbols[k] or k: v for k, v in record.items() if k in symbols} lines = influxdb.frame_records(record, **station_tags) diff --git a/aosstower/level_00/parser.py b/aosstower/level_00/parser.py index 4dc22a1774e51237740726446f12eb48d4279471..17e8f118df99cd7530d55e2f8a1d37e1b1009d85 100644 --- a/aosstower/level_00/parser.py +++ b/aosstower/level_00/parser.py @@ -145,7 +145,8 @@ class ParserV1V2(object): def maybe_mine(line): return re.search('^\d,\d{4},\d{1,3}', line) is not None - def _get_stamp(self, parts): + @staticmethod + def _get_stamp(parts): year = int(parts[1]) doy = int(parts[2]) dt = datetime.strptime('{:d}.{:03d}'.format(int(year), int(doy)), '%Y.%j') @@ -212,3 +213,16 @@ def read_frames(source, error_handler=lambda *a: None, tail=False): # exactly what I need, it only executes if `break` does not execute else: error_handler(idx + 1, line, RuntimeError("no parser found", line)) + + +def loggernet_to_tower(rec_dict): + """Convert loggernet record dictionary to our standard naming""" + return _make_frame(zip(ParserV1V2.names, rec_dict.values())) + + +def ldmp_generator(station_name, tables): + from metobscommon.archive.loggernet_receiver import dict_records, ldmp_receiver + receiver = ldmp_receiver() + # This should be generated OrderedDicts + dict_rec_gen = dict_records(receiver, station_name, tables) + return (loggernet_to_tower(x) for x in dict_rec_gen)