diff --git a/aosstower/level_00/influxdb.py b/aosstower/level_00/influxdb.py index 4dee9a4fd4b5fb50389517b0538a4862fc97922d..61c59ed9458fb92105799dab8f9a472555fbad2e 100644 --- a/aosstower/level_00/influxdb.py +++ b/aosstower/level_00/influxdb.py @@ -92,9 +92,8 @@ class Updater(object): return self._calculate_averages() def _calculate_averages(self): - index = self.data.pop('timestamp') - frame = pd.DataFrame(self.data, index=index) - frame.mask(frame == -99999., inplace=True) + frame = pd.DataFrame(self.data).set_index('timestamp') + frame = frame.mask(frame == -99999.) # Add wind direction components so we can average wind direction properly. frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir']) @@ -108,22 +107,21 @@ class Updater(object): # https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases # 2 minute rolling average. winds_frame_2m = frame[['wind_speed', 'wind_east', 'wind_north']].rolling('2T', closed='right').mean() - frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] - frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) # Makes 2 minute averages nans if given less than 2 minutes of data. if len(frame[frame.index > frame.index[-1] - timedelta(minutes=2)]) < 120 / self.data_interval: - frame['wind_speed_2m'].mask(frame['wind_speed_2m'] > -1., inplace=True) - frame['wind_dir_2m'].mask(frame['wind_dir_2m'] > -1., inplace=True) + winds_frame_2m = winds_frame_2m.mask(winds_frame_2m['wind_speed'] > -1) + frame['wind_speed_2m'] = winds_frame_2m['wind_speed'] + frame['wind_dir_2m'] = calc.wind_vector_degrees(winds_frame_2m['wind_east'], winds_frame_2m['wind_north']) # 1 minute rolling peaks wind_peak_1m = frame['wind_speed'].rolling(window='1T', closed='right').max() - # criteria for a fast wind to be considered a wind gust + # criteria for a fast wind to be considered a wind gust. Note that it needs winds_frame_2m['wind_speed']. gust_mask = (winds_frame_2m['wind_speed'] >= calc.knots_to_mps(9.)) &\ (wind_peak_1m >= winds_frame_2m['wind_speed'] + calc.knots_to_mps(5.)) frame['gust_1m'] = wind_peak_1m.mask(~gust_mask) frame['gust_10m'] = calculate_wind_gust(frame['wind_speed'], winds_frame_2m['wind_speed']) # Makes 10 minute gusts before 12 minutes nans because data is insufficient. if len(frame) < 720 / self.data_interval: - frame['gust_10m'].mask(frame['gust_10m'] > -1., inplace=True) + frame['gust_10m'] = frame['gust_10m'].mask(frame['gust_10m'] > -1.) return frame.fillna(value=np.nan) @@ -133,12 +131,11 @@ def convert_to_influx_frame(record_gen, symbols, debug=False): record = {symbols[k] or k: v for k, v in record.items() if k in symbols} if debug: print(idx, record) - continue yield record def construct_url(data): - # Sends data as empty string to show that recording worked, but that the data is nonexistent. + # Sends null data as empty string to show that recording worked, but that the data is nonexistent. for key, val in data.items(): if val is None or isinstance(val, float) and np.isnan(val): data[key] = '' @@ -237,14 +234,17 @@ def main(): influx_gen = influxdb.grouper(influx_gen, args.bulk) updater = Updater() for record in influx_gen: - lines = influxdb.frame_records(record, **station_tags) - influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) + if not args.debug: + lines = influxdb.frame_records(record, **station_tags) + influxdb.insert(lines, host=args.host, port=args.port, dbname=args.dbname) # Record is in a list of size 1, but want just the record. avg = updater.rolling_average(record[0]) # Once every 5 minutes: 0 through 295 seconds inclusive in 5 second intervals. - if avg is not None: + if avg is not None and (args.debug or wu_pw and args.ldmp): url = construct_url(get_url_data(avg, args.wu_id, wu_pw)) - if wu_pw and args.ldmp: + if args.debug: + print(url) + else: resp = requests.post(url) if resp.status_code != 200: warnings.warn('Data failed to upload to {0} with status code {1}: {2}'.format( diff --git a/aosstower/level_b1/nc.py b/aosstower/level_b1/nc.py index c85f9a76b1a9625a0cafbff07b5f74794da5a51e..84e36b81dcb39f6fc3665f7dc33aff879019955e 100644 --- a/aosstower/level_b1/nc.py +++ b/aosstower/level_b1/nc.py @@ -37,9 +37,7 @@ def _get_data(input_files): def get_data(input_files): frame = pd.DataFrame(_get_data(input_files)) - frame.set_index('stamp', inplace=True) - frame.mask(frame == -99999., inplace=True) - frame.fillna(value=np.nan, inplace=True) + frame = frame.set_index('stamp').mask(frame == -99999.).fillna(value=np.nan) return frame @@ -108,7 +106,7 @@ def create_giant_netcdf(input_files, output_fn, zlib, chunk_size, frame = new_frame.resample(interval_width, closed='right', loffset=interval_width).mean() frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north']) frame['gust'] = new_frame['gust'].resample(interval_width, closed='right', loffset=interval_width).max() - frame.fillna(np.nan, inplace=True) + frame = frame.fillna(np.nan) if start and end: frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')] diff --git a/aosstower/tests/level_00/test_influxdb.py b/aosstower/tests/level_00/test_influxdb.py new file mode 100755 index 0000000000000000000000000000000000000000..7292c6b82db8a31aa5eaa50c8219ef621fa48665 --- /dev/null +++ b/aosstower/tests/level_00/test_influxdb.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python +import unittest +import numpy as np +import pandas as pd +import datetime +from aosstower.level_00.influxdb import Updater, construct_url, get_url_data + + +class TestCase: + def __init__(self, input, expected_avg, expected_url): + self.input = input + self.expected_avg = expected_avg + self.expected_url = expected_url + + +def create_data(size, data_interval=datetime.timedelta(seconds=5)): + return [{'wind_speed': i % 119, 'wind_dir': 0, 'box_pressure': i, 'paro_air_temp_period': i, + 'paro_pressure_period': i, 'paro_air_temp': i, 'pressure': i, 'paro_cal_sig': i, + 'box_air_temp': i, 'air_temp_2': i, 'air_temp_3': i, 'air_temp_4': i, 'rh_shield_freq': i, 'rel_hum': i, + 'air_temp_6_3m': i, 'dewpoint': i, 'rtd_shied_freq': i, 'air_temp': i, 'solar_flux': i, 'precip': i, + 'accum_precip': -99999, 'altimeter': i, + 'timestamp': datetime.datetime(2019, 1, 1, 0, 3, 33) + data_interval * i} for i in range(size)] + + +def _isnan(x): + return isinstance(x, float) and np.isnan(x) + + +class TestInfluxdb(unittest.TestCase): + def setUp(self): + self.updater = Updater() + self.test_data = TestCase(create_data(209), [ + {'wind_speed': 17, 'wind_dir': 0, 'box_pressure': 17, 'paro_air_temp_period': 17, + 'paro_pressure_period': 17, 'paro_air_temp': 17, 'pressure': 17, 'paro_cal_sig': 17, + 'box_air_temp': 17, 'air_temp_2': 17, 'air_temp_3': 17, 'air_temp_4': 17, 'rh_shield_freq': 17, + 'rel_hum': 17, 'air_temp_6_3m': 17, 'dewpoint': 17, 'rtd_shied_freq': 17, 'air_temp': 17, + 'solar_flux': 17, 'precip': 17, 'accum_precip': np.nan, 'altimeter': 17, 'wind_east': 0, + 'wind_north': 17, 'wind_speed_2m': np.nan, 'wind_dir_2m': np.nan, 'gust_1m': np.nan, 'gust_10m': np.nan, + 'index': pd.Timestamp('2019-01-01 00:04:58')}, + {'wind_speed': 77, 'wind_dir': 0, 'box_pressure': 77, 'paro_air_temp_period': 77, + 'paro_pressure_period': 77, 'paro_air_temp': 77, 'pressure': 77, 'paro_cal_sig': 77, + 'box_air_temp': 77, 'air_temp_2': 77, 'air_temp_3': 77, 'air_temp_4': 77, 'rh_shield_freq': 77, + 'rel_hum': 77, 'air_temp_6_3m': 77, 'dewpoint': 77, 'rtd_shied_freq': 77, 'air_temp': 77, + 'solar_flux': 77, 'precip': 77, 'accum_precip': np.nan, 'altimeter': 77, 'wind_east': 0, + 'wind_north': 77, 'wind_speed_2m': 65.5, 'wind_dir_2m': 0, 'gust_1m': 77, 'gust_10m': np.nan, + 'index': pd.Timestamp('2019-01-01 00:09:58')}, + {'wind_speed': 18, 'wind_dir': 0, 'box_pressure': 137, 'paro_air_temp_period': 137, + 'paro_pressure_period': 137, 'paro_air_temp': 137, 'pressure': 137, 'paro_cal_sig': 137, + 'box_air_temp': 137, 'air_temp_2': 137, 'air_temp_3': 137, 'air_temp_4': 137, + 'rh_shield_freq': 137, 'rel_hum': 137, 'air_temp_6_3m': 137, 'dewpoint': 137, + 'rtd_shied_freq': 137, 'air_temp': 137, 'solar_flux': 137, 'precip': 137, 'accum_precip': np.nan, + 'altimeter': 137, 'wind_east': 0, 'wind_north': 18, 'wind_speed_2m': 31.291666666666668, + 'wind_dir_2m': 0, 'gust_1m': np.nan, 'gust_10m': np.nan, 'index': pd.Timestamp('2019-01-01 00:14:58')}, + {'wind_speed': 78, 'wind_dir': 0, 'box_pressure': 197, 'paro_air_temp_period': 197, + 'paro_pressure_period': 197, 'paro_air_temp': 197, 'pressure': 197, 'paro_cal_sig': 197, + 'box_air_temp': 197, 'air_temp_2': 197, 'air_temp_3': 197, 'air_temp_4': 197, + 'rh_shield_freq': 197, 'rel_hum': 197, 'air_temp_6_3m': 197, 'dewpoint': 197, + 'rtd_shied_freq': 197, 'air_temp': 197, 'solar_flux': 197, 'precip': 197, 'accum_precip': np.nan, + 'altimeter': 197, 'wind_east': 0, 'wind_north': 78, 'wind_speed_2m': 66.5, 'wind_dir_2m': 0, + 'gust_1m': 78, 'gust_10m': 118, 'index': pd.Timestamp('2019-01-01 00:19:58')}], + ['http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' + 'ID=&PASSWORD=&dateutc=2019-01-01+00%3A04%3A58&winddir=0.0&winddir_avg2m=' + '&windspeedmph=38.02798&windspdmph_avg2m=&windgustmph=&windgustmph_10m=' + '&humidity=17.0&tempf=62.6&baromin=0.5020096628001095&dewptf=62.6' + '&solarradiation=17.0&rainin=17.0&dailyrainin=&softwaretype=SSEC-RIG' + '&action=updateraw', + 'http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' + 'ID=&PASSWORD=&dateutc=2019-01-01+00%3A09%3A58&winddir=0.0&winddir_avg2m=0.0' + '&windspeedmph=172.24438&windspdmph_avg2m=146.51957000000002&windgustmph=172.24438' + '&windgustmph_10m=&humidity=77.0&tempf=170.6&baromin=2.2738084726828487' + '&dewptf=170.6&solarradiation=77.0&rainin=77.0&dailyrainin=&softwaretype=SSEC-RIG' + '&action=updateraw', + 'http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' + 'ID=&PASSWORD=&dateutc=2019-01-01+00%3A14%3A58&winddir=0.0&winddir_avg2m=0.0' + '&windspeedmph=40.264920000000004&windspdmph_avg2m=69.99758083333334&windgustmph=' + '&windgustmph_10m=&humidity=137.0&tempf=278.6&baromin=4.045607282565588' + '&dewptf=278.6&solarradiation=137.0&rainin=137.0&dailyrainin=&softwaretype=SSEC-RIG' + '&action=updateraw', + 'http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php?' + 'ID=&PASSWORD=&dateutc=2019-01-01+00%3A19%3A58&winddir=0.0&winddir_avg2m=0.0' + '&windspeedmph=174.48132&windspdmph_avg2m=148.75651000000002&windgustmph=174.48132' + '&windgustmph_10m=263.95892000000003&humidity=197.0&tempf=386.6' + '&baromin=5.817406092448327&dewptf=386.6&solarradiation=197.0&rainin=197.0' + '&dailyrainin=&softwaretype=SSEC-RIG&action=updateraw']) + + def test_updater(self): + output = [] + for record in self.test_data.input: + avg = self.updater.rolling_average(record) + if avg is not None: + output.append({key: avg[key][-1] for key in avg}) + output[-1]['index'] = avg.index[-1] + self.assertGreaterEqual(len(self.test_data.expected_avg), len(output)) + self.assertEqual(len(self.test_data.expected_avg[len(output) - 1]), len(output[-1])) + for key in output[-1]: + if not (_isnan(output[-1][key]) and _isnan(self.test_data.expected_avg[len(output) - 1][key])): + self.assertEqual(self.test_data.expected_avg[len(output) - 1][key], output[-1][key]) + self.assertEqual(len(self.test_data.expected_avg), len(output)) + + def test_construct_url(self): + output = [] + for record in self.test_data.input: + avg = self.updater.rolling_average(record) + if avg is not None: + output.append(construct_url(get_url_data(avg, '', ''))) + self.assertGreaterEqual(len(self.test_data.expected_url), len(output)) + self.assertEqual(self.test_data.expected_url[len(output) - 1], output[-1]) + self.assertEqual(len(self.test_data.expected_url), len(output)) + + +def suite(): + """The test suite for influxdb.""" + loader = unittest.TestLoader() + mysuite = unittest.TestSuite() + mysuite.addTest(loader.loadTestsFromTestCase(TestInfluxdb)) + return mysuite + + +if __name__ == "__main__": + unittest.main()