import os
import sys
import logging
import platform
from datetime import datetime as dt

import numpy as np
import pandas as pd
from netCDF4 import Dataset

from aosstower import station, schema
from aosstower.level_00 import parser
from aosstower.level_b1 import calc

LOG = logging.getLogger(__name__)
STATION_NAME = "AOSS Tower"
SOFTWARE_VERSION = '00'

# Knots used to do ASOS wind gust calculations
KNOTS_10 = calc.knots_to_mps(10.)
KNOTS_9 = calc.knots_to_mps(9.)
KNOTS_5 = calc.knots_to_mps(5.)
KNOTS_3 = calc.knots_to_mps(3.)
KNOTS_2 = calc.knots_to_mps(2.)


def make_mean_dict(source_dict):
    """Create the '_mean', '_low', '_high' versions of the schema database."""
    dest_dict = {}
    for key in source_dict:
        dest_dict[key + '_high'] = source_dict[key]
        dest_dict[key + '_mean'] = source_dict[key]
        dest_dict[key + '_low'] = source_dict[key]
    return dest_dict


MEAN_DATABASE = make_mean_dict(schema.database)


def filter_array(arr, valid_min, valid_max, valid_delta):
    """Create a QC bit-mask array for the given data.

    Bit meanings
    ------------
    Bit 0 (0b1): value is equal to missing_value
    Bit 1 (0b10): value is less than the valid min
    Bit 2 (0b100): value is greater than the valid max
    Bit 3 (0b1000): difference between current and previous values exceeds valid_delta

    Arguments:
        arr (Series): Data array this QC is for
        valid_min (float): Valid minimum of the input data
        valid_max (float): Valid maximum of the input data
        valid_delta (float): Valid delta of one input data point to the previous point

    Returns:
        ndarray: Bit-mask describing the input data (see bit meanings above)
    """
    qcControl = np.zeros(arr.shape, dtype=np.int8)
    qcControl[arr.isnull().values] |= 0b1
    if valid_min:
        qcControl[(arr < valid_min).values] |= 0b10
    if valid_max:
        qcControl[(arr > valid_max).values] |= 0b100
    if valid_delta:
        # prepend a 0 to fix the off-by-one array shape from np.diff
        d = np.append([0], np.diff(arr))
        qcControl[d > valid_delta] |= 0b1000
    return qcControl


def write_dimensions(nc_file):
    nc_file.createDimension('time', None)
    nc_file.createDimension('max_len_station_name', 32)


def create_variables(nc_file, first_stamp, database, chunk_sizes=None, zlib=False):
    # base_time long name
    btln = 'base time as unix timestamp'
    # base time units
    btu = 'seconds since 1970-01-01 00:00:00'
    # base time string
    bts = first_stamp.strftime('%Y-%m-%d 00:00:00Z')
    # time long name
    tln = 'Time offset from base_time'
    # time units
    tu = 'seconds since ' + first_stamp.strftime('%Y-%m-%d 00:00:00Z')

    coordinates = {
        # fields: type, dimension, fill, valid_min, std_name, longname, units, valid_max, cf_role
        'time': [np.float64, ('time',), -999., None, None, "Time offset from epoch",
                 "seconds since 1970-01-01 00:00:00Z", None, None],
        'lon': [np.float32, tuple(), -999., -180., 'longitude', None, 'degrees_east', 180., None],
        'lat': [np.float32, tuple(), -999., -90., 'latitude', None, 'degrees_north', 90., None],
        'alt': [np.float32, tuple(), -999., None, 'height', 'vertical distance', 'm', None, None],
        'base_time': [np.int32, tuple(), -999., None, 'time', btln, btu, None, None],
        'time_offset': [np.float64, ('time',), -999., None, 'time', tln, tu, None, None],
        'station_name': ['c', ('max_len_station_name',), '\0', None, None, 'station name',
                         None, None, 'timeseries_id'],
    }

    for key, attr in coordinates.items():
        if attr[1] == ('max_len_station_name',) and chunk_sizes and chunk_sizes[0] > 32:
            cs = [32]
        else:
            cs = chunk_sizes

        variable = nc_file.createVariable(key, attr[0],
                                          dimensions=attr[1],
                                          fill_value=attr[2],
                                          zlib=zlib,
                                          chunksizes=cs)

        # create variable attributes
        if key == 'alt':
            variable.positive = 'up'
            variable.axis = 'Z'
        if attr[3]:
            variable.valid_min = attr[3]
            variable.valid_max = attr[7]
        if attr[4]:
            variable.standard_name = attr[4]
        if attr[5]:
            variable.long_name = attr[5]
        if attr[6]:
            variable.units = attr[6]
        if attr[8]:
            variable.cf_role = attr[8]
        if key == 'base_time':
            variable.string = bts
            variable.ancillary_variables = 'time_offset'
        elif key == 'time_offset':
            variable.ancillary_variables = 'base_time'
        # CF default
        # if 'time' in key:
        #     variable.calendar = 'gregorian'

    for entry in database:
        if entry == 'stamp':
            continue
        varTup = database[entry]
        variable = nc_file.createVariable(entry, np.float32,
                                          dimensions=('time',),
                                          fill_value=float(-999),
                                          zlib=zlib,
                                          chunksizes=chunk_sizes)
        variable.standard_name = varTup[1]
        variable.description = varTup[3]
        variable.units = varTup[4]
        if varTup[5] != '':
            variable.valid_min = float(varTup[5])
            variable.valid_max = float(varTup[6])

        qcVariable = nc_file.createVariable('qc_' + entry, 'b',
                                            dimensions=('time',),
                                            zlib=zlib,
                                            chunksizes=chunk_sizes)
        qcVariable.long_name = 'data quality'
        qcVariable.valid_range = np.byte(0b1), np.byte(0b1111)
        qcVariable.flag_masks = np.byte(0b1), np.byte(0b10), np.byte(0b100), np.byte(0b1000)
        flagMeaning = ['value is equal to missing_value',
                       'value is less than the valid min',
                       'value is greater than the valid max',
                       'difference between current and previous values exceeds valid_delta']
        qcVariable.flag_meaning = ', '.join(flagMeaning)
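
# A small, hypothetical demonstration of the QC bit-mask produced by
# filter_array(): the missing (0b1), below-minimum (0b10), above-maximum
# (0b100), and delta (0b1000) bits are set independently for each sample.
# The helper name and the sample values are illustrative only and are not
# part of the processing chain.
def _example_qc_bits():
    data = pd.Series([1.0, np.nan, -5.0, 50.0, 10.0])
    # valid range 0.5..20.0, values may not jump by more than 30.0 between samples
    qc = filter_array(data, 0.5, 20., 30.)
    # expected bits: [0b0, 0b1, 0b10, 0b1100, 0b0]
    return qc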
def calculate_wind_gust(wind_speed_5s, wind_speed_2m):
    """Calculate reportable wind gust from wind speed averages.

    Determination of wind gusts follows the Automated Surface Observing
    System (ASOS) User's Guide created by NOAA. The guide can be found here:

    http://www.nws.noaa.gov/asos/pdfs/aum-toc.pdf

    Criteria Summary
    ----------------
    A wind gust is recorded when:

    1. the 2-minute average wind speed is at least 9 knots, and
    2. the 1-minute peak wind speed exceeds the 2-minute average by at
       least 5 knots.

    A gust is reportable when:

    1. the highest gust in the past 10 minutes exceeds the current
       2-minute average by at least 3 knots,
    2. the 2-minute average is greater than 2 knots, and
    3. the highest gust in the past 10 minutes exceeds the minimum
       5-second average in the past 10 minutes by at least 10 knots.

    Arguments:
        wind_speed_5s (Series): 5-second average of instantaneous wind speed magnitudes in m/s
        wind_speed_2m (Series): 2-minute rolling average of the 5-second wind speed averages in m/s

    Returns:
        Series: 5-second rolling wind speed gusts meeting the ASOS criteria
            for being a reportable wind gust.
    """
    # 1 minute rolling peaks
    wind_peak_1m = wind_speed_5s.rolling(window=12, center=False).max()
    # criteria for a fast wind to be considered a wind gust
    gust_mask = (wind_speed_2m >= KNOTS_9) & \
                (wind_peak_1m >= wind_speed_2m + KNOTS_5)
    gusts = wind_peak_1m.mask(~gust_mask)

    # determine the highest gust in the last 10 minutes
    # 5 seconds * 120 = 10 minutes
    max_10m_gusts = gusts.rolling(window=120, center=False).max()
    # minimum 5-second average in the past 10 minutes
    min_10m_5avg = wind_speed_5s.rolling(window=120, center=False).min()
    # criteria for a wind gust to be reportable
    reportable_mask = (max_10m_gusts >= wind_speed_2m + KNOTS_3) & \
                      (wind_speed_2m > KNOTS_2) & \
                      (max_10m_gusts >= min_10m_5avg + KNOTS_10)
    # reportable gusts at 5-second resolution
    reportable_gusts = max_10m_gusts.mask(~reportable_mask)
    return reportable_gusts
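
# A minimal, hypothetical usage sketch for calculate_wind_gust(): it builds a
# synthetic 5-second wind speed series with a brief burst and derives a plain
# 2-minute rolling mean (equivalent to the boxcar window used in
# minute_averages() below). The helper name and synthetic values are
# illustrative only.
def _example_wind_gust():
    # 30 minutes of 5-second samples: steady 4 m/s with a 30-second 12 m/s burst
    index = pd.date_range('2017-01-01', periods=360, freq='5S')
    speeds = np.full(360, 4.0)
    speeds[180:186] = 12.0
    wind_speed_5s = pd.Series(speeds, index=index)
    # 2-minute rolling average of the 5-second data (24 samples)
    wind_speed_2m = wind_speed_5s.rolling(24).mean()
    # result is NaN except where the ASOS gust criteria are satisfied
    return calculate_wind_gust(wind_speed_5s, wind_speed_2m)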
def minute_averages(frame):
    """Average the data frame to a 1-minute interval.

    Wind direction and wind speed must be handled in a special manner.
    """
    # add wind direction components so we can average wind direction properly
    frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'],
                                                                             frame['wind_dir'])

    # round up each 1 minute group so data at time T is the average of data
    # from T - 1 (exclusive) to T (inclusive)
    new_frame = frame.resample('1T', closed='right', loffset='1T').mean()

    # 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes)
    winds_frame_5s = frame[['wind_speed', 'wind_east', 'wind_north']]
    winds_frame_5s = winds_frame_5s.resample('5S', closed='right', loffset='5S').mean()
    winds_frame_2m = winds_frame_5s.rolling(24, win_type='boxcar').mean()

    # the rolling average is used for the 1 minute output
    new_frame.update(winds_frame_2m.loc[new_frame.index])  # adds wind_speed, wind_east/north
    new_frame['wind_dir'] = calc.wind_vector_degrees(new_frame['wind_east'], new_frame['wind_north'])
    del new_frame['wind_east']
    del new_frame['wind_north']

    gust = calculate_wind_gust(winds_frame_5s['wind_speed'], winds_frame_2m['wind_speed'])
    # "average" the gusts to minute resolution to match the rest of the data
    new_frame['gust'] = gust.resample('1T', closed='right', loffset='1T').max()

    return new_frame.fillna(np.nan)


def average_over_interval(frame, interval_width):
    """Average a frame over the given interval.

    Return a frame with the minimum, maximum, and mean of each column over
    that interval ('_low', '_high', and '_mean' suffixes).
    """
    # resample so the value at time X covers the data from
    # X - interval_width (exclusive) to X (inclusive)
    gb = frame.resample(interval_width, closed='right', loffset=interval_width)

    low = gb.min()
    low.columns = [c + "_low" for c in low.columns]
    high = gb.max()
    high.columns = [c + "_high" for c in high.columns]
    mean = gb.mean()
    mean.columns = [c + "_mean" for c in mean.columns]

    out_frames = pd.concat((low, high, mean), axis=1)
    return out_frames


def get_data(input_files):
    frames = []
    for filename in input_files:
        frame = pd.DataFrame(parser.read_frames(filename))
        frame.set_index('stamp', inplace=True)
        frame.mask(frame == -99999., inplace=True)
        frame.fillna(value=np.nan, inplace=True)
        frames.append(frame)
    return pd.concat(frames, axis=1, copy=False)


def write_vars(nc_file, frame, database):
    # all time points as seconds since the epoch
    time_epoch = (frame.index.astype(np.int64) // 10**9).values
    # midnight of the data's first day
    base_epoch = frame.index[0].replace(hour=0, minute=0, second=0, microsecond=0).value // 10**9

    fileVar = nc_file.variables
    fileVar['base_time'].assignValue(base_epoch)
    fileVar['time_offset'][:] = time_epoch - base_epoch
    fileVar['time'][:] = time_epoch

    # write coordinate variable values to the file
    # alt might not be right, need to verify
    fileVar['lon'].assignValue(station.LONGITUDE)
    fileVar['lat'].assignValue(station.LATITUDE)
    fileVar['alt'].assignValue(328)

    # write the station name to the file
    fileVar['station_name'][:len(STATION_NAME)] = STATION_NAME

    # write the data into the file
    for varName in frame:
        if varName not in fileVar:
            LOG.warning('Extraneous key: %s in frame', varName)
            continue
        fileVar[varName][:] = frame[varName].fillna(-999.).values
        valid_min = database[varName][5]
        valid_max = database[varName][6]
        valid_delta = database[varName][7]
        fileVar['qc_' + varName][:] = filter_array(frame[varName],
                                                   float(valid_min) if valid_min else valid_min,
                                                   float(valid_max) if valid_max else valid_max,
                                                   float(valid_delta) if valid_delta else valid_delta)

    coordinates = ['lon', 'lat', 'alt', 'base_time', 'time_offset', 'station_name', 'time']
    for varName in fileVar:
        if not varName.startswith('qc_') and \
                varName not in frame and \
                varName not in coordinates:
            # if a variable should be in the file (database), but isn't in the
            # input data, then make sure the QC field lists it as all fills
            fileVar['qc_' + varName][:] |= 0b1
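
# A minimal sketch of the ARM-style time bookkeeping used in write_vars():
# 'time' holds seconds since the Unix epoch, 'base_time' is midnight of the
# data's first day, and 'time_offset' is seconds since 'base_time', so
# time == base_time + time_offset for every sample. The helper name and the
# sample index are hypothetical and for illustration only.
def _example_time_encoding():
    index = pd.date_range('2017-01-01 12:00:00', periods=3, freq='1T')
    time_epoch = (index.astype(np.int64) // 10**9).values
    base_epoch = index[0].replace(hour=0, minute=0, second=0, microsecond=0).value // 10**9
    return base_epoch, time_epoch - base_epoch, time_epoch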
def write_global_attributes(nc_file, input_sources):
    # create global attributes
    nc_file.source = 'surface observation'
    nc_file.conventions = 'ARM-1.2 CF-1.6'
    nc_file.institution = 'University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)'
    nc_file.featureType = 'timeSeries'
    nc_file.data_level = 'b1'

    # monthly files end with .month.nc
    # these end with .day.nc
    nc_file.datastream = 'aoss.tower.nc-1d-1m.b1.v{}'.format(SOFTWARE_VERSION)
    nc_file.software_version = SOFTWARE_VERSION
    nc_file.command_line = " ".join(sys.argv)

    # generate history
    nc_file.history = ' '.join(platform.uname()) + " " + os.path.basename(__file__)
    nc_file.input_source = input_sources[0]
    nc_file.input_sources = ', '.join(input_sources)


def create_giant_netcdf(input_files, outputName, zlib, chunk_size,
                        start=None, end=None, interval_width=None,
                        database=schema.database):
    """Create a NetCDF file from the given level_00 input files.

    Arguments:
        input_files (list): level_00 input filenames
        outputName (str): filename of the NetCDF file to create
        zlib (bool): whether to compress the NetCDF variables with zlib
        chunk_size (int): chunk size to use for the NetCDF variables
        start (datetime): begin time of the data to include
        end (datetime): end time of the data to include
        interval_width (str): Pandas offset alias to average the data over
        database (dict): schema describing the variables to write
    """
    frame = get_data(input_files)
    if frame.empty:
        raise ValueError("No data found from input files: {}".format(", ".join(input_files)))

    frame = minute_averages(frame)
    if interval_width:
        frame = average_over_interval(frame, interval_width)

    if start and end:
        frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')]

    if chunk_size and not isinstance(chunk_size, (list, tuple)):
        chunk_sizes = [chunk_size]
    else:
        chunk_sizes = [frame.shape[0]]

    first_stamp = dt.strptime(str(frame.index[0]), '%Y-%m-%d %H:%M:%S')
    nc_file = Dataset(outputName, 'w', format='NETCDF4_CLASSIC')
    write_dimensions(nc_file)
    create_variables(nc_file, first_stamp, database, chunk_sizes, zlib)
    write_vars(nc_file, frame, database)
    write_global_attributes(nc_file, input_files)
    nc_file.close()
    return nc_file


def create_multiple(filenames, output_filenames, zlib, chunkSize):
    if output_filenames and len(filenames) != len(output_filenames):
        raise ValueError('Number of output filenames must equal number of input '
                         'filenames when start and end times are not specified')

    success = False
    for idx, filename in enumerate(filenames):
        try:
            create_giant_netcdf([filename], output_filenames[idx], zlib, chunkSize, None, None)
            success = True
        except (ValueError, TypeError):
            LOG.error("Could not generate NetCDF file for %s", filename, exc_info=1)
    if not success:
        raise IOError('All ASCII files were empty or could not be read')


def _dt_convert(datetime_str):
    """Parse a datetime string and return a datetime object."""
    try:
        return dt.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S')
    except ValueError:
        return dt.strptime(datetime_str, '%Y-%m-%d')
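
# A hypothetical sketch of calling create_giant_netcdf() directly for a single
# day, mirroring what main() does when -s and -e are given. The input and
# output filenames below are placeholders, not real paths.
def _example_create_daily_file():
    start = _dt_convert('2017-01-01')
    end = start.replace(hour=23, minute=59, second=59)
    create_giant_netcdf(['aoss_tower.2017-01-01.ascii'],              # hypothetical level_00 input
                        'aoss.tower.nc-1d-1m.b1.20170101.000000.nc',  # hypothetical output filename
                        True,                                         # zlib compression
                        None,                                         # default chunking (frame length)
                        start, end)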
def main():
    import argparse
    argparser = argparse.ArgumentParser(description="Convert level_00 AOSS tower data to level_b1 NetCDF files",
                                        fromfile_prefix_chars='@')
    argparser.add_argument('-v', '--verbose', action="count",
                           default=int(os.environ.get("VERBOSITY", 2)),
                           dest='verbosity',
                           help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)')
    argparser.add_argument('-s', '--start-time', type=_dt_convert,
                           help="Start time of the output file; if only -s is given, a file for just that day is "
                                "created. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'")
    argparser.add_argument('-e', '--end-time', type=_dt_convert,
                           help="End time of the output file. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'")
    argparser.add_argument('-i', '--interval',
                           help="""Width of the interval to average input data over in Pandas offset format.
If not specified, 1 minute averages are used. If specified then '_high', '_mean', and '_low'
versions of the data fields are written to the output NetCDF.
Use '1D' for daily or '5T' for 5 minute averages.
See this page for more details:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases""")
    argparser.add_argument('--chunk-size', type=int, help='chunk size for the netCDF file')
    argparser.add_argument('-z', '--zlib', action='store_true', help='compress netCDF file with zlib')
    argparser.add_argument("input_files", nargs="+",
                           help="aoss_tower level_00 paths. Use @filename to read a list of paths from that file.")
    argparser.add_argument('-o', '--output', nargs="+", help="NetCDF filename(s) to create from input")
    args = argparser.parse_args()

    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    level = levels[min(3, args.verbosity)]
    logging.basicConfig(level=level)

    if args.start_time and not args.end_time:
        args.end_time = args.start_time.replace(hour=23, minute=59, second=59)
    elif not args.start_time and args.end_time:
        raise ValueError('start time must be specified when end time is specified')

    database = MEAN_DATABASE if args.interval else schema.database
    if args.start_time and args.end_time:
        create_giant_netcdf(args.input_files, args.output[0], args.zlib, args.chunk_size,
                            args.start_time, args.end_time, args.interval, database)
    else:
        create_multiple(args.input_files, args.output, args.zlib, args.chunk_size)


if __name__ == "__main__":
    sys.exit(main())