Newer
Older
# encoding: utf8
"""Insert Tower data in to an InfluxDB for real time use.
"""
import logging
import logging.handlers
import time
import sys
import requests
from metobscommon import influxdb
from aosstower.level_00.parser import read_frames
# Module-level logger; handlers and level are configured in main().
LOG = logging.getLogger(__name__)

# map station name to InfluxDB tags
# Each value is passed as keyword arguments to influxdb.frame_records().
STATIONS = {
    "AOSS Tower": {"inst": "tower", "site": "aoss"},
}
# Parser symbol name -> InfluxDB symbol name.
# Only keys listed here are forwarded to the database; any other key in a
# parsed record is dropped by the caller.  Insertion order is significant
# because SYMBOLS below is derived from it.
SYMBOL_CONVERSIONS = {
    # renamed on the way in
    "stamp": "timestamp",
    # datalogger enclosure
    "box_temp": "box_temp",
    "box_pressure": "box_pressure",
    # "paro_*" pressure sensor channels
    "paro_air_temp_period": "paro_air_temp_period",
    "paro_pressure_period": "paro_pressure_period",
    "paro_air_temp": "paro_air_temp",
    "pressure": "pressure",
    "paro_cal_sig": "paro_cal_sig",
    "box_rh": "box_rh",
    "box_air_temp": "box_air_temp",
    # additional air temperature channels
    "air_temp_2": "air_temp_2",
    "air_temp_3": "air_temp_3",
    "air_temp_4": "air_temp_4",
    "air_temp_5": "air_temp_5",
    # wind
    "wind_speed": "wind_speed",
    "wind_dir": "wind_dir",
    # humidity
    "rh_shield_freq": "rh_shield_freq",
    "rh": "rel_hum",
    "air_temp_6_3m": "air_temp_6_3m",
    "dewpoint": "dewpoint",
    # NOTE(review): "rtd_shied_freq" looks like a misspelling of
    # "rtd_shield_freq"; it is kept as-is because it may match the field name
    # already stored in the database -- confirm before renaming.
    "rtd_shield_freq": "rtd_shied_freq",
    "air_temp": "air_temp",
    "solar_flux": "solar_flux",
    "precip": "precip",
    "accum_precip": "accum_precip",
    "altimeter": "altimeter",
}

# InfluxDB-side symbol names, in the same order as the table above.
SYMBOLS = list(SYMBOL_CONVERSIONS.values())
def _parse_args():
    """Define the command line interface and parse ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` holding the parsed options.
    """
    import argparse

    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--logfn', help='Log to rotating file (Not Implemented)')
    parser.add_argument('--debug', action='store_true',
                        help='Don\'t submit records to the database, print them to stdout')
    parser.add_argument('-t', '--tail', action='store_true',
                        help=('Tail file forever, not returning. This will start at the end '
                              'of the file and insert any new data added after starting'))
    parser.add_argument('--ldmp', action='store_true',
                        help='Treat `src` file as a station name and read records from LoggerNet LDMP server (port: 1024)')
    parser.add_argument('--tables', nargs='*', default=['1'],
                        help="LoggerNet LDMP tables to read in")
    parser.add_argument("--host", default=influxdb.DB_HOST,
                        help="Hostname of database connection")
    # type=int: the write URL formats the port with ``{port:d}``, which raises
    # ValueError for the str that argparse would otherwise hand back when the
    # option is given on the command line.
    parser.add_argument("--port", default=influxdb.DB_PORT, type=int,
                        help="Port of database connection")
    parser.add_argument("--dbname", default=influxdb.DB_NAME,
                        help="Name of database to modify")
    parser.add_argument('-s', '--station', dest='station', default='AOSS Tower', choices=STATIONS.keys(),
                        help='Name of station to use to determine symbols')
    parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0,
                        help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
    parser.add_argument('--sleep-interval', type=float,
                        help="Seconds to wait between submitting each record")
    # parser.add_argument('--weather-underground', action='store_true',
    #                     help="Send new records to wunderground.com")
    # parser.add_argument('--wu-id', default='KWIMADIS52',
    #                     help='Weather underground station ID')
    # parser.add_argument('--wu-password-file', default='/home/metobs/wunderground_password.txt',
    #                     help='File containing the password for the weather underground upload')
    parser.add_argument('src', help='Level 0 raw data file')
    return parser.parse_args()


def _convert_record(record, symbols):
    """Rename a parsed record's keys to InfluxDB symbol names.

    Keys absent from ``symbols`` are dropped; a key whose mapped name is
    falsy keeps its original name.
    """
    return {symbols[k] or k: v for k, v in record.items() if k in symbols}


def main():
    """Read tower records and write each one to InfluxDB.

    Records come from the Level 0 file named by ``src`` (optionally tailing
    it forever) or, with ``--ldmp``, from a LoggerNet LDMP server where
    ``src`` is the station name.  With ``--debug`` records are printed
    instead of being submitted.

    Raises:
        requests.HTTPError: if the database rejects a write.
    """
    args = _parse_args()

    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    logging.basicConfig(level=levels[min(3, args.verbosity)])

    LOG.info("source: %s", args.src)
    LOG.info("tail: %s", args.tail)
    station_tags = STATIONS[args.station]
    symbols = SYMBOL_CONVERSIONS

    # TODO: Weather Underground upload is not implemented.  Its command line
    # options are still commented out in _parse_args(), and the previous
    # in-loop upload code read an undefined ``wu_pw`` (NameError on the first
    # record), formatted placeholders without supplying values, and posted a
    # hard-coded sample URL.

    # The endpoint is loop-invariant; it is only needed when actually writing.
    write_url = None
    if not args.debug:
        write_url = 'http://{host}:{port:d}/write?db={dbname}'.format(
            host=args.host, port=args.port, dbname=args.dbname)

    src = None
    if args.ldmp:
        from aosstower.level_00.parser import ldmp_generator
        record_gen = ldmp_generator(args.src, args.tables)
    else:
        src = open(args.src, "r")
        record_gen = read_frames(src, tail=args.tail)

    try:
        for idx, record in enumerate(record_gen):
            record = _convert_record(record, symbols)
            if args.debug:
                print(idx, record)
            else:
                lines = influxdb.frame_records(record, **station_tags)
                resp = requests.post(write_url, data='\n'.join(lines))
                resp.raise_for_status()

            if args.sleep_interval:
                time.sleep(args.sleep_interval)
    finally:
        # Close the input file even when a write fails or the tail loop is
        # interrupted (e.g. KeyboardInterrupt).
        if src is not None:
            src.close()
# Script entry point: run main() and use its return value as the process
# exit status (None maps to a zero exit code).
if __name__ == "__main__":
    raise SystemExit(main())