#!/usr/bin/env python
# encoding: utf8
"""Insert Tower data in to an InfluxDB for real time use.

"""
import logging
import logging.handlers
import time
import sys
import requests
from metobscommon import influxdb
from aosstower.level_00.parser import read_frames

LOG = logging.getLogger(__name__)

# map station name to InfluxDB tags
STATIONS = {
    "AOSS Tower": {"inst": "tower", "site": "aoss"},
}

# parser symbols to influxdb symbols
SYMBOL_CONVERSIONS = {
    'stamp': 'timestamp',
    'box_temp': 'box_temp',
    'box_pressure': 'box_pressure',
    'paro_air_temp_period': 'paro_air_temp_period',
    'paro_pressure_period': 'paro_pressure_period',
    'paro_air_temp': 'paro_air_temp',
    'pressure': 'pressure',
    'paro_cal_sig': 'paro_cal_sig',
    'box_rh': 'box_rh',
    'box_air_temp': 'box_air_temp',
    'air_temp_2': 'air_temp_2',
    'air_temp_3': 'air_temp_3',
    'air_temp_4': 'air_temp_4',
    'air_temp_5': 'air_temp_5',
    'wind_speed': 'wind_speed',
    'wind_dir': 'wind_dir',
    'rh_shield_freq': 'rh_shield_freq',
    'rh': 'rel_hum',
    'air_temp_6_3m': 'air_temp_6_3m',
    'dewpoint': 'dewpoint',
    # NOTE(review): 'rtd_shied_freq' looks like a typo of 'rtd_shield_freq',
    # but it is the name written to the database, so it is left unchanged
    # here -- confirm against the existing InfluxDB schema before renaming.
    'rtd_shield_freq': 'rtd_shied_freq',
    'air_temp': 'air_temp',
    'solar_flux': 'solar_flux',
    'precip': 'precip',
    'accum_precip': 'accum_precip',
    'altimeter': 'altimeter',
}
SYMBOLS = list(SYMBOL_CONVERSIONS.values())


def _parse_args(argv=None):
    """Build the command line parser and parse `argv` (default: sys.argv)."""
    import argparse
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--logfn',
                        help='Log to rotating file (Not Implemented)')
    parser.add_argument('--debug', action='store_true',
                        help='Don\'t submit records to the database, '
                             'print them to stdout')
    parser.add_argument('-t', '--tail', action='store_true',
                        help=('Tail file forever, not returning. '
                              'This will start at the end '
                              'of the file and insert any new data added '
                              'after starting'))
    parser.add_argument('--ldmp', action='store_true',
                        help='Treat `src` file as a station name and read '
                             'records from LoggerNet LDMP server '
                             '(port: 1024)')
    parser.add_argument('--tables', nargs='*', default=['1'],
                        help="LoggerNet LDMP tables to read in")
    parser.add_argument("--host", default=influxdb.DB_HOST,
                        help="Hostname of database connection")
    # type=int: the write URL is formatted with '{port:d}', which raises
    # ValueError for the str that argparse would otherwise hand back when
    # the option is given on the command line.
    parser.add_argument("--port", default=influxdb.DB_PORT, type=int,
                        help="Port of database connection")
    parser.add_argument("--dbname", default=influxdb.DB_NAME,
                        help="Name of database to modify")
    parser.add_argument('-s', '--station', dest='station',
                        default='AOSS Tower', choices=STATIONS.keys(),
                        help='Name of station to use to determine symbols')
    parser.add_argument('-v', '--verbose', dest='verbosity', action="count",
                        default=0,
                        help='each occurrence increases verbosity 1 level '
                             'through ERROR-WARNING-INFO-DEBUG')
    parser.add_argument('--sleep-interval', type=float,
                        help="Seconds to wait between submitting each record")
    # TODO: Weather Underground upload is not implemented. The planned
    # options were --weather-underground, --wu-id and --wu-password-file.
    parser.add_argument('src', help='Level 0 raw data file')
    return parser.parse_args(argv)


def _convert_symbols(record, symbols=SYMBOL_CONVERSIONS):
    """Return a copy of `record` keyed by InfluxDB symbol names.

    Keys that are not present in `symbols` are dropped. A key that maps to
    a falsy value keeps its original name.
    """
    return {symbols[k] or k: v for k, v in record.items() if k in symbols}


def _insert_record(write_url, record, station_tags):
    """POST one converted record to the InfluxDB write endpoint.

    Raises `requests.HTTPError` if the server answers with an error status.
    """
    # frame_records() yields the text lines for this record; they are sent
    # newline-separated in a single request body.
    lines = influxdb.frame_records(record, **station_tags)
    resp = requests.post(write_url, data='\n'.join(lines))
    resp.raise_for_status()


def _process_records(record_gen, args, station_tags, write_url):
    """Convert every frame from `record_gen` and print or insert it."""
    for idx, record in enumerate(record_gen):
        LOG.info("Inserting records for frame %d", idx)
        record = _convert_symbols(record)
        if args.debug:
            print(idx, record)
        else:
            _insert_record(write_url, record, station_tags)
            # The previous Weather Underground branch referenced an
            # undefined name (`wu_pw`) and raised NameError after the first
            # insert; it has been removed until that feature is implemented.

        if args.sleep_interval:
            time.sleep(args.sleep_interval)


def main():
    """Read tower frames from a file or LDMP server and load them in InfluxDB.

    With ``--debug`` records are printed instead of being submitted.
    Returns None, so the process exits with status 0 on success.
    """
    args = _parse_args()

    levels = [logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]
    logging.basicConfig(level=levels[min(3, args.verbosity)])

    LOG.info("source: %s", args.src)
    LOG.info("tail: %s", args.tail)
    station_tags = STATIONS[args.station]

    # The URL does not change between records, so build it once. It is only
    # needed (and only validated) when records are actually submitted.
    write_url = None
    if not args.debug:
        write_url = 'http://{host}:{port:d}/write?db={dbname}'.format(
            host=args.host, port=args.port, dbname=args.dbname)

    if args.ldmp:
        from aosstower.level_00.parser import ldmp_generator
        record_gen = ldmp_generator(args.src, args.tables)
        _process_records(record_gen, args, station_tags, write_url)
    else:
        # Context manager so the file is closed even if an insert fails.
        with open(args.src, "r") as src:
            record_gen = read_frames(src, tail=args.tail)
            _process_records(record_gen, args, station_tags, write_url)


if __name__ == "__main__":
    sys.exit(main())