From 2daf7d4212d07aaa985fb704855fc3a245d3abbe Mon Sep 17 00:00:00 2001
From: davidh-ssec <david.hoese@ssec.wisc.edu>
Date: Tue, 28 Mar 2017 14:09:19 -0500
Subject: [PATCH] Huge rewrite of the netcdf level_b1 writer

Properly calculate wind gust
Properly assign QC flags
Tons of style changes and more performant array operations
---
 aosstower/level_00/parser.py |   8 +
 aosstower/level_b1/calc.py   |  12 +-
 aosstower/level_b1/nc.py     | 527 ++++++++++++++++-------------------
 aosstower/schema.py          |  90 ++++--
 4 files changed, 322 insertions(+), 315 deletions(-)

diff --git a/aosstower/level_00/parser.py b/aosstower/level_00/parser.py
index f55f03e..4dc22a1 100644
--- a/aosstower/level_00/parser.py
+++ b/aosstower/level_00/parser.py
@@ -15,6 +15,8 @@ The original data format was a key value, space separated data format
 PAROSCI, WSPD05305, TEMP107_3, CS10162, RAIN380M, TEMP107_2, TEMP41372,
 WDIR05305`.
 
+XXX: Fill value in version 0 seems to be -99999.
+
 Version 1
 ---------
 Effective 2010-06-01T00:27:51Z to 2012-12-03T17:34:17Z.
@@ -25,12 +27,16 @@ pressure, paro_cal_sig, box_rh, box_air_temp, air_temp_2, air_temp_3,
 air_temp_4, wind_speed, wind_dir, rh_shield_freq, rh, air_temp_6_3m,
 dewpoint, rtd_shield_freq, air_temp, solar_flux, precip, accum_precip,
 altimeter.
 
+XXX: Fill value in version 1 seems to be -99999.
+
 Version 2
 ---------
 Effective 2012-12-03T17:34:17Z to present.
 Same as Version 1 with the addition of altimeter2 at the end. I'm not
 sure why we have 2 altimeter values but as far as I know altimeter2 is
 not used.
+
+XXX: Fill value in version 2 seems to be -99999.
 """
 
 import re
@@ -78,6 +84,7 @@ def _make_frame(data):
 class ParserV0(object):
     """Parses Version 0 data lines.
     """
+    fill_value = -99999.
 
     # maps v0 names to names in schema db
     names = {'ACCURAIN': 'accum_precip',
@@ -124,6 +131,7 @@ class ParserV1V2(object):
     """Parses Version 1 & 2 data lines.
     """
+    fill_value = -99999.
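+    # NOTE: -99999. sentinel values are masked to NaN by the level_b1 reader (get_data in level_b1/nc.py)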
names = ['station_id', 'year', 'doy', 'hhmm', 'sec', 'box_pressure',
             'paro_air_temp_period', 'paro_pressure_period', 'paro_air_temp',
diff --git a/aosstower/level_b1/calc.py b/aosstower/level_b1/calc.py
index 322c037..1eba391 100644
--- a/aosstower/level_b1/calc.py
+++ b/aosstower/level_b1/calc.py
@@ -2,10 +2,20 @@
 import math
 
 import numpy as np
 
+try:
+    from pandas import Series
+except ImportError:
+    # only used for isinstance checks
+    Series = np.ndarray
+
 NaN = float('nan')
 is_nan = lambda a: a != a
 
+def knots_to_mps(knots):
+    return knots * 0.51444
+
+
 def dewpoint(tempC, relhum):
     """
     Algorithm from Tom Whittaker tempC is the temperature in degrees Celsius,
@@ -130,7 +140,7 @@ def wind_vector_degrees(vector_east, vector_north):
     """
     rads = np.arctan2(vector_east, vector_north)
     winddir = np.rad2deg(rads)
-    if isinstance(winddir, np.ndarray):
+    if isinstance(winddir, (np.ndarray, Series)):
         winddir[np.less(winddir, 0)] += 360
     elif winddir < 0:
         winddir += 360
diff --git a/aosstower/level_b1/nc.py b/aosstower/level_b1/nc.py
index e0621d7..2f3ba60 100644
--- a/aosstower/level_b1/nc.py
+++ b/aosstower/level_b1/nc.py
@@ -3,15 +3,23 @@
 import sys
 import logging
 import pandas as pd
 from datetime import datetime as dt
-from aosstower.level_00 import parser
 from netCDF4 import Dataset
 import numpy as np
 import platform
-from aosstower import station
+from aosstower import station, schema
+from aosstower.level_00 import parser
 from datetime import timedelta as delta
 from aosstower.level_b1 import calc
 
 LOG = logging.getLogger(__name__)
+STATION_NAME = "AOSS Tower"
+SOFTWARE_VERSION = '00'
+# Knot thresholds converted to m/s for the ASOS wind gust calculations
+KNOTS_10 = calc.knots_to_mps(10.)
+KNOTS_9 = calc.knots_to_mps(9.)
+KNOTS_5 = calc.knots_to_mps(5.)
+KNOTS_3 = calc.knots_to_mps(3.)
+KNOTS_2 = calc.knots_to_mps(2.)
 
 
 def make_mean_dict(source_dict):
@@ -24,36 +32,48 @@ def make_mean_dict(source_dict):
     return dest_dict
 
 
-mean_database = make_mean_dict(parser.database)
-
-
-def filter_array(array, valid_min, valid_max):
-    qcControl = []
+MEAN_DATABASE = make_mean_dict(schema.database)
 
-    for value in array:
-        if value == float(-99999):
-            qcControl.append(np.byte(0b1))
 
-        elif valid_min != '' and value < float(valid_min):
-            qcControl.append(np.byte(0b10))
+def filter_array(arr, valid_min, valid_max, valid_delta):
+    """Create QC field array.
-        elif valid_max != '' and value > float(valid_max):
-            qcControl.append(np.byte(0b100))
+    Meanings
+    --------
 
-        else:
-            qcControl.append(np.byte(0b0))
+    Bit 0 (0b1): value is equal to missing_value
+    Bit 1 (0b10): value is less than the valid min
+    Bit 2 (0b100): value is greater than the valid max
+    Bit 3 (0b1000): difference between current and previous values exceeds valid_delta
 
-    return np.array(qcControl)
+    Arguments:
+        arr (Series): Data array to quality check
+        valid_min (float): Valid minimum of the input data
+        valid_max (float): Valid maximum of the input data
+        valid_delta (float): Valid delta of one input data point to the previous point
 
+    Returns:
+        Series: Bit-mask describing the input data (see above bit meanings)
+    """
+    qcControl = np.zeros(arr.shape, dtype=np.int8)
+    qcControl[arr.isnull().values] |= 0b1
+    if valid_min:
+        qcControl[(arr < valid_min).values] |= 0b10
+    if valid_max:
+        qcControl[(arr > valid_max).values] |= 0b100
+    if valid_delta:
+        # prepend a 0 to fix the off-by-one array shape from np.diff
+        d = np.append([0], np.diff(arr))
+        qcControl[d > valid_delta] |= 0b1000
+    return qcControl
 
 
-def write_dimensions(ncFile):
-    ncFile.createDimension('time', None)
-    ncFile.createDimension('max_len_station_name', 32)
-    return ncFile
+def write_dimensions(nc_file):
+    nc_file.createDimension('time', None)
+    nc_file.createDimension('max_len_station_name', 32)
 
 
-def create_variables(ncFile, firstStamp, chunksizes, zlib, database=parser.database):
+def create_variables(nc_file, first_stamp, database, chunk_sizes=None, zlib=False):
     # base_time long name
     btln = 'base time as unix timestamp'
 
@@ -61,40 +81,37 @@
     btu = 'seconds since 1970-01-01 00:00:00'
 
     # base time string
-    bts = firstStamp.strftime('%Y-%m-%d 00:00:00Z')
+    bts = first_stamp.strftime('%Y-%m-%d 00:00:00Z')
 
     # time long name
-    tln = 'time offset from base_time'
+    tln = 'Time offset from base_time'
 
     # time units
-    tu = 'seconds since ' + firstStamp.strftime('%Y-%m-%d 00:00:00Z')
+    tu = 'seconds since ' + first_stamp.strftime('%Y-%m-%d 00:00:00Z')
 
     coordinates = {
         # fields: type, dimension, fill, valid_min, std_name, longname, units, valid_max, cf_role
-        'lon': [np.float32, None, float(-999), '-180L', 'longitude', None, 'degrees_east', '180L', None],
-        'lat': [np.float32, None, float(-999), '-90L', 'latitude', None, 'degrees_north', '90L', None],
-        'alt': [np.float32, None, float(-999), None, 'height', 'vertical distance', 'm', None, None],
-        'base_time': [np.int32, None, float(-999), None, 'time', btln, btu, None, None],
-        'time_offset': [np.float64, 'time', float(-999), None, 'time', tln, tu, None, None],
-        'station_name': ['c', 'max_len_station_name', '-', None, None, 'station name', None, None, 'timeseries_id'],
-        'time': [np.float32, 'time', float(-999), None, None, "Time offset from epoch",
-                 "seconds since 1970-01-01 00:00:00Z", None, None, None]
+        'time': [np.float64, ('time',), -999., None, None, "Time offset from epoch",
+                 "seconds since 1970-01-01 00:00:00Z", None, None],
+        'lon': [np.float32, tuple(), -999., -180., 'longitude', None, 'degrees_east', 180., None],
+        'lat': [np.float32, tuple(), -999., -90., 'latitude', None, 'degrees_north', 90., None],
+        'alt': [np.float32, tuple(), -999., None, 'height', 'vertical distance', 'm', None, None],
+        'base_time': [np.int32, tuple(), -999., None, 'time', btln, btu, None, None],
+        'time_offset': [np.float64, ('time',), -999., None, 'time', tln, tu, None, None],
+        'station_name': ['c',
('max_len_station_name',), '\0', None, None, 'station name', None, None, 'timeseries_id'],
     }
 
     for key, attr in coordinates.items():
-        if attr[1]:
-            if attr[1] == 'max_len_station_name':
-                if chunksizes and chunksizes[0] > 32:
-                    variable = ncFile.createVariable(key, attr[0], dimensions=(attr[1]), fill_value=attr[2], zlib=zlib,
-                                                     chunksizes=[32])
-                else:
-                    variable = ncFile.createVariable(key, attr[0], dimensions=(attr[1]), fill_value=attr[2], zlib=zlib,
-                                                     chunksizes=chunksizes)
-            else:
-                variable = ncFile.createVariable(key, attr[0], dimensions=(attr[1]), fill_value=attr[2], zlib=zlib,
-                                                 chunksizes=chunksizes)
+        if attr[1] == ('max_len_station_name',) and chunk_sizes and chunk_sizes[0] > 32:
+            cs = [32]
         else:
-            variable = ncFile.createVariable(key, attr[0], fill_value=attr[1], zlib=zlib, chunksizes=chunksizes)
+            cs = chunk_sizes
+
+        variable = nc_file.createVariable(key, attr[0],
+                                          dimensions=attr[1],
+                                          fill_value=attr[2],
+                                          zlib=zlib,
+                                          chunksizes=cs)
 
         # create var attributes
         if key == 'alt':
@@ -119,18 +136,22 @@
 
         if key == 'base_time':
             variable.string = bts
+            variable.ancillary_variables = 'time_offset'
+        elif key == 'time_offset':
+            variable.ancillary_variables = 'base_time'
 
-        if 'time' in key:
-            variable.calendar = 'gregorian'
+        # CF default
+        # if 'time' in key:
+        #     variable.calendar = 'gregorian'
 
     for entry in database:
         if entry == 'stamp':
             continue
 
         varTup = database[entry]
-        variable = ncFile.createVariable(entry, np.float32,
-                                         dimensions=('time'), fill_value=float(-99999), zlib=zlib,
-                                         chunksizes=chunksizes)
+        variable = nc_file.createVariable(entry, np.float32,
+                                          dimensions=('time',), fill_value=float(-999), zlib=zlib,
+                                          chunksizes=chunk_sizes)
 
         variable.standard_name = varTup[1]
         variable.description = varTup[3]
@@ -140,8 +161,8 @@
         variable.valid_min = float(varTup[5])
         variable.valid_max = float(varTup[6])
 
-        qcVariable = ncFile.createVariable('qc_' + entry, 'b',
-                                           dimensions=('time'), fill_value=0b0, zlib=zlib, chunksizes=chunksizes)
+        qcVariable = nc_file.createVariable('qc_' + entry, 'b',
+                                            dimensions=('time',), zlib=zlib, chunksizes=chunk_sizes)
 
         qcVariable.long_name = 'data quality'
         qcVariable.valid_range = np.byte(0b1), np.byte(0b1111)
@@ -154,172 +175,122 @@
         qcVariable.flag_meaning = ', '.join(flagMeaning)
 
-    # create global attributes
-    ncFile.source = 'surface observation'
-    ncFile.conventions = 'ARM-1.2 CF-1.6'
-    ncFile.institution = 'UW SSEC'
-    ncFile.featureType = 'timeSeries'
-    ncFile.data_level = 'b1'
-    # monthly files end with .month.nc
-    # these end with .day.nc
-
-    ncFile.datastream = 'aoss.tower.nc-1d-1m.b1.v00'
-    ncFile.software_version = '00'
-
-    # generate history
-    ncFile.history = ' '.join(platform.uname()) + " " + os.path.basename(__file__)
+def calculate_wind_gust(wind_speed_5s, wind_speed_2m):
+    """Calculate reportable wind gust from wind speed averages.
 
-    return ncFile
+    Determination of wind gusts follows the Automated Surface Observing System
+    (ASOS) User's Guide created by NOAA.
The guide can be found here:
+    http://www.nws.noaa.gov/asos/pdfs/aum-toc.pdf
 
-def get_gust(rollingAvg, speeds):
-    averages = rollingAvg.tolist()
-    maxSpeed = speeds['wind_speed'].tolist()
+    Criteria Summary
+    ----------------
 
-    gust = []
-    for idx, average in enumerate(averages):
-        if not average:
-            gust.append(np.nan)
-            continue
-        elif average >= 4.63 and maxSpeed[idx] > average + 2.573:
-            gust.append(maxSpeed[idx])
-        else:
-            gust.append(np.nan)
-            continue
+    Wind Gust:
 
-    return gust
+    1. 2 minute average wind speed of at least 9 knots
+    2. 1 minute peak wind speed at least 5 knots greater than the 2 minute
+       average
+
+    Reportable Wind Gust:
+
+    1. Highest gust in the past 10 minutes at least 3 knots greater than
+       the current 2 minute average
+    2. 2 minute average wind speed greater than 2 knots
+    3. Highest gust in the past 10 minutes at least 10 knots greater than
+       the minimum 5 second average in the past 10 minutes
 
+    Arguments:
+        wind_speed_5s (Series): 5-second average of instantaneous wind speed magnitudes in m/s
+        wind_speed_2m (Series): 2-minute rolling average of the 5 second wind speed averages in m/s
 
-def get_rolling(series, minutes):
-    """Get the rolling mean closest to the nearest minute"""
-    returnSeries = series.rolling(25, win_type='boxcar').mean()
-    data = {}
-    for minute in minutes:
-        # doesn't go past the minute
-        data[minute] = returnSeries[returnSeries.index.asof(minute)]
+    Returns:
+        Series: 5-second rolling wind speed gusts meeting the ASOS criteria
+                for being a reportable wind gust.
 
-    return pd.Series(data)
+    """
+    # 1 minute rolling peaks
+    wind_peak_1m = wind_speed_5s.rolling(window=12, center=False).max()
+    # criteria for a fast wind to be considered a wind gust
+    gust_mask = (wind_speed_2m >= KNOTS_9) & \
+                (wind_peak_1m >= wind_speed_2m + KNOTS_5)
+    gusts = wind_peak_1m.mask(~gust_mask)
+    # determine highest gust in the last 10 minutes
+    # 5 seconds * 120 = 10 minutes
+    max_10m_gusts = gusts.rolling(window=120, center=False).max()
+    # Minimum 5-second average in the past 10 minutes
+    min_10m_5avg = wind_speed_5s.rolling(window=120, center=False).min()
+    # criteria for a wind gust to be reportable
+    reportable_mask = (max_10m_gusts >= wind_speed_2m + KNOTS_3) & \
+                      (wind_speed_2m > KNOTS_2) & \
+                      (max_10m_gusts >= min_10m_5avg + KNOTS_10)
+    # Reportable gusts at 5 second resolution
+    reportable_gusts = max_10m_gusts.mask(~reportable_mask)
+    return reportable_gusts
 
 
-def get_new_wind_direction(wind_dir, wind_speed, stamps):
-    newWindDir = {}
-    for stamp in stamps:
-        before = stamp - delta(minutes=1)
-
-        if before not in wind_speed.index:
-            newWindDir[stamp] = None
-        else:
-            speed = wind_speed[before: stamp].tolist()
-            dire = wind_dir[before: stamp].tolist()
+def minute_averages(frame):
+    """Average the data frame at a 1 minute interval.
 
-            wind_dire = calc.mean_wind_vector(speed, dire)[0]
+    Wind direction and wind speed must be handled in a special manner.
+    """
+    # Add wind direction components so we can average wind direction properly
+    frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir'])
+    # round up each 1 minute group so data at time T is the average of data
+    # from T - 1 (exclusive) to T (inclusive).
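+    # e.g. the 5 s samples stamped 00:00:05 through 00:01:00 all average into the 00:01:00 output row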
+ new_frame = frame.resample('1T', closed='right', loffset='1T').mean() - newWindDir[stamp] = wind_dire + # 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes) + winds_frame_5s = frame[['wind_speed', 'wind_east', 'wind_north']] + winds_frame_5s = winds_frame_5s.resample('5S', closed='right', loffset='5S').mean() + winds_frame_2m = winds_frame_5s.rolling(24, win_type='boxcar').mean() - return pd.Series(newWindDir) + # rolling average is used for 1 minute output + new_frame.update(winds_frame_2m.ix[new_frame.index]) # adds wind_speed, wind_east/north + new_frame['wind_dir'] = calc.wind_vector_degrees(new_frame['wind_east'], new_frame['wind_north']) + del new_frame['wind_east'] + del new_frame['wind_north'] + gust = calculate_wind_gust(winds_frame_5s['wind_speed'], winds_frame_2m['wind_speed']) + # "average" the gusts to minute resolution to match the rest of the data + new_frame['gust'] = gust.resample('1T', closed='right', loffset='1T').max() -def minute_averages(frame): - frame['minute'] = [(ts + delta(minutes=1)).replace(second=0) for ts in frame.index] - newFrame = frame.groupby('minute').mean() - newFrame.index.names = [''] - - columns = list(newFrame.columns.values) - if 'wind_speed' in columns: - del newFrame['wind_speed'] - - windSeries = frame['wind_speed'] - windSeries = get_rolling(windSeries, list(newFrame.index)) - newFrame['wind_speed'] = windSeries - rollingAvg = newFrame['wind_speed'] - maxSpeed = pd.DataFrame() - maxSpeed['minute'] = frame['minute'] - maxSpeed['speed'] = frame['wind_speed'] - maxSpeed = frame.groupby('minute').max() - gust = get_gust(rollingAvg, maxSpeed) - newFrame['gust'] = gust - - if 'wind_dir' in columns: - del newFrame['wind_dir'] - - dupFrame = frame.set_index('minute') - stamps = newFrame.index - windDirSeries = dupFrame['wind_dir'] - windSeries = dupFrame['wind_speed'] - windDirSeries = get_new_wind_direction(windDirSeries, windSeries, stamps) - newFrame['wind_dir'] = windDirSeries - - del frame['minute'] - return newFrame.fillna(-99999) + return new_frame.fillna(np.nan) def average_over_interval(frame, interval_width): """takes a frame and an interval to average it over, and returns a minimum, - maximum, and average dataframe for that interval""" - ts = frame.index - # round each timestamp to the nearest n minutes - frame['interval'] = (ts.astype(int) - ts.astype(int) % (interval_width * 60e9)).astype('datetime64[ns]') - out_frames = { - 'low': frame.groupby('interval').min(), - 'high': frame.groupby('interval').max(), - 'mean': frame.groupby('interval').mean(), - } - del frame['interval'] - for key in out_frames: - # append the appropriate suffix to each column - columns = out_frames[key].columns - out_frames[key].columns = ['_'.join([col, key]) for col in columns] - out_frames = pd.concat(out_frames.values(), axis=1) + maximum, and average dataframe for that interval + """ + # round each timestamp to the nearest minute + # the value at time X is for the data X - interval_width minutes + gb = frame.resample(interval_width, closed='right', loffset=interval_width) + + low = gb.min() + low.columns = [c + "_low" for c in low.columns] + high = gb.max() + high.columns = [c + "_high" for c in high.columns] + mean = gb.mean() + mean.columns = [c + "_mean" for c in mean.columns] + + out_frames = pd.concat((low, high, mean), axis=1) return out_frames -def get_data(inputFiles): - dictData = {} - for filename in inputFiles: - getFrames = list(parser.read_frames(filename)) - - for frame in getFrames: - if 'stamp' not in frame: 
-                continue
-
-            stamp = frame['stamp']
-            del frame['stamp']
-
-            dictData[stamp] = frame
-
-    return pd.DataFrame(dictData).transpose().replace(-99999, np.nan)
-
-
-def write_vars(ncFile, frame, database=parser.database):
-    stamps = list(frame.index)
-    baseDTObj = dt.strptime(str(stamps[0]).split(' ')[0], '%Y-%m-%d')
-
-    # find out how much time elapsed
-    # since the origin to the start of the day
-    # in seconds
-    baseTimeValue = baseDTObj - dt(1970, 1, 1)
-    baseTimeValue = baseTimeValue.total_seconds()
-
-    # create time numpy
-    timeNumpy = np.empty(len(stamps), dtype='float64')
-
-    counter = 0
+def get_data(input_files):
+    frames = []
+    for filename in input_files:
+        frame = pd.DataFrame(parser.read_frames(filename))
+        frame.set_index('stamp', inplace=True)
+        frame.mask(frame == -99999., inplace=True)
+        frame.fillna(value=np.nan, inplace=True)
+        frames.append(frame)
 
-    # write stamps in, yo
+    return pd.concat(frames, axis=0, copy=False)
 
-    for stamp in stamps:
-        stampObj = dt.strptime(str(stamp), '%Y-%m-%d %H:%M:%S')
-        timeValue = (stampObj - baseDTObj).total_seconds()
-        timeNumpy[counter] = timeValue
-        counter += 1
 
+def write_vars(nc_file, frame, database):
+    # all time points from epoch
+    time_epoch = (frame.index.astype(np.int64) // 10**9).values
+    # midnight of data's first day
+    base_epoch = frame.index[0].replace(hour=0, minute=0, second=0, microsecond=0).value // 10**9
 
-    fileVar = ncFile.variables
-    fileVar['base_time'].assignValue(baseTimeValue)
-    fileVar['time_offset'][:] = timeNumpy
-    fileVar['time'][:] = timeNumpy
+    fileVar = nc_file.variables
+    fileVar['base_time'].assignValue(base_epoch)
+    fileVar['time_offset'][:] = time_epoch - base_epoch
+    fileVar['time'][:] = time_epoch
 
     # write coordinate var values to file
     # alt might not be right, need to verify
@@ -327,47 +298,55 @@
     fileVar['lat'].assignValue(station.LATITUDE)
     fileVar['alt'].assignValue(328)
 
-    # might change
-    stationName = ("AOSS Tower")
-
-    # transfer station name into array of chars
-    statChars = list(stationName)
-    statNumpy = np.asarray(statChars)
-    # write station name to file
-    fileVar['station_name'][0:len(statNumpy)] = statNumpy
+    # write station name to file
+    fileVar['station_name'][:len(STATION_NAME)] = STATION_NAME
 
     # writes data into file
     for varName in frame:
         if varName not in fileVar:
-            logging.warn('Extraneous key: %s in frame' % varName)
+            LOG.warning('Extraneous key: %s in frame' % varName)
             continue
 
-        dataList = frame[varName].tolist()
-
-        dataArray = np.asarray(dataList)
-        fileVar[varName][:] = dataArray
+        fileVar[varName][:] = frame[varName].fillna(-999.).values
 
         valid_min = database[varName][5]
         valid_max = database[varName][6]
+        valid_delta = database[varName][7]
 
-        fileVar['qc_' + varName][:] = filter_array(dataArray, valid_min, valid_max)
+        fileVar['qc_' + varName][:] = filter_array(frame[varName],
+                                                   float(valid_min) if valid_min else valid_min,
+                                                   float(valid_max) if valid_max else valid_max,
+                                                   float(valid_delta) if valid_delta else valid_delta)
 
     coordinates = ['lon', 'lat', 'alt', 'base_time', 'time_offset', 'station_name', 'time']
     for varName in fileVar:
-        if varName.startswith('qc_'):
-            continue
+        if not varName.startswith('qc_') and \
+                varName not in frame and \
+                varName not in coordinates:
+            # if a variable should be in file (database), but isn't in the
+            # input data then make sure the QC field lists it as all fills
+            fileVar['qc_' + varName][:] |= 0b1
 
-        elif varName in frame:
-            continue
 
-        elif varName in coordinates:
+def write_global_attributes(nc_file,
input_sources): + # create global attributes + nc_file.source = 'surface observation' + nc_file.conventions = 'ARM-1.2 CF-1.6' + nc_file.institution = 'University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)' + nc_file.featureType = 'timeSeries' + nc_file.data_level = 'b1' - else: - fileVar['qc_' + varName][:] = np.full(len(list(frame.index)), np.byte(0b1), dtype='b') + # monthly files end with .month.nc + # these end with .day.nc - return ncFile + nc_file.datastream = 'aoss.tower.nc-1d-1m.b1.v{}'.format(SOFTWARE_VERSION) + nc_file.software_version = SOFTWARE_VERSION + nc_file.command_line = " ".join(sys.argv) + + # generate history + nc_file.history = ' '.join(platform.uname()) + " " + os.path.basename(__file__) + nc_file.input_source = input_sources[0] + nc_file.input_sources = ', '.join(input_sources) # The purpose of this method is to take a begin date, and end date @@ -378,74 +357,57 @@ def write_vars(ncFile, frame, database=parser.database): # @param input filenames - list of filenames # @param output filename - filename of the netcdf file -def create_giant_netCDF(start, end, inputFiles, outputName, zlib, chunkSize, - interval_width=None, database=parser.database): - default = False - - if (chunkSize): - chunksizes = [chunkSize] - - else: - default = True +def create_giant_netcdf(input_files, outputName, zlib, chunk_size, + start=None, end=None, interval_width=None, + database=schema.database): + frame = get_data(input_files) + if frame.empty: + raise ValueError("No data found from input files: {}".format(", ".join(input_files))) - frame = get_data(inputFiles) + frame = minute_averages(frame) + if interval_width: + frame = average_over_interval(frame, interval_width) - if (frame.empty): - return False + if start and end: + frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')] + if chunk_size and not isinstance(chunk_size, (list, tuple)): + chunk_sizes = [chunk_size] else: + chunk_sizes = [frame.shape[0]] - frame = minute_averages(frame) - if interval_width: - frame = average_over_interval(frame, interval_width) + first_stamp = dt.strptime(str(frame.index[0]), '%Y-%m-%d %H:%M:%S') + nc_file = Dataset(outputName, 'w', format='NETCDF4_CLASSIC') + write_dimensions(nc_file) + create_variables(nc_file, first_stamp, database, chunk_sizes, zlib) + write_vars(nc_file, frame, database) + write_global_attributes(nc_file, input_files) + nc_file.close() + return nc_file - if (start and end): - frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')] - if (default): - chunksizes = [len(list(frame.index))] - - firstStamp = dt.strptime(str(list(frame.index)[0]), '%Y-%m-%d %H:%M:%S') - - ncFile = Dataset(outputName, 'w', format='NETCDF4_CLASSIC') - - ncFile = write_dimensions(ncFile) - - ncFile = create_variables(ncFile, firstStamp, chunksizes, zlib, database) - - ncFile.inputFiles = ', '.join(inputFiles) - - ncFile = write_vars(ncFile, frame, database) - - ncFile.close() - - return True - - -def create_multiple(filenames, outputFilenames, zlib, chunkSize): - if outputFilenames and len(filenames) != len(outputFilenames): +def create_multiple(filenames, output_filenames, zlib, chunkSize): + if output_filenames and len(filenames) != len(output_filenames): raise ValueError( 'Number of output filenames must equal number of input filenames when start and end times are not specified') - results = [] + success = False for idx, filename in enumerate(filenames): - results.append(create_giant_netCDF(None, None, [filename], 
outputFilenames[idx], zlib, chunkSize))
-
-    if not any(results):
-        raise IOError('All ASCII files were empty')
+        try:
+            create_giant_netcdf([filename], output_filenames[idx], zlib, chunkSize, None, None)
+            success = True
+        except (ValueError, TypeError):
+            LOG.error("Could not generate NetCDF file for {}".format(filename), exc_info=1)
 
+    if not success:
+        raise IOError('All ASCII files were empty or could not be read')
 
-# The purpose of this method is to take a string in the format
-# YYYY-mm-ddTHH:MM:SS and convert that to a datetime object
-# used in coordination with argparse -s and -e params
-# @param datetime string
-# @return datetime object
 def _dt_convert(datetime_str):
-    # parse datetime string, return datetime object
+    """Parse datetime string, return datetime object"""
     try:
         return dt.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S')
-    except:
+    except ValueError:
        return dt.strptime(datetime_str, '%Y-%m-%d')

@@ -464,39 +426,36 @@ def main():
     argparser.add_argument('-e', '--end-time', type=_dt_convert,
                            help='End time of massive netcdf file. Formats allowed:' +
                            "\'YYYY-MM-DDTHH:MM:SS\', \'YYYY-MM-DD\'")
-    argparser.add_argument('-i', '--interval', type=float,
-                           help='Width of the interval to average input data over in minutes.' +
-                           " If not specified, 1 is assumed. (Use 60 for one hour and 1440 for 1 day)")
+    argparser.add_argument('-i', '--interval',
+                           help="""Width of the interval to average input data
over in Pandas offset format. If not specified, 1 minute averages are used. If
specified then '_high', '_mean', and '_low' versions of the data fields are
written to the output NetCDF.
Use '1D' for daily or '5T' for 5 minute averages.
See this page for more details:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases""")
     argparser.add_argument('--chunk-size', type=int, help='chunk size for the netCDF file')
     argparser.add_argument('-z', '--zlib', action='store_true', help='compress netCDF file with zlib')

     argparser.add_argument("input_files", nargs="+",
                            help="aoss_tower level_00 paths. Use @filename to read a list of paths from that file.")
-    argparser.add_argument('-o', '--output', required=True, nargs="+", help="filename pattern or filename.
" + - "Should be along the lines of <filepath>/aoss_tower.YYYY-MM-DD.nc") + argparser.add_argument('-o', '--output', nargs="+", help="NetCDF filename(s) to create from input") args = argparser.parse_args() levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] level = levels[min(3, args.verbosity)] logging.basicConfig(level=level) - database = mean_database if args.interval else parser.database - if args.start_time and args.end_time: - result = create_giant_netCDF(args.start_time, args.end_time, args.input_files, args.output[0], args.zlib, - args.chunk_size, - args.interval, database) - if not result: - raise IOError('An empty ASCII file was found') - elif args.start_time: - end_time = args.start_time.replace(hour=23, minute=59, second=59) - result = create_giant_netCDF(args.start_time, end_time, args.input_files, args.output[0], args.zlib, - args.chunk_size, - args.interval, database) - if not result: - raise IOError('An empty ASCII file was found') - elif args.end_time: + if args.start_time and not args.end_time: + args.end_time = args.start_time.replace(hour=23, minute=59, second=59) + elif not args.start_time and args.end_time: raise ValueError('start time must be specified when end time is specified') + + database = MEAN_DATABASE if args.interval else schema.database + if args.start_time and args.end_time: + create_giant_netcdf(args.input_files, args.output[0], args.zlib, args.chunk_size, args.start_time, + args.end_time, args.interval, database) else: create_multiple(args.input_files, args.output, args.zlib, args.chunk_size) diff --git a/aosstower/schema.py b/aosstower/schema.py index 65fa4d2..2165074 100644 --- a/aosstower/schema.py +++ b/aosstower/schema.py @@ -2,7 +2,7 @@ from numpy import float32 from collections import namedtuple -Var = namedtuple('Var', ['type', 'standard_name', 'name', 'description', 'units', 'valid_min', 'valid_max']) +Var = namedtuple('Var', ['type', 'standard_name', 'name', 'description', 'units', 'valid_min', 'valid_max', 'valid_delta']) database = dict( @@ -13,7 +13,8 @@ database = dict( 'Auxillary Temperature', 'degC', '', - '' + '', + '', ), box_pressure=Var( float32, @@ -22,7 +23,9 @@ database = dict( 'Pressure inside the data logger enclosure', 'hPa', '850', - '1100'), + '1100', + '', + ), paro_air_temp_period=Var( float32, '', @@ -30,7 +33,8 @@ database = dict( '', '1', '', - '' + '', + '', ), paro_pressure_period=Var( float32, @@ -39,7 +43,9 @@ database = dict( '', '1', '', - ''), + '', + '', + ), paro_air_temp=Var( float32, 'air_temperature', @@ -47,7 +53,8 @@ database = dict( '', 'degC', '-50', - '50' + '50', + '', ), pressure=Var( float32, @@ -56,7 +63,8 @@ database = dict( 'Air pressure as measured from the PAROSCI pressure sensor', 'hPa', '850', - '1100' + '1100', + '', ), paro_cal_sig=Var( float32, @@ -65,7 +73,8 @@ database = dict( '', '', '', - '' + '', + '', ), box_rh=Var( float32, @@ -74,7 +83,8 @@ database = dict( 'Relative humidity inside the data logger enclosure', '%', '0', - '100' + '100', + '', ), box_air_temp=Var( float32, @@ -83,7 +93,8 @@ database = dict( 'Air temperature inside the data logger enclosure', 'degC', '-50', - '50' + '50', + '', ), air_temp_2=Var( float32, @@ -91,9 +102,10 @@ database = dict( 'air_temp_2', 'Auxillary air temperature', 'degC', - '-50', - '50', - ), + '-50', + '50', + '', + ), air_temp_3=Var( float32, 'air_temperature', @@ -101,7 +113,8 @@ database = dict( 'Auxillary air temperature', 'degC', '-50', - '50' + '50', + '', ), air_temp_4=Var( float32, @@ -110,7 +123,8 @@ database = dict( 
'Auxillary air temperature', 'degC', '-50', - '50' + '50', + '', ), air_temp_5=Var( float32, @@ -119,7 +133,8 @@ database = dict( 'Auxillary air temperature', 'degC', '-50', - '50' + '50', + '', ), wind_speed=Var( float32, @@ -128,7 +143,8 @@ database = dict( 'Wind speed', 'm*s^-1', '0', - '50' + '50', + '', ), wind_dir=Var( float32, @@ -137,7 +153,8 @@ database = dict( 'Wind direction', 'degrees', '0', - '360' + '360', + '', ), rh_shield_freq=Var( float32, @@ -146,7 +163,8 @@ database = dict( '', 'hz', '', - '' + '', + '', ), rh=Var( float32, @@ -155,7 +173,8 @@ database = dict( 'Relative humidity', '%', '0', - '100' + '100', + '', ), air_temp_6_3m=Var( float32, @@ -164,7 +183,8 @@ database = dict( 'Air temperature 6.3m from tower base', 'degC', '-50', - '50' + '50', + '', ), dewpoint=Var( float32, @@ -173,7 +193,9 @@ database = dict( 'Calculated dewpoint temperature', 'degC', '-50', - '50'), + '50', + '', + ), rtd_shield_freq=Var( float32, '', @@ -181,7 +203,8 @@ database = dict( '', '', '', - '' + '', + '', ), air_temp=Var( float32, @@ -190,7 +213,8 @@ database = dict( 'Air temperature', 'degC', '-50', - '50' + '50', + '', ), solar_flux=Var( float32, @@ -199,7 +223,9 @@ database = dict( 'Solar flux', 'w*m^-2', '0', - '3000'), + '3000', + '', + ), precip=Var( float32, '', @@ -207,7 +233,8 @@ database = dict( 'Precipitation', 'mm', '0', - '254' + '254', + '', ), accum_precip=Var( float32, @@ -216,7 +243,8 @@ database = dict( 'Precipitation accumulated since 0Z', 'mm', '0', - '254' + '254', + '', ), altimeter=Var( float32, @@ -225,7 +253,8 @@ database = dict( '', 'inHg', '', - '' + '', + '', ), gust=Var( float32, @@ -234,10 +263,11 @@ database = dict( 'Wind gust over the previous 2 minutes', 'm/s', '0', - '50' + '50', + '', ) ) met_vars = {'air_temp', 'rh', 'solar_flux', 'pressure', 'precip', 'accum_precip', - 'wind_speed', 'wind_dir'} + 'wind_speed', 'wind_dir', 'gust'} engr_vars = set(database.keys()) - met_vars -- GitLab
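
A minimal sketch of driving the new gust logic (module path as in this patch;
the wind speeds are synthetic and purely illustrative, and the 2-minute boxcar
average mirrors minute_averages(), which needs SciPy for win_type='boxcar'):

    import pandas as pd
    from aosstower.level_b1.nc import calculate_wind_gust

    # 30 minutes of synthetic 5-second wind speeds (m/s): calm with one burst
    index = pd.date_range('2017-03-28 00:00:05', periods=360, freq='5S')
    speeds = pd.Series(3.0, index=index)
    speeds['2017-03-28 00:10:00':'2017-03-28 00:11:00'] = 12.0  # ~23 knot burst

    # 2-minute rolling average of the 5-second data (24 * 5 s = 120 s)
    speed_2m = speeds.rolling(24, win_type='boxcar').mean()

    # 5-second resolution Series; NaN everywhere the ASOS criteria fail
    gusts = calculate_wind_gust(speeds, speed_2m)
    print(gusts.dropna().head())  # 12.0 m/s reportable gusts around the burst

The result comes back at 5 second resolution; as in minute_averages(), taking
the per-minute max then yields the 'gust' field written to the NetCDF file.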