import argparse
import logging
import os
import platform
import sys
from datetime import datetime as dt

import numpy as np
import pandas as pd
from netCDF4 import Dataset

from aosstower import station, schema
from aosstower.level_00 import parser
from aosstower.level_b1 import calc
LOG = logging.getLogger(__name__)
STATION_NAME = "AOSS Tower"
SOFTWARE_VERSION = '00'
# Knots used to do ASOS wind gust calculations
KNOTS_10 = calc.knots_to_mps(10.)
KNOTS_9 = calc.knots_to_mps(9.)
KNOTS_5 = calc.knots_to_mps(5.)
KNOTS_3 = calc.knots_to_mps(3.)
KNOTS_2 = calc.knots_to_mps(2.)
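# For reference: 1 knot is roughly 0.514 m/s, so KNOTS_10 is about 5.14 m/s and
# KNOTS_2 about 1.03 m/s.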
def make_mean_dict(source_dict):
    """Create '_mean', '_low', and '_high' versions of each field in the source schema."""
    dest_dict = {}
    for key in source_dict:
        dest_dict[key + '_high'] = source_dict[key]
        dest_dict[key + '_mean'] = source_dict[key]
        dest_dict[key + '_low'] = source_dict[key]
    return dest_dict
MEAN_DATABASE = make_mean_dict(schema.database)
def filter_array(arr, valid_min, valid_max, valid_delta):
"""Create QC field array.
Bit 0 (0b1): value is equal to missing_value
Bit 1 (0b10): value is less than the valid min
Bit 2 (0b100): value is greater than the valid max
Bit 3 (0b1000): difference between current and previous values exceeds valid_delta
Arguments:
arr (Series): Data array this QC is for
valid_min (float): Valid minimum of the input data
valid_max (float): Valid maximum of the input data
valid_delta (float): Valid delta of one input data point to the previous point
Returns:
Series: Bit-mask describing the input data (see above bit meanings)
"""
qcControl = np.zeros(arr.shape, dtype=np.int8)
qcControl[arr.isnull().values] |= 0b1
if valid_min:
qcControl[(arr < valid_min).values] |= 0b10
if valid_max:
qcControl[(arr > valid_max).values] |= 0b100
    if valid_delta:
        # prepend a zero so the diff array matches the input array's shape
        d = np.append([0], np.diff(arr))
        qcControl[d > valid_delta] |= 0b1000
return qcControl
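# Illustrative example (hypothetical values): for an input series of
# [1.0, 5.0, 100.0] with valid_min=2, valid_max=50, and valid_delta=10,
# filter_array returns [0b10, 0b0, 0b1100] -- the first value is below the
# valid min, and the last is above the valid max and also jumps more than
# valid_delta from the previous value.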
def write_dimensions(nc_file):
nc_file.createDimension('time', None)
nc_file.createDimension('max_len_station_name', 32)
def create_variables(nc_file, first_stamp, database, chunk_sizes=None, zlib=False):
    # attributes for the base_time and time_offset coordinate variables
    bts = first_stamp.strftime('%Y-%m-%d 00:00:00Z')
    btln = 'Base time in Epoch'
    btu = 'seconds since 1970-01-01 00:00:00Z'
    tln = 'Time offset from base_time'
    tu = 'seconds since ' + first_stamp.strftime('%Y-%m-%d 00:00:00Z')
    coordinates = {
        # fields: type, dimension, fill, valid_min, std_name, longname, units, valid_max, cf_role, axis
        'time': [np.float64, ('time',), -999., None, None, "Time offset from epoch",
                 "seconds since 1970-01-01 00:00:00Z", None, None, None],
        'lon': [np.float32, tuple(), -999., -180., 'longitude', None, 'degrees_east', 180., None],
        'lat': [np.float32, tuple(), -999., -90., 'latitude', None, 'degrees_north', 90., None],
        'alt': [np.float32, tuple(), -999., None, 'height', 'vertical distance', 'm', None, None],
        'base_time': [np.int32, tuple(), -999., None, 'time', btln, btu, None, None],
        'time_offset': [np.float64, ('time',), -999., None, 'time', tln, tu, None, None],
        'station_name': ['c', ('max_len_station_name',), '\0', None, None, 'station name', None, None, 'timeseries_id'],
    }
for key, attr in coordinates.items():
        if attr[1] == ('max_len_station_name',) and chunk_sizes and chunk_sizes[0] > 32:
            cs = [32]
        else:
            cs = chunk_sizes
variable = nc_file.createVariable(key, attr[0],
dimensions=attr[1],
fill_value=attr[2],
zlib=zlib,
chunksizes=cs)
        if key == 'alt':
            variable.positive = 'up'
            variable.axis = 'Z'
        if attr[3] is not None:
            variable.valid_min = attr[3]
            variable.valid_max = attr[7]
        if attr[4]:
            variable.standard_name = attr[4]
        if attr[5]:
            variable.long_name = attr[5]
        if attr[6]:
            variable.units = attr[6]
        if attr[8]:
            variable.cf_role = attr[8]
        if key == 'base_time':
            variable.string = bts
            variable.ancillary_variables = 'time_offset'
        elif key == 'time_offset':
            variable.ancillary_variables = 'base_time'
        # 'gregorian' is the CF default calendar, so it is not set explicitly
        # if 'time' in key:
        #     variable.calendar = 'gregorian'
for entry in database:
varTup = database[entry]
variable = nc_file.createVariable(entry, np.float32,
dimensions=('time',), fill_value=float(-999), zlib=zlib,
chunksizes=chunk_sizes)
variable.standard_name = varTup[1]
variable.description = varTup[3]
variable.units = varTup[4]
variable.valid_min = float(varTup[5])
variable.valid_max = float(varTup[6])
qcVariable = nc_file.createVariable('qc_' + entry, 'b',
dimensions=('time',), zlib=zlib, chunksizes=chunk_sizes)
qcVariable.long_name = 'data quality'
qcVariable.valid_range = np.byte(0b1), np.byte(0b1111)
qcVariable.flag_masks = np.byte(0b1), np.byte(0b10), np.byte(0b100), np.byte(0b1000)
flagMeaning = ['value is equal to missing_value',
'value is less than the valid min',
'value is greater than the valid max',
'difference between current and previous values exceeds valid_delta.']
qcVariable.flag_meaning = ', '.join(flagMeaning)
def calculate_wind_gust(wind_speed_5s, wind_speed_2m):
    """Calculate reportable wind gust from wind speed averages.

    Determination of wind gusts follows the Automated Surface Observing System
    (ASOS) User's Guide created by NOAA. The guide can be found here:

    http://www.nws.noaa.gov/asos/pdfs/aum-toc.pdf

    Criteria Summary
    ----------------
    A 1-minute wind peak counts as a gust when the 2-minute average wind speed
    is at least 9 knots and the peak exceeds that average by at least 5 knots.
    The highest gust of the last 10 minutes is reportable when it exceeds the
    2-minute average by at least 3 knots, the 2-minute average is above
    2 knots, and the gust is at least 10 knots above the lowest 5-second
    average of the last 10 minutes.

    Arguments:
        wind_speed_5s (Series): 5-second average of instantaneous wind speed magnitudes in m/s
        wind_speed_2m (Series): 2-minute rolling average of the 5-second wind speed averages in m/s

    Returns:
        Series: 5-second rolling wind speed gusts meeting the ASOS criteria
                for being a reportable wind gust.
    """
# 1 minute rolling peaks
wind_peak_1m = wind_speed_5s.rolling(window=12, center=False).max()
# criteria for a fast wind to be considered a wind gust
gust_mask = (wind_speed_2m >= KNOTS_9) & \
(wind_peak_1m >= wind_speed_2m + KNOTS_5)
gusts = wind_peak_1m.mask(~gust_mask)
# determine highest gust in the last 10 minutes
# 5 seconds * 120 = 10 minutes
max_10m_gusts = gusts.rolling(window=120, center=False).max()
# Minimum 5-second average in the past 10 minutes
min_10m_5avg = wind_speed_5s.rolling(window=120, center=False).min()
# criteria for a wind gust to be reportable
reportable_mask = (max_10m_gusts >= wind_speed_2m + KNOTS_3) & \
(wind_speed_2m > KNOTS_2) & \
(max_10m_gusts >= min_10m_5avg + KNOTS_10)
# Reportable gusts at 5 second resolution
reportable_gusts = max_10m_gusts.mask(~reportable_mask)
return reportable_gusts
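# Worked example of the thresholds above (hypothetical numbers): with a steady
# 2-minute average of 10 knots, a 1-minute peak of 16 knots qualifies as a gust
# (average >= 9 knots and peak >= average + 5 knots); it is reportable only if
# it is at least average + 3 knots (16 >= 13), the average exceeds 2 knots, and
# the peak is at least 10 knots above the lowest 5-second average of the last
# 10 minutes.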
def minute_averages(frame):
"""Average the data frame at a 1 minute interval.
Wind direction and wind speed must be handled in a special manner
"""
# Add wind direction components so we can average wind direction properly
frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir'])
# round up each 1 minute group so data at time T is the average of data
# from T - 1 (exclusive) to T (inclusive).
new_frame = frame.resample('1T', closed='right', loffset='1T').mean()
# 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes)
winds_frame_5s = frame[['wind_speed', 'wind_east', 'wind_north']]
winds_frame_5s = winds_frame_5s.resample('5S', closed='right', loffset='5S').mean()
winds_frame_2m = winds_frame_5s.rolling(24, win_type='boxcar').mean()
# rolling average is used for 1 minute output
    new_frame.update(winds_frame_2m.loc[new_frame.index])  # replaces wind_speed, wind_east/north with 2-minute averages
new_frame['wind_dir'] = calc.wind_vector_degrees(new_frame['wind_east'], new_frame['wind_north'])
del new_frame['wind_east']
del new_frame['wind_north']
gust = calculate_wind_gust(winds_frame_5s['wind_speed'], winds_frame_2m['wind_speed'])
# "average" the gusts to minute resolution to match the rest of the data
new_frame['gust'] = gust.resample('1T', closed='right', loffset='1T').max()
return new_frame.fillna(np.nan)
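# Note: averaging the east/north wind components avoids the 0/360 degree
# wrap-around problem; for example, directions of 350 and 10 degrees average to
# roughly 0/360 rather than 180.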
def average_over_interval(frame, interval_width):
"""takes a frame and an interval to average it over, and returns a minimum,
maximum, and average dataframe for that interval
"""
    # resample with closed='right' and loffset so the row at time X summarizes
    # the data from X - interval_width (exclusive) through X (inclusive)
    gb = frame.resample(interval_width, closed='right', loffset=interval_width)
low = gb.min()
low.columns = [c + "_low" for c in low.columns]
high = gb.max()
high.columns = [c + "_high" for c in high.columns]
mean = gb.mean()
mean.columns = [c + "_mean" for c in mean.columns]
    out_frames = pd.concat((low, high, mean), axis=1)
    return out_frames
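# For example (assuming an 'air_temp' column exists in the input), resampling
# with interval_width='1D' yields 'air_temp_low', 'air_temp_high', and
# 'air_temp_mean' columns, one row per day.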
def get_data(input_files):
frames = []
for filename in input_files:
frame = pd.DataFrame(parser.read_frames(filename))
frame.set_index('stamp', inplace=True)
frame.mask(frame == -99999., inplace=True)
frame.fillna(value=np.nan, inplace=True)
frames.append(frame)
return pd.concat(frames, axis=1, copy=False)
def write_vars(nc_file, frame, database):
# all time points from epoch
time_epoch = (frame.index.astype(np.int64) // 10**9).values
# midnight of data's first day
base_epoch = frame.index[0].replace(hour=0, minute=0, second=0, microsecond=0).value // 10**9
fileVar = nc_file.variables
fileVar['base_time'].assignValue(base_epoch)
fileVar['time_offset'][:] = time_epoch - base_epoch
fileVar['time'][:] = time_epoch
# write coordinate var values to file
# alt might not be right, need to verify
fileVar['lon'].assignValue(station.LONGITUDE)
fileVar['lat'].assignValue(station.LATITUDE)
fileVar['alt'].assignValue(328)
fileVar['station_name'][:len(STATION_NAME)] = STATION_NAME
    for varName in frame:
        if varName not in fileVar:
            LOG.warning('Extraneous key: %s in frame' % varName)
            continue
        fileVar[varName][:] = frame[varName].fillna(-999.).values
        valid_min = database[varName][5]
        valid_max = database[varName][6]
        valid_delta = database[varName][7]
        fileVar['qc_' + varName][:] = filter_array(frame[varName],
                                                   float(valid_min) if valid_min else valid_min,
                                                   float(valid_max) if valid_max else valid_max,
                                                   float(valid_delta) if valid_delta else valid_delta)
coordinates = ['lon', 'lat', 'alt', 'base_time', 'time_offset', 'station_name', 'time']
for varName in fileVar:
if not varName.startswith('qc_') and \
varName not in frame and \
varName not in coordinates:
# if a variable should be in file (database), but isn't in the
# input data then make sure the QC field lists it as all fills
fileVar['qc_' + varName][:] |= 0b1
def write_global_attributes(nc_file, input_sources):
# create global attributes
nc_file.source = 'surface observation'
nc_file.conventions = 'ARM-1.2 CF-1.6'
nc_file.institution = 'University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)'
nc_file.featureType = 'timeSeries'
nc_file.data_level = 'b1'
# monthly files end with .month.nc
# these end with .day.nc
nc_file.datastream = 'aoss.tower.nc-1d-1m.b1.v{}'.format(SOFTWARE_VERSION)
nc_file.software_version = SOFTWARE_VERSION
nc_file.command_line = " ".join(sys.argv)
# generate history
nc_file.history = ' '.join(platform.uname()) + " " + os.path.basename(__file__)
nc_file.input_source = input_sources[0]
nc_file.input_sources = ', '.join(input_sources)
# Take a start datetime, an end datetime, a list of input filenames, and an
# output filename, and create a single netCDF file from that data.
# @param start: start datetime object
# @param end: end datetime object
# @param input_files: list of input filenames
# @param outputName: filename of the output netCDF file
def create_giant_netcdf(input_files, outputName, zlib, chunk_size,
start=None, end=None, interval_width=None,
database=schema.database):
frame = get_data(input_files)
if frame.empty:
raise ValueError("No data found from input files: {}".format(", ".join(input_files)))
frame = minute_averages(frame)
if interval_width:
frame = average_over_interval(frame, interval_width)
if start and end:
frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')]
    if chunk_size and not isinstance(chunk_size, (list, tuple)):
        chunk_sizes = [chunk_size]
    else:
        chunk_sizes = chunk_size
first_stamp = dt.strptime(str(frame.index[0]), '%Y-%m-%d %H:%M:%S')
nc_file = Dataset(outputName, 'w', format='NETCDF4_CLASSIC')
write_dimensions(nc_file)
create_variables(nc_file, first_stamp, database, chunk_sizes, zlib)
write_vars(nc_file, frame, database)
write_global_attributes(nc_file, input_files)
nc_file.close()
return nc_file
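# Illustrative call (file names are hypothetical): build one compressed daily
# file from two raw ASCII files:
#   create_giant_netcdf(['tower.2016-01-01.ascii', 'tower.2016-01-02.ascii'],
#                       'aoss_tower.2016-01-01.day.nc', zlib=True, chunk_size=256)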
def create_multiple(filenames, output_filenames, zlib, chunkSize):
if output_filenames and len(filenames) != len(output_filenames):
raise ValueError(
'Number of output filenames must equal number of input filenames when start and end times are not specified')
    success = False
    for idx, filename in enumerate(filenames):
        try:
            create_giant_netcdf([filename], output_filenames[idx], zlib, chunkSize, None, None)
            success = True
        except (ValueError, TypeError):
            LOG.error("Could not generate NetCDF file for {}".format(filename), exc_info=1)
if not success:
raise IOError('All ASCII files were empty or could not be read')
"""Parse datetime string, return datetime object"""
def main():
    argparser = argparse.ArgumentParser(description="Convert level_00 aoss tower data to level_b1 netCDF files",
                                        fromfile_prefix_chars='@')
argparser.add_argument('-v', '--verbose', action="count", default=int(os.environ.get("VERBOSITY", 2)),
dest='verbosity',
help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)')
    argparser.add_argument('-s', '--start-time', type=_dt_convert,
                           help="Start time of the output file. If only -s is given, a netCDF file for "
                                "just that day is created. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'")
    argparser.add_argument('-e', '--end-time', type=_dt_convert,
                           help="End time of the output file. Formats allowed: "
                                "'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'")
argparser.add_argument('-i', '--interval',
help="""Width of the interval to average input data
over in Pandas offset format. If not specified, 1 minute averages are used. If
specified then '_high', '_mean', and '_low' versions of the data fields are
written to the output NetCDF.
Use '1D' for daily or '5T' for 5 minute averages.
See this page for more details:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases""")
argparser.add_argument('--chunk-size', type=int, help='chunk size for the netCDF file')
argparser.add_argument('-z', '--zlib', action='store_true', help='compress netCDF file with zlib')
argparser.add_argument("input_files", nargs="+",
help="aoss_tower level_00 paths. Use @filename to red a list of paths from that file.")
argparser.add_argument('-o', '--output', nargs="+", help="NetCDF filename(s) to create from input")
    args = argparser.parse_args()
    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    level = levels[min(3, args.verbosity)]
    logging.basicConfig(level=level)
if args.start_time and not args.end_time:
args.end_time = args.start_time.replace(hour=23, minute=59, second=59)
elif not args.start_time and args.end_time:
raise ValueError('start time must be specified when end time is specified')
database = MEAN_DATABASE if args.interval else schema.database
if args.start_time and args.end_time:
create_giant_netcdf(args.input_files, args.output[0], args.zlib, args.chunk_size, args.start_time,
args.end_time, args.interval, database)
    else:
        create_multiple(args.input_files, args.output, args.zlib, args.chunk_size)


if __name__ == "__main__":
    main()