diff --git a/aosstower/level_b1/nc.py b/aosstower/level_b1/nc.py
index 5d89d1d841dcee5e7c2ada34378a78b1b1fc0cca..099eea9af804d203fa29aabcd168ddf839d7bfd1 100644
--- a/aosstower/level_b1/nc.py
+++ b/aosstower/level_b1/nc.py
@@ -1,3 +1,7 @@
+#!/usr/bin/env python
+# encoding: utf8
+"""Generate AOSS Tower NetCDF4 files from Level 00 ASCII files.
+"""
 import os
 import sys
 import logging
@@ -6,299 +10,16 @@
 from datetime import datetime
 from netCDF4 import Dataset
 import numpy as np
 import platform
-from aosstower import station, schema
+from aosstower import schema
+from aosstower.station import station_info
 from aosstower.level_00 import parser
-from aosstower.level_b1 import calc
+from metobscommon.util import calc
+from metobscommon.util.nc import (make_summary_dict, write_vars,
+                                  calculate_wind_gust, summary_over_interval,
+                                  create_variables)

 LOG = logging.getLogger(__name__)
-STATION_NAME = "AOSS Tower"
 SOFTWARE_VERSION = '00'
-# Knots used to do ASOS wind gust calculations
-KNOTS_10 = calc.knots_to_mps(10.)
-KNOTS_9 = calc.knots_to_mps(9.)
-KNOTS_5 = calc.knots_to_mps(5.)
-KNOTS_3 = calc.knots_to_mps(3.)
-KNOTS_2 = calc.knots_to_mps(2.)
-DEFAULT_FLOAT_FILL = -9999.
-
-
-def make_summary_dict(source_dict):
-    """Create the '_mean','_min','_max' file structure."""
-    dest_dict = {}
-    for key in source_dict:
-        if key == 'wind_dir':
-            dest_dict['wind_speed_max_dir'] = source_dict[key]
-            dest_dict['wind_speed_mean_dir'] = source_dict[key]
-            dest_dict['wind_speed_min_dir'] = source_dict[key]
-            dest_dict['peak_gust_dir'] = source_dict[key]
-        elif key == 'gust':
-            dest_dict['peak_gust'] = source_dict[key]
-        else:
-            dest_dict[key + '_max'] = source_dict[key]
-            dest_dict[key + '_mean'] = source_dict[key]
-            dest_dict[key + '_min'] = source_dict[key]
-    return dest_dict
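For reference, `make_summary_dict` (now imported from `metobscommon.util.nc`, which is assumed to behave the same way) expands each schema entry into its summary variants. A minimal standalone sketch of that expansion, with placeholder values standing in for real schema entries:

```python
# Hypothetical mirror of make_summary_dict's key expansion
# (values are placeholders, not real schema Var entries).
def expand_summary_keys(source_dict):
    dest_dict = {}
    for key, value in source_dict.items():
        if key == 'wind_dir':
            # wind direction becomes the direction-at-extreme variables
            for name in ('wind_speed_max_dir', 'wind_speed_mean_dir',
                         'wind_speed_min_dir', 'peak_gust_dir'):
                dest_dict[name] = value
        elif key == 'gust':
            dest_dict['peak_gust'] = value
        else:
            for suffix in ('_max', '_mean', '_min'):
                dest_dict[key + suffix] = value
    return dest_dict

print(sorted(expand_summary_keys({'air_temp': 0, 'wind_dir': 0, 'gust': 0})))
# ['air_temp_max', 'air_temp_mean', 'air_temp_min', 'peak_gust',
#  'peak_gust_dir', 'wind_speed_max_dir', 'wind_speed_mean_dir',
#  'wind_speed_min_dir']
```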
-
-
-def filter_array(arr, valid_min, valid_max, valid_delta):
-    """Create QC field array.
-
-    Meanings
-    --------
-
-    Bit 0 (0b1): value is equal to missing_value
-    Bit 1 (0b10): value is less than the valid min
-    Bit 2 (0b100): value is greater than the valid max
-    Bit 3 (0b1000): difference between current and previous values exceeds valid_delta
-
-    Arguments:
-        arr (Series): Data array this QC is for
-        valid_min (float): Valid minimum of the input data
-        valid_max (float): Valid maximum of the input data
-        valid_delta (float): Valid delta of one input data point to the previous point
-
-    Returns:
-        Series: Bit-mask describing the input data (see above bit meanings)
-    """
-    qcControl = np.zeros(arr.shape, dtype=np.int8)
-    qcControl[arr.isnull().values] |= 0b1
-    if valid_min:
-        qcControl[(arr < valid_min).values] |= 0b10
-    if valid_max:
-        qcControl[(arr > valid_max).values] |= 0b100
-    if valid_delta:
-        # prepend to fix off-by-one array shape
-        d = np.append([0], np.diff(arr))
-        qcControl[(d > valid_delta).values] |= 0b1000
-    return qcControl
-
-
-def write_dimensions(nc_file):
-    nc_file.createDimension('time', None)
-    nc_file.createDimension('max_len_station_name', 32)
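The QC bit convention above is easiest to see on concrete data. A self-contained sketch of the same bit logic, with made-up limits (valid_min=0.0, valid_max=10.0, valid_delta=5.0) and assuming the metobscommon replacement keeps these bit meanings:

```python
import numpy as np
import pandas as pd

arr = pd.Series([1.0, np.nan, 50.0, 2.0, 8.0])
qc = np.zeros(arr.shape, dtype=np.int8)
qc[arr.isnull().values] |= 0b1           # bit 0: missing value
qc[(arr < 0.0).values] |= 0b10           # bit 1: below valid_min
qc[(arr > 10.0).values] |= 0b100         # bit 2: above valid_max
d = np.append([0], np.diff(arr))         # prepend to fix off-by-one shape
qc[(d > 5.0).values] |= 0b1000           # bit 3: jump exceeds valid_delta
print(qc.tolist())  # [0, 1, 4, 0, 8]
```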
-
-
-def create_variables(nc_file, first_stamp, database, chunk_sizes=None, zlib=False):
-    # base_time long name
-    btln = 'Base time in Epoch'
-
-    # base time units
-    btu = 'seconds since 1970-01-01 00:00:00'
-
-    # base time string
-    bts = first_stamp.strftime('%Y-%m-%d 00:00:00Z')
-
-    # time offset long name
-    to_ln = 'Time offset from base_time'
-
-    # time offset units
-    to_u = 'seconds since ' + first_stamp.strftime('%Y-%m-%d 00:00:00Z')
-
-    # 'time' units
-    t_u = 'hours since ' + first_stamp.strftime('%Y-%m-%d 00:00:00Z')
-
-    coordinates = {
-        # fields: type, dimension, fill, valid_min, std_name, longname, units, valid_max, cf_role, axis
-        'time': [np.float64, ('time',), DEFAULT_FLOAT_FILL, None, None, "Hour offset from midnight",
-                 t_u, None, None, None],
-        'lon': [np.float32, tuple(), DEFAULT_FLOAT_FILL, -180., 'longitude', None, 'degrees_east', 180., None],
-        'lat': [np.float32, tuple(), DEFAULT_FLOAT_FILL, -90., 'latitude', None, 'degrees_north', 90., None],
-        'alt': [np.float32, tuple(), DEFAULT_FLOAT_FILL, None, 'height', 'vertical distance', 'm', None, None],
-        # int64 for base_time would be best, but NetCDF4 Classic does not support it
-        # NetCDF4 Classic mode was chosen so users can use MFDatasets (multi-file datasets)
-        'base_time': [np.int32, tuple(), DEFAULT_FLOAT_FILL, None, 'time', btln, btu, None, None],
-        'time_offset': [np.float64, ('time',), DEFAULT_FLOAT_FILL, None, 'time', to_ln, to_u, None, None],
-        'station_name': ['c', ('max_len_station_name',), '\0', None, None, 'station name', None, None, 'timeseries_id'],
-    }
-
-    for key, attr in coordinates.items():
-        if attr[1] == ('max_len_station_name',) and chunk_sizes and chunk_sizes[0] > 32:
-            cs = [32]
-        else:
-            cs = chunk_sizes
-
-        variable = nc_file.createVariable(key, attr[0],
-                                          dimensions=attr[1],
-                                          fill_value=attr[2],
-                                          zlib=zlib,
-                                          chunksizes=cs)
-
-        # create var attributes
-        if key == 'alt':
-            variable.positive = 'up'
-            variable.axis = 'Z'
-
-        if attr[3]:
-            variable.valid_min = attr[3]
-            variable.valid_max = attr[7]
-
-        if attr[4]:
-            variable.standard_name = attr[4]
-
-        if attr[5]:
-            variable.long_name = attr[5]
-
-        if attr[6]:
-            variable.units = attr[6]
-
-        if attr[8]:
-            variable.cf_role = attr[8]
-
-        if key == 'base_time':
-            variable.string = bts
-            variable.ancillary_variables = 'time_offset'
-        elif key == 'time_offset':
-            variable.ancillary_variables = 'base_time'
-
-        # CF default
-        if 'time' in key:
-            variable.calendar = 'gregorian'
-
-    for entry in sorted(database.keys()):
-        if entry == 'stamp':
-            continue
-
-        varTup = database[entry]
-        variable = nc_file.createVariable(entry, np.float32,
-                                          dimensions=('time',), fill_value=DEFAULT_FLOAT_FILL, zlib=zlib,
-                                          chunksizes=chunk_sizes)
-
-        variable.standard_name = varTup[1]
-        variable.description = varTup[3]
-        variable.units = varTup[4]
-
-        if varTup[5] != '':
-            variable.valid_min = float(varTup[5])
-            variable.valid_max = float(varTup[6])
-
-        qcVariable = nc_file.createVariable('qc_' + entry, 'b',
-                                            dimensions=('time',), zlib=zlib, chunksizes=chunk_sizes)
-
-        qcVariable.long_name = 'data quality'
-        qcVariable.valid_range = np.byte(0b1), np.byte(0b1111)
-        qcVariable.flag_masks = np.byte(0b1), np.byte(0b10), np.byte(0b100), np.byte(0b1000)
-
-        flagMeaning = ['value is equal to missing_value',
-                       'value is less than the valid min',
-                       'value is greater than the valid max',
-                       'difference between current and previous values exceeds valid_delta.']
-
-        qcVariable.flag_meaning = ', '.join(flagMeaning)
-
-
-def calculate_wind_gust(wind_speed_5s, wind_speed_2m):
-    """Calculate reportable wind gust from wind speed averages.
-
-    Determination of wind gusts follows the Automated Surface Observing System
-    (ASOS) User's Guide created by NOAA. The guide can be found here:
-
-    http://www.nws.noaa.gov/asos/pdfs/aum-toc.pdf
-
-    Note: This operates on wind data in meters per second even though the
-    ASOS User's Guide operates on knots.
-
-    Arguments:
-        wind_speed_5s (Series): 5-second average of instantaneous wind speed magnitudes in m/s
-        wind_speed_2m (Series): 2-minute rolling average of the 5 second wind speed averages in m/s
-
-    Returns:
-        Series: 5-second rolling wind speed gusts meeting the ASOS criteria
-                for being a reportable wind gust.
-
-    """
-    # 1 minute rolling peaks
-    wind_peak_1m = wind_speed_5s.rolling(window='1T', center=False).max()
-    # criteria for a fast wind to be considered a wind gust
-    gust_mask = (wind_speed_2m >= KNOTS_9) & \
-                (wind_peak_1m >= wind_speed_2m + KNOTS_5)
-    gusts = wind_peak_1m.mask(~gust_mask)
-
-    # determine highest gust in the last 10 minutes
-    # 5 seconds * 120 = 10 minutes
-    max_10m_gusts = gusts.rolling(window='10T', center=False).max()
-    # Minimum 5-second average in the past 10 minutes
-    min_10m_5avg = wind_speed_5s.rolling(window='10T', center=False).min()
-    # criteria for a wind gust to be reportable
-    reportable_mask = (max_10m_gusts >= wind_speed_2m + KNOTS_3) & \
-                      (wind_speed_2m > KNOTS_2) & \
-                      (max_10m_gusts >= min_10m_5avg + KNOTS_10)
-    # Reportable gusts at 5 second resolution
-    reportable_gusts = max_10m_gusts.mask(~reportable_mask)
-    return reportable_gusts
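The removed `KNOTS_*` constants were the ASOS knot thresholds pre-converted to m/s, since the module works in m/s while the ASOS User's Guide specifies knots. A quick sanity check of that conversion (1 international knot is 1852 m / 3600 s; `calc.knots_to_mps` is assumed to do the same arithmetic):

```python
# 1 knot = 1852 m per hour = 1852/3600 m/s (about 0.514 m/s)
def knots_to_mps(knots):
    return knots * 1852.0 / 3600.0

for kts in (2, 3, 5, 9, 10):
    print('%2d kt = %.3f m/s' % (kts, knots_to_mps(kts)))
# 2 kt = 1.029 m/s ... 10 kt = 5.144 m/s
```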
-
-
-def minute_averages(frame):
-    """Average the data frame at a 1 minute interval.
-
-    Wind direction and wind speed must be handled in a special manner
-    """
-    # Add wind direction components so we can average wind direction properly
-    frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir'])
-    # round up each 1 minute group so data at time T is the average of data
-    # from T - 1 (exclusive) to T (inclusive).
-    new_frame = frame.resample('1T', closed='right', loffset='1T').mean()
-
-    # 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes)
-    winds_frame_5s = frame[['wind_speed', 'wind_east', 'wind_north']]
-    winds_frame_5s = winds_frame_5s.resample('5S', closed='right', loffset='5S').mean()
-    winds_frame_2m = winds_frame_5s.rolling(24, win_type='boxcar').mean()
-
-    # rolling average is used for 1 minute output
-    new_frame.update(winds_frame_2m.ix[new_frame.index])  # adds wind_speed, wind_east/north
-    new_frame['wind_dir'] = calc.wind_vector_degrees(new_frame['wind_east'], new_frame['wind_north'])
-    del new_frame['wind_east']
-    del new_frame['wind_north']
-
-    gust = calculate_wind_gust(winds_frame_5s['wind_speed'], winds_frame_2m['wind_speed'])
-    # "average" the gusts to minute resolution to match the rest of the data
-    new_frame['gust'] = gust.resample('1T', closed='right', loffset='1T').max()
-
-    return new_frame.fillna(np.nan)
-
-
-def summary_over_interval(frame, interval_width):
-    """takes a frame and an interval to average it over, and returns a minimum,
-    maximum, and average dataframe for that interval
-    """
-    # round each timestamp to the nearest minute
-    # the value at time X is for the data X - interval_width minutes
-    exclude = ['gust', 'wind_east', 'wind_north']
-    include = [c for c in frame.columns if c not in exclude]
-    gb = frame[include].resample(interval_width, closed='left')
-
-    low = gb.min()
-    low.rename(columns=lambda x: x + "_min", inplace=True)
-    high = gb.max()
-    high.rename(columns=lambda x: x + "_max", inplace=True)
-    mean = gb.mean()
-    mean.rename(columns=lambda x: x + "_mean", inplace=True)
-
-    out_frames = pd.concat((low, high, mean), axis=1)
-
-    # wind fields need to be handled specially
-    ws_min_idx = frame['wind_speed'].resample(interval_width, closed='left').apply(lambda arr_like: arr_like.argmin())
-    ws_max_idx = frame['wind_speed'].resample(interval_width, closed='left').apply(lambda arr_like: arr_like.argmax())
-    # probably redundant but need to make sure the direction indexes are
-    # the same as those used in the wind speed values
-    # must use .values so we don't take data at out_frames index, but rather
-    # fill in the out_frames index values with the min/max values
-    out_frames['wind_speed_min'] = frame['wind_speed'][ws_min_idx].values
-    out_frames['wind_speed_max'] = frame['wind_speed'][ws_max_idx].values
-    out_frames['wind_speed_min_dir'] = calc.wind_vector_degrees(frame['wind_east'][ws_min_idx], frame['wind_north'][ws_min_idx]).values
-    out_frames['wind_speed_max_dir'] = calc.wind_vector_degrees(frame['wind_east'][ws_max_idx], frame['wind_north'][ws_max_idx]).values
-    we = frame['wind_east'].resample(interval_width, closed='left').mean()
-    wn = frame['wind_north'].resample(interval_width, closed='left').mean()
-    out_frames['wind_speed_mean_dir'] = calc.wind_vector_degrees(we, wn).values
-
-    gust_idx = frame['gust'].resample(interval_width, closed='left').apply(lambda arr_like: arr_like.argmax())
-    # gusts may be NaN so this argmax will be NaN indexes which don't work great
-    gust_idx = gust_idx.astype('datetime64[ns]', copy=False)
-    peak_gust = frame['gust'][gust_idx]
-    out_frames['peak_gust'] = peak_gust.values
-    we = frame['wind_east'][gust_idx]
-    wn = frame['wind_north'][gust_idx]
-    out_frames['peak_gust_dir'] = calc.wind_vector_degrees(we, wn).values
-
-    return out_frames


 def _get_data(input_files):
@@ -322,59 +43,10 @@ def get_data(input_files):
     return frame
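The component-based wind handling above exists because directly averaging angles fails near north. A standalone illustration (the east/north component convention here is the usual trigonometric one; `calc.wind_vector_components` and `calc.wind_vector_degrees` are assumed to implement the same idea, possibly with different sign conventions):

```python
import numpy as np

# Two 5 m/s observations from 350 deg and 30 deg: the true mean
# direction is 10 deg, but a naive average of the angles says 190 deg.
dirs = np.array([350.0, 30.0])
speeds = np.array([5.0, 5.0])
print(dirs.mean())  # 190.0 -- nearly the opposite direction

# Averaging east/north components instead recovers 10 deg.
east = (speeds * np.sin(np.deg2rad(dirs))).mean()
north = (speeds * np.cos(np.deg2rad(dirs))).mean()
print(round(np.rad2deg(np.arctan2(east, north)) % 360, 1))  # 10.0
```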
-def write_vars(nc_file, frame, database):
-    # all time points from epoch
-    time_epoch = (frame.index.astype(np.int64) // 10**9).values
-    # midnight of data's first day
-    base_epoch = frame.index[0].replace(hour=0, minute=0, second=0, microsecond=0).value // 10**9
-
-    fileVar = nc_file.variables
-    # base_time is midnight of the current day
-    fileVar['base_time'].assignValue(base_epoch)
-    fileVar['time_offset'][:] = time_epoch - base_epoch
-    # hours since midnight
-    fileVar['time'][:] = (time_epoch - base_epoch) / (60 * 60)
-
-    # write coordinate var values to file
-    # alt might not be right, need to verify
-    fileVar['lon'].assignValue(station.LONGITUDE)
-    fileVar['lat'].assignValue(station.LATITUDE)
-    fileVar['alt'].assignValue(station.ELEVATION)
-
-    # write station name to file
-    fileVar['station_name'][:len(STATION_NAME)] = STATION_NAME
-
-    # writes data into file
-    for varName in frame:
-        if varName not in fileVar:
-            LOG.debug('Unused input variable: %s', varName)
-            continue
-        fileVar[varName][:] = frame[varName].fillna(DEFAULT_FLOAT_FILL).values
-
-        valid_min = database[varName][5]
-        valid_max = database[varName][6]
-        valid_delta = database[varName][7]
-
-        fileVar['qc_' + varName][:] = filter_array(frame[varName],
-                                                   float(valid_min) if valid_min else valid_min,
-                                                   float(valid_max) if valid_max else valid_max,
-                                                   float(valid_delta) if valid_delta else valid_delta)
-
-    coordinates = ['lon', 'lat', 'alt', 'base_time', 'time_offset', 'station_name', 'time']
-
-    for varName in fileVar:
-        if not varName.startswith('qc_') and \
-                varName not in frame and \
-                varName not in coordinates:
-            # if a variable should be in file (database), but isn't in the
-            # input data then make sure the QC field lists it as all fills
-            fileVar['qc_' + varName][:] |= 0b1
-
-
 def write_global_attributes(nc_file, input_sources, interval=None, datastream=None):
     # create global attributes
     nc_file.source = 'surface observation'
-    nc_file.conventions = 'ARM-1.2 CF-1.6'
+    nc_file.Conventions = 'ARM-1.2 CF-1.6'
     nc_file.institution = 'University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)'
     nc_file.featureType = 'timeSeries'
     nc_file.data_level = 'b1'
@@ -402,7 +74,7 @@ def write_global_attributes(nc_file, input_sources, interval=None, datastream=No

 def create_giant_netcdf(input_files, output_fn, zlib, chunk_size,
                         start=None, end=None, interval_width=None, summary=False,
-                        database=schema.database, datastream=None):
+                        database=schema.database_dict, datastream=None):
     frame = get_data(input_files)
     if frame.empty:
         raise ValueError("No data found from input files: {}".format(", ".join(input_files)))
@@ -410,8 +82,10 @@ def create_giant_netcdf(input_files, output_fn, zlib, chunk_size,
     # Add wind direction components so we can average wind direction properly
     frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir'])

-    if 'air_temp' in frame and 'rh' in frame and 'dewpoint' in database:
-        LOG.info("'dewpoint' is missing from the input file, will calculate it from air temp and relative humidity")
+    if 'air_temp' in frame and 'rh' in frame and \
+            ('dewpoint' in database or 'dewpoint_mean' in database):
+        LOG.info("'dewpoint' is missing from the input file, will calculate "
+                 "it from air temp and relative humidity")
         frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh'])

     # round up each 1 minute group so data at time T is the average of data
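`calc.dewpoint`'s implementation is not part of this diff; for intuition about the value being filled in, a common Magnus-form approximation is sketched below. The form and coefficients are an assumption for illustration, not necessarily what metobscommon uses:

```python
import numpy as np

# Magnus-form dewpoint approximation (air_temp in degC, rh in percent).
# Illustrative only; not necessarily the formula calc.dewpoint implements.
def dewpoint(air_temp, rh, a=17.67, b=243.5):
    gamma = np.log(rh / 100.0) + (a * air_temp) / (b + air_temp)
    return (b * gamma) / (a - gamma)

print(round(dewpoint(20.0, 50.0), 1))  # ~9.3 degC
```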
@@ -448,9 +122,10 @@
     # NETCDF4_CLASSIC was chosen so that MFDataset reading would work. See:
     # http://unidata.github.io/netcdf4-python/#netCDF4.MFDataset
     nc_file = Dataset(output_fn, 'w', format='NETCDF4_CLASSIC')
-    write_dimensions(nc_file)
+    nc_file.createDimension('time', None)
+    nc_file.createDimension('max_len_station_name', 32)
     create_variables(nc_file, first_stamp, database, chunk_sizes, zlib)
-    write_vars(nc_file, frame, database)
+    write_vars(nc_file, frame, database, station_info)
     write_global_attributes(nc_file,
                             [os.path.basename(x) for x in input_files],
                             interval=interval_width,
@@ -519,7 +194,7 @@ each input file is mapped to the corresponding output file.
     elif not args.start_time and args.end_time:
         raise ValueError('start time must be specified when end time is specified')

-    mini_database = {k: schema.database[k] for k in args.fields}
+    mini_database = {k: schema.database_dict[k] for k in args.fields}
     if args.summary:
         mini_database = make_summary_dict(mini_database)

diff --git a/aosstower/schema.py b/aosstower/schema.py
index b8f38a688faa79a180d2dcfdccb4d936ed6faf1f..18bffa23300d79cde6e1a13d1ffe871222b18ae0 100644
--- a/aosstower/schema.py
+++ b/aosstower/schema.py
@@ -148,7 +148,7 @@ database = dict(
     ),
     wind_dir=Var(
         float32,
-        'wind_direction',
+        'wind_from_direction',
         'wind_dir',
         'Wind direction',
         'degrees',
@@ -188,7 +188,7 @@ database = dict(
     ),
     dewpoint=Var(
         float32,
-        'dewpoint_temperature',
+        'dew_point_temperature',
         'dewpoint',
         'Calculated dewpoint temperature',
         'degC',
@@ -268,6 +268,8 @@ database = dict(
     )
 )

+database_dict = {k: v._asdict() for k, v in database.items()}
+
 met_vars = {'air_temp', 'dewpoint', 'rh', 'solar_flux', 'pressure', 'precip',
             'accum_precip', 'wind_speed', 'wind_dir', 'gust'}
 engr_vars = set(database.keys()) - met_vars
diff --git a/aosstower/station.py b/aosstower/station.py
index 88b301fe9926d94acea51680220c595ea4ecb1b2..62630ff69638987ac958574cdd9a3ca588d44a15 100644
--- a/aosstower/station.py
+++ b/aosstower/station.py
@@ -15,3 +15,13 @@ ID = 1
 # Buoy 43.098386, -89.405281
 LATITUDE = 43.070786
 LONGITUDE = -89.406939
+
+station_info = {
+    'site': 'mendota',
+    'inst': 'buoy',
+    'long_name': 'AOSS Tower',
+    'short_name': 'aoss.tower',
+    'alt': ELEVATION,
+    'lon': LONGITUDE,
+    'lat': LATITUDE,
+}
\ No newline at end of file
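The new `database_dict` in schema.py relies on namedtuple's `_asdict`, which lets the shared metobscommon code look fields up by name instead of by tuple position. A small self-contained illustration of that conversion (toy `Var` fields, not the real schema):

```python
from collections import namedtuple

# Toy stand-in for schema.Var -- the real one has more fields.
Var = namedtuple('Var', ['dtype', 'standard_name', 'name'])
database = {'air_temp': Var('float32', 'air_temperature', 'air_temp')}

# Same conversion as schema.database_dict above.
database_dict = {k: v._asdict() for k, v in database.items()}
print(database_dict['air_temp']['standard_name'])  # air_temperature
```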