Skip to content
Snippets Groups Projects
Verified Commit a519f4f6 authored by David Hoese's avatar David Hoese
Browse files

Update influxdb to support bulk inserts

parent 223c5580
No related branches found
No related tags found
No related merge requests found
...@@ -48,6 +48,16 @@ SYMBOL_CONVERSIONS = { ...@@ -48,6 +48,16 @@ SYMBOL_CONVERSIONS = {
SYMBOLS = list(SYMBOL_CONVERSIONS.values()) SYMBOLS = list(SYMBOL_CONVERSIONS.values())
def convert_to_influx_frame(record_gen, symbols, debug=False):
for idx, record in enumerate(record_gen):
LOG.info("Inserting records for frame %d", idx)
record = {symbols[k] or k: v for k, v in record.items() if k in symbols}
if debug:
print(idx, record)
continue
yield record
def main(): def main():
import argparse import argparse
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
...@@ -79,6 +89,10 @@ def main(): ...@@ -79,6 +89,10 @@ def main():
# help='Weather underground station ID') # help='Weather underground station ID')
# parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt', # parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt',
# help='File containing the password for the weather underground upload') # help='File containing the password for the weather underground upload')
parser.add_argument('--bulk', type=int, default=1,
help="Number of records to buffer before sending to "
"the database. For large inserts this should be "
"between 5000 to 10000. Default: 1")
parser.add_argument('src', help='Level 0 raw data file or station name ' parser.add_argument('src', help='Level 0 raw data file or station name '
'for LDMP reading') 'for LDMP reading')
...@@ -102,17 +116,12 @@ def main(): ...@@ -102,17 +116,12 @@ def main():
src = open(args.src, "r") src = open(args.src, "r")
record_gen = read_frames(src, tail=args.tail) record_gen = read_frames(src, tail=args.tail)
for idx, record in enumerate(record_gen): try:
LOG.info("Inserting records for frame %d", idx) influx_gen = convert_to_influx_frame(record_gen, symbols, args.debug)
record = {symbols[k] or k: v for k, v in record.items() if k in symbols} influx_gen = influxdb.grouper(influx_gen, args.bulk)
if args.debug: for record in influx_gen:
print(idx, record)
else:
lines = influxdb.frame_records(record, **station_tags) lines = influxdb.frame_records(record, **station_tags)
resp = requests.post( influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname)
'http://{host}:{port:d}/write?db={dbname}'.format(host=args.host, port=args.port, dbname=args.dbname),
data='\n'.join(lines))
resp.raise_for_status()
# if wu_pw and args.ldmp: # if wu_pw and args.ldmp:
# stamp = record['timestamp'].isoformat('+') # stamp = record['timestamp'].isoformat('+')
...@@ -121,8 +130,11 @@ def main(): ...@@ -121,8 +130,11 @@ def main():
# winddir=int(record['wind_dir']), # winddir=int(record['wind_dir']),
# ) # )
# resp = requests.post('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?ID={wu_id}&PASSWORD={wu_pw}&action=updateraw&dateutc=2017-04-16+01:54:46&winddir=198&windspeedmph=14.7&windgustmph=21.7&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00&softwaretype=SSEC-RIG') # resp = requests.post('http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?ID={wu_id}&PASSWORD={wu_pw}&action=updateraw&dateutc=2017-04-16+01:54:46&winddir=198&windspeedmph=14.7&windgustmph=21.7&humidity=64&tempf=70.5&baromin=29.742&dewptf=57.9&solarradiation=0.5&rainin=0.00&dailyrainin=0.00&softwaretype=SSEC-RIG')
if args.sleep_interval: if args.sleep_interval:
time.sleep(args.sleep_interval) time.sleep(args.sleep_interval)
except (RuntimeError, ValueError, KeyError, requests.RequestException):
if hasattr(record_gen, 'close'):
record_gen.close()
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -224,9 +224,31 @@ def loggernet_to_tower(rec_dict, symbol_names): ...@@ -224,9 +224,31 @@ def loggernet_to_tower(rec_dict, symbol_names):
return _make_frame(zip(old_symbols, rec_dict.values()), new_symbols, rename_timestamp=True) return _make_frame(zip(old_symbols, rec_dict.values()), new_symbols, rename_timestamp=True)
def ldmp_generator(station_name, tables, symbol_names=ParserV1V2.names): class LDMPGenerator(object):
from metobscommon.archive.loggernet_receiver import dict_records, ldmp_receiver """Class to manage receiving records from Loggernet LDMP server."""
receiver = ldmp_receiver()
# This should be generated OrderedDicts def __init__(self, station_name, tables, symbols_names=ParserV1V2.names,
dict_rec_gen = dict_records(receiver, station_name, tables) host='localhost', port=1024):
return (loggernet_to_tower(x, symbol_names) for x in dict_rec_gen) from metobscommon.archive.loggernet_receiver import LDMPReceiver
self.station_name = station_name
self.tables = tables
self.symbol_names = symbols_names
self.receiver = LDMPReceiver(host, port)
def __iter__(self):
from metobscommon.archive.loggernet_receiver import dict_records
self.receiver.start()
# This should be generated OrderedDicts
dict_rec_gen = dict_records(self.receiver, self.station_name,
self.tables)
return (loggernet_to_tower(x, self.symbol_names) for x in dict_rec_gen)
def close(self):
self.receiver.close()
def __del__(self):
"""Last effort to kill the background thread if not done already."""
try:
self.close()
except (ValueError, RuntimeError, IOError, OSError):
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment