#!/usr/bin/env python
"""Insert Tower data in to an InfluxDB for real time use.

"""
import logging
import logging.handlers
import time
import sys
import requests
from metobscommon import influxdb
from aosstower.level_00.parser import read_frames
David Hoese's avatar
David Hoese committed

LOG = logging.getLogger(__name__)

# Map station name to the InfluxDB tags attached to that station's records.
# The keys are also the accepted values of the ``--station`` option.
STATIONS = {
    "AOSS Tower": {"inst": "tower", "site": "aoss"},
}

# Map parser symbols (keys) to InfluxDB symbols (values). Record fields whose
# name is not a key of this mapping are dropped before insertion.
# NOTE(review): 'rtd_shied_freq' looks like a misspelling of
# 'rtd_shield_freq'. It is left unchanged because it may be the field name
# already stored in the database -- confirm before renaming.
SYMBOL_CONVERSIONS = {
    'stamp': 'timestamp',
    'box_temp': 'box_temp',
    'box_pressure': 'box_pressure',
    'paro_air_temp_period': 'paro_air_temp_period',
    'paro_pressure_period': 'paro_pressure_period',
    'paro_air_temp': 'paro_air_temp',
    'pressure': 'pressure',
    'paro_cal_sig': 'paro_cal_sig',
    'box_rh': 'box_rh',
    'box_air_temp': 'box_air_temp',
    'air_temp_2': 'air_temp_2',
    'air_temp_3': 'air_temp_3',
    'air_temp_4': 'air_temp_4',
    'air_temp_5': 'air_temp_5',
    'wind_speed': 'wind_speed',
    'wind_dir': 'wind_dir',
    'rh_shield_freq': 'rh_shield_freq',
    'rh': 'rel_hum',
    'air_temp_6_3m': 'air_temp_6_3m',
    'dewpoint': 'dewpoint',
    'rtd_shield_freq': 'rtd_shied_freq',
    'air_temp': 'air_temp',
    'solar_flux': 'solar_flux',
    'precip': 'precip',
    'accum_precip': 'accum_precip',
    'altimeter': 'altimeter',
}
# InfluxDB-side symbol names, in the same order as SYMBOL_CONVERSIONS.
SYMBOLS = list(SYMBOL_CONVERSIONS.values())
David Hoese's avatar
David Hoese committed
def main():
    """Parse the command line and insert Level 0 tower records into InfluxDB.

    Records come either from a Level 0 raw data file (optionally tailed
    forever with ``--tail``) or, with ``--ldmp``, from a LoggerNet LDMP
    server. Each record's fields are renamed from parser symbols to InfluxDB
    symbols via ``SYMBOL_CONVERSIONS`` (fields not listed there are dropped).
    The record is then printed to stdout (``--debug``) or POSTed to the
    database's ``/write`` endpoint.

    Raises:
        requests.HTTPError: if the database answers a write with an error
            status (via ``Response.raise_for_status``).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--logfn', help='Log to rotating file (Not Implemented)')
    parser.add_argument('--debug', action='store_true',
                        help='Don\'t submit records to the database, print them to stdout')
    parser.add_argument('-t', '--tail', action='store_true',
                        help=('Tail file forever, not returning. This will start at the end '
                              'of the file and insert any new data added after starting'))
    parser.add_argument('--ldmp', action='store_true',
                        help='Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)')
    parser.add_argument('--tables', nargs='*', default=['1'],
                        help="LoggerNet LDMP tables to read in")
    parser.add_argument("--host", default=influxdb.DB_HOST,
                        help="Hostname of database connection")
    # type=int: the write URL formats the port with ``{port:d}``, which
    # raises ValueError for the plain string argparse would otherwise return.
    parser.add_argument("--port", type=int, default=influxdb.DB_PORT,
                        help="Port of database connection")
    parser.add_argument("--dbname", default=influxdb.DB_NAME,
                        help="Name of database to modify")
    parser.add_argument('-s', '--station', dest='station', default='AOSS Tower', choices=STATIONS.keys(),
                        help='Name of station to use to determine symbols')
    parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0,
                        help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
    parser.add_argument('--sleep-interval', type=float,
                        help="Seconds to wait between submitting each record")
    # Options reserved for the (not yet implemented) Weather Underground upload:
    # parser.add_argument('--weather-underground', action='store_true',
    #                     help="Send new records to wunderground.com")
    # parser.add_argument('--wu-id', default='KWIMADIS52',
    #                     help='Weather underground station ID')
    # parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt',
    #                     help='File containing the password for the weather underground upload')
    parser.add_argument('src', help='Level 0 raw data file')

    args = parser.parse_args()
    levels = [logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]
    logging.basicConfig(level=levels[min(3, args.verbosity)])

    LOG.info("source: %s", args.src)
    LOG.info("tail: %s", args.tail)
    station_tags = STATIONS[args.station]
    symbols = SYMBOL_CONVERSIONS

    # The write URL does not change between records, so build it once. It is
    # skipped entirely in debug mode, where nothing is sent to the database.
    write_url = None
    if not args.debug:
        write_url = 'http://{host}:{port:d}/write?db={dbname}'.format(
            host=args.host, port=args.port, dbname=args.dbname)

    src_file = None
    try:
        if args.ldmp:
            from aosstower.level_00.parser import ldmp_generator
            record_gen = ldmp_generator(args.src, args.tables)
        else:
            # Must stay open while the generator is consumed; closed in the
            # ``finally`` below.
            src_file = open(args.src, "r")
            record_gen = read_frames(src_file, tail=args.tail)

        for idx, record in enumerate(record_gen):
            LOG.info("Inserting records for frame %d", idx)
            # Rename parser symbols to InfluxDB symbols; unknown fields are dropped.
            record = {symbols[k] or k: v for k, v in record.items() if k in symbols}
            if args.debug:
                print(idx, record)
            else:
                lines = influxdb.frame_records(record, **station_tags)
                resp = requests.post(write_url, data='\n'.join(lines))
                resp.raise_for_status()
                # TODO: Weather Underground upload is not implemented. It
                # needs the options reserved above plus a request built from
                # the record's own values.

            if args.sleep_interval:
                time.sleep(args.sleep_interval)
    finally:
        if src_file is not None:
            src_file.close()
David Hoese's avatar
David Hoese committed

# Run the command line interface only when executed as a script, and hand
# whatever main() returns to the interpreter as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)