diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py index 4afe9d38edbdf310e61981b40c55fa6267525378..4abc566d78b7262a76a598f7270e338f89a5d0b6 100644 --- a/aosstower/level_00/influxdb.py +++ b/aosstower/level_00/influxdb.py @@ -48,6 +48,16 @@ SYMBOL_CONVERSIONS = { SYMBOLS = list(SYMBOL_CONVERSIONS.values()) +def convert_to_influx_frame(record_gen, symbols, debug=False): + for idx, record in enumerate(record_gen): + LOG.info("Inserting records for frame %d", idx) + record = {symbols[k] or k: v for k, v in record.items() if k in symbols} + if debug: + print(idx, record) + continue + yield record + + def main(): import argparse parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -79,6 +89,10 @@ def main(): # help='Weather underground station ID') # parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt', # help='File containing the password for the weather underground upload') + parser.add_argument('--bulk', type=int, default=1, + help="Number of records to buffer before sending to " + "the database. For large inserts this should be " + "between 5000 to 10000. Default: 1") parser.add_argument('src', help='Level 0 raw data file or station name ' 'for LDMP reading') @@ -102,17 +116,12 @@ def main(): src = open(args.src, "r") record_gen = read_frames(src, tail=args.tail) - for idx, record in enumerate(record_gen): - LOG.info("Inserting records for frame %d", idx) - record = {symbols[k] or k: v for k, v in record.items() if k in symbols} - if args.debug: - print(idx, record) - else: + try: + influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug) + influx_gen = influxdb.grouper(influx_gen, args.bulk) + for record in influx_gen: lines = influxdb.frame_records(record, **station_tags) - resp = requests.post( - 'http://{host}:{port:d}/write?db={dbname}'.format(host=args.host, port=args.port, dbname=args.dbname), - data='\n'.join(lines)) - resp.raise_for_status() + influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) # if wu_pw and args.ldmp: # stamp = record['timestamp'].isoformat('+') @@ -121,8 +130,11 @@ def main(): # winddir=int(record['wind_dir']), # ) # resp = requests.post('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?ID={wu_id}&PASSWORD={wu_pw}&action=updateraw&dateutc=2017-04-16+01:54:46&winddir=198&windspeedmph=14.7&windgustmph=21.7&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00&softwaretype=SSEC-RIG') - if args.sleep_interval: - time.sleep(args.sleep_interval) + if args.sleep_interval: + time.sleep(args.sleep_interval) + except (RuntimeError, ValueError, KeyError, requests.RequestException): + if hasattr(record_gen, 'close'): + record_gen.close() if __name__ == "__main__": diff --git a/aosstower/level_00/parser.py b/aosstower/level_00/parser.py index c1188d52bf38b818bfe0662123560620c8223119..026a4004fd42edfacb992c26212a83e581ce537f 100644 --- a/aosstower/level_00/parser.py +++ b/aosstower/level_00/parser.py @@ -224,9 +224,31 @@ def loggernet_to_tower(rec_dict, symbol_names): return _make_frame(zip(old_symbols, rec_dict.values()), new_symbols, rename_timestamp=True) -def ldmp_generator(station_name, tables, symbol_names=ParserV1V2.names): - from metobscommon.archive.loggernet_receiver import dict_records, ldmp_receiver - receiver = ldmp_receiver() - # This should be generated OrderedDicts - dict_rec_gen = dict_records(receiver, station_name, tables) - return (loggernet_to_tower(x, symbol_names) for x in dict_rec_gen) +class LDMPGenerator(object): + """Class to manage receiving records from Loggernet LDMP server.""" + + def __init__(self, station_name, tables, symbols_names=ParserV1V2.names, + host='localhost', port=1024): + from metobscommon.archive.loggernet_receiver import LDMPReceiver + self.station_name = station_name + self.tables = tables + self.symbol_names = symbols_names + self.receiver = LDMPReceiver(host, port) + + def __iter__(self): + from metobscommon.archive.loggernet_receiver import dict_records + self.receiver.start() + # This should be generated OrderedDicts + dict_rec_gen = dict_records(self.receiver, self.station_name, + self.tables) + return (loggernet_to_tower(x, self.symbol_names) for x in dict_rec_gen) + + def close(self): + self.receiver.close() + + def __del__(self): + """Last effort to kill the background thread if not done already.""" + try: + self.close() + except (ValueError, RuntimeError, IOError, OSError): + pass