import os
import sys
import logging
import pandas as pd
from datetime import datetime as dt
from netCDF4 import Dataset
import numpy as np
import platform
from aosstower import station, schema
from aosstower.level_00 import parser
from datetime import timedelta as delta
from aosstower.level_b1 import calc

LOG = logging.getLogger(__name__)
STATION_NAME = "AOSS Tower"
SOFTWARE_VERSION = '00'
# Knots used to do ASOS wind gust calculations
KNOTS_10 = calc.knots_to_mps(10.)
KNOTS_9 = calc.knots_to_mps(9.)
KNOTS_5 = calc.knots_to_mps(5.)
KNOTS_3 = calc.knots_to_mps(3.)
KNOTS_2 = calc.knots_to_mps(2.)


def make_mean_dict(source_dict):
    """Create the '_mean', '_low', '_high' file structure."""
    dest_dict = {}
    for key in source_dict:
        dest_dict[key + '_high'] = source_dict[key]
        dest_dict[key + '_mean'] = source_dict[key]
        dest_dict[key + '_low'] = source_dict[key]
    return dest_dict


MEAN_DATABASE = make_mean_dict(schema.database)
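# Illustrative example (hypothetical key): a schema entry {'air_temp': spec}
# becomes {'air_temp_high': spec, 'air_temp_mean': spec, 'air_temp_low': spec},
# matching the column suffixes produced by average_over_interval() below.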


def filter_array(arr, valid_min, valid_max, valid_delta):
    """Create QC field array.
    Meanings
    --------
        Bit 0 (0b1): value is equal to missing_value
        Bit 1 (0b10): value is less than the valid min
        Bit 2 (0b100): value is greater than the valid max
        Bit 3 (0b1000): difference between current and previous values exceeds valid_delta

    Arguments:
        arr (Series): Data array this QC is for
        valid_min (float): Valid minimum of the input data
        valid_max (float): Valid maximum of the input data
        valid_delta (float): Valid delta of one input data point to the previous point

    Returns:
        Series: Bit-mask describing the input data (see above bit meanings)
   """
    qcControl = np.zeros(arr.shape, dtype=np.int8)
    qcControl[arr.isnull().values] |= 0b1
    if valid_min:
        qcControl[(arr < valid_min).values] |= 0b10
    if valid_max:
        qcControl[(arr > valid_max).values] |= 0b100
    if valid_delta:
        # prepend a 0 so the diff array matches the input array's shape
        d = np.append([0], np.diff(arr))
        qcControl[d > valid_delta] |= 0b1000
    return qcControl
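# Illustrative example: filter_array(pd.Series([1., 2., 50., 3.]), 0.5, 10., 5.)
# returns array([0, 0, 0b1100, 0], dtype=int8): index 2 exceeds the valid max
# and rose more than valid_delta above the previous value. Note that only
# positive jumps are flagged since the diff is compared without abs().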


def write_dimensions(nc_file):
    nc_file.createDimension('time', None)
    nc_file.createDimension('max_len_station_name', 32)

def create_variables(nc_file, first_stamp, database, chunk_sizes=None, zlib=False):
    # base_time long name
    btln = 'base time as unix timestamp'

    # base time units
    btu = 'seconds since 1970-01-01 00:00:00'

    # base time string
    bts = first_stamp.strftime('%Y-%m-%d 00:00:00Z')

    # time long name
    tln = 'Time offset from base_time'

    # time units
    tu = 'seconds since ' + first_stamp.strftime('%Y-%m-%d 00:00:00Z')

    coordinates = {
        # fields: type, dimension, fill, valid_min, std_name, longname, units, valid_max, cf_role, axis
        'time': [np.float64, ('time',), -999., None, None, "Time offset from epoch",
                 "seconds since 1970-01-01 00:00:00Z", None, None, None],
        'lon': [np.float32, tuple(), -999., -180., 'longitude', None, 'degrees_east', 180., None],
        'lat': [np.float32, tuple(), -999., -90., 'latitude', None, 'degrees_north', 90., None],
        'alt': [np.float32, tuple(), -999., None, 'height', 'vertical distance', 'm', None, None],
        'base_time': [np.int32, tuple(), -999., None, 'time', btln, btu, None, None],
        'time_offset': [np.float64, ('time',), -999., None, 'time', tln, tu, None, None],
        'station_name': ['c', ('max_len_station_name',), '\0', None, None, 'station name', None, None, 'timeseries_id'],
    }

    for key, attr in coordinates.items():
        if attr[1] == ('max_len_station_name',) and chunk_sizes and chunk_sizes[0] > 32:
            cs = [32]
        else:
            cs = chunk_sizes

        variable = nc_file.createVariable(key, attr[0],
                                          dimensions=attr[1],
                                          fill_value=attr[2],
                                          zlib=zlib,
                                          chunksizes=cs)

        # create var attributes
        if key == 'alt':
            variable.positive = 'up'
            variable.axis = 'Z'

        if attr[3] is not None:
            variable.valid_min = attr[3]
            variable.valid_max = attr[7]

        if attr[4] is not None:
            variable.standard_name = attr[4]

        if attr[5] is not None:
            variable.long_name = attr[5]

        if attr[6] is not None:
            variable.units = attr[6]

        if attr[8]:
            variable.cf_role = attr[8]

        if key == 'base_time':
            variable.string = bts
            variable.ancillary_variables = 'time_offset'
        elif key == 'time_offset':
            variable.ancillary_variables = 'base_time'

        # CF default
        # if 'time' in key:
        #     variable.calendar = 'gregorian'

    for entry, varTup in database.items():
        if entry == 'stamp':
            continue

        variable = nc_file.createVariable(entry, np.float32,
                                          dimensions=('time',), fill_value=float(-999), zlib=zlib,
                                          chunksizes=chunk_sizes)
        variable.standard_name = varTup[1]
        variable.description = varTup[3]
        variable.units = varTup[4]

        if varTup[5] != '':
            variable.valid_min = float(varTup[5])
            variable.valid_max = float(varTup[6])

        qcVariable = nc_file.createVariable('qc_' + entry, 'b',
                                            dimensions=('time',), zlib=zlib, chunksizes=chunk_sizes)

        qcVariable.long_name = 'data quality'
        qcVariable.valid_range = np.byte(0b1), np.byte(0b1111)
        qcVariable.flag_masks = np.byte(0b1), np.byte(0b10), np.byte(0b100), np.byte(0b1000)

        flagMeaning = ['value is equal to missing_value',
                       'value is less than the valid min',
                       'value is greater than the valid max',
                       'difference between current and previous values exceeds valid_delta']

        qcVariable.flag_meaning = ', '.join(flagMeaning)


def calculate_wind_gust(wind_speed_5s, wind_speed_2m):
    """Calculate reportable wind gust from wind speed averages.
    Determination of wind gusts follows the Autmated Surface Observing System
    (ASOS) User's Guide created by NOAA. The guide can be found here:
kgao's avatar
kgao committed

        http://www.nws.noaa.gov/asos/pdfs/aum-toc.pdf
    Criteria Summary
    ----------------
kgao's avatar
kgao committed

kgao's avatar
kgao committed

        1. 2 minute average
kgao's avatar
kgao committed

    Arguments:
        wind_speed_5s (Series): 5-second average of instantaneous wind speed magnitudes in m/s
        wind_speed_2m (Series): 2-minute rolling average of the 5 second wind speed averages in m/s

    Returns:
        Series: 5-second rolling wind speed gusts meeting the ASOS criteria
                for being a reportable wind gust.

    """
    # 1 minute rolling peaks
    wind_peak_1m = wind_speed_5s.rolling(window=12, center=False).max()
    # criteria for a fast wind to be considered a wind gust
    gust_mask = (wind_speed_2m >= KNOTS_9) & \
                (wind_peak_1m >= wind_speed_2m + KNOTS_5)
    gusts = wind_peak_1m.mask(~gust_mask)
    # determine highest gust in the last 10 minutes
    # 5 seconds * 120 = 10 minutes
    max_10m_gusts = gusts.rolling(window=120, center=False).max()
    # Minimum 5-second average in the past 10 minutes
    min_10m_5avg = wind_speed_5s.rolling(window=120, center=False).min()
    # criteria for a wind gust to be reportable
    reportable_mask = (max_10m_gusts >= wind_speed_2m + KNOTS_3) & \
                      (wind_speed_2m > KNOTS_2) & \
                      (max_10m_gusts >= min_10m_5avg + KNOTS_10)
    # Reportable gusts at 5 second resolution
    reportable_gusts = max_10m_gusts.mask(~reportable_mask)
    return reportable_gusts
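# Illustrative numbers: with a 2 minute average of 12 knots and a 1 minute
# peak of 18 knots, the peak qualifies as a gust (12 >= 9 and 18 >= 12 + 5).
# It is then reportable if, in the last 10 minutes, 18 >= 12 + 3, 12 > 2,
# and the lowest 5 second average was at most 8 knots (18 >= 8 + 10).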

def minute_averages(frame):
    """Average the data frame at a 1 minute interval.
    Wind direction and wind speed must be handled in a special manner
    """
    # Add wind direction components so we can average wind direction properly
    frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir'])
    # round up each 1 minute group so data at time T is the average of data
    # from T - 1 (exclusive) to T (inclusive).
    new_frame = frame.resample('1T', closed='right', loffset='1T').mean()
    # 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes)
    winds_frame_5s = frame[['wind_speed', 'wind_east', 'wind_north']]
    winds_frame_5s = winds_frame_5s.resample('5S', closed='right', loffset='5S').mean()
    winds_frame_2m = winds_frame_5s.rolling(24, win_type='boxcar').mean()
    # rolling average is used for 1 minute output
    new_frame.update(winds_frame_2m.reindex(new_frame.index))  # adds wind_speed, wind_east/north
    new_frame['wind_dir'] = calc.wind_vector_degrees(new_frame['wind_east'], new_frame['wind_north'])
    del new_frame['wind_east']
    del new_frame['wind_north']
    gust = calculate_wind_gust(winds_frame_5s['wind_speed'], winds_frame_2m['wind_speed'])
    # "average" the gusts to minute resolution to match the rest of the data
    new_frame['gust'] = gust.resample('1T', closed='right', loffset='1T').max()
    return new_frame.fillna(np.nan)
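# Note on the resample semantics above: with closed='right' and a 1 minute
# loffset, the row stamped 00:01 is the mean of samples in (00:00, 00:01],
# i.e. each output minute summarizes the minute that just ended.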

def average_over_interval(frame, interval_width):
    """takes a frame and an interval to average it over, and returns a minimum,
    maximum, and average dataframe for that interval
    """
    # the value at time X summarizes the data from X - interval_width
    # (exclusive) to X (inclusive)
    gb = frame.resample(interval_width, closed='right', loffset=interval_width)

    low = gb.min()
    low.columns = [c + "_low" for c in low.columns]
    high = gb.max()
    high.columns = [c + "_high" for c in high.columns]
    mean = gb.mean()
    mean.columns = [c + "_mean" for c in mean.columns]

    out_frames = pd.concat((low, high, mean), axis=1)
    return out_frames
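# Illustrative result (hypothetical column): with interval_width='1D' an input
# column 'air_temp' yields 'air_temp_low', 'air_temp_high', and 'air_temp_mean'
# columns with one row per day, matching the keys from make_mean_dict() above.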

def get_data(input_files):
    frames = []
    for filename in input_files:
        frame = pd.DataFrame(parser.read_frames(filename))
        frame.set_index('stamp', inplace=True)
        frame.mask(frame == -99999., inplace=True)
        frame.fillna(value=np.nan, inplace=True)
        frames.append(frame)

    return pd.concat(frames, axis=0, copy=False)


def write_vars(nc_file, frame, database):
    # all time points from epoch
    time_epoch = (frame.index.astype(np.int64) // 10**9).values
    # midnight of data's first day
    base_epoch = frame.index[0].replace(hour=0, minute=0, second=0, microsecond=0).value // 10**9

    fileVar = nc_file.variables
    fileVar['base_time'].assignValue(base_epoch)
    fileVar['time_offset'][:] = time_epoch - base_epoch
    fileVar['time'][:] = time_epoch

    # write coordinate var values to file
    # alt might not be right, need to verify
    fileVar['lon'].assignValue(station.LONGITUDE)
    fileVar['lat'].assignValue(station.LATITUDE)
    fileVar['alt'].assignValue(328)

    # write station name to file
    fileVar['station_name'][:len(STATION_NAME)] = STATION_NAME

    # write variable data into the file
    for varName in frame:
        if varName not in fileVar:
            LOG.warning('Extraneous key: %s in frame' % varName)
            continue

        fileVar[varName][:] = frame[varName].fillna(-999.).values

        valid_min = database[varName][5]
        valid_max = database[varName][6]
        valid_delta = database[varName][7]

        fileVar['qc_' + varName][:] = filter_array(frame[varName],
                                                   float(valid_min) if valid_min else valid_min,
                                                   float(valid_max) if valid_max else valid_max,
                                                   float(valid_delta) if valid_delta else valid_delta)

    coordinates = ['lon', 'lat', 'alt', 'base_time', 'time_offset', 'station_name', 'time']

    for varName in fileVar:
        if not varName.startswith('qc_') and \
                varName not in frame and \
                varName not in coordinates:
            # if a variable should be in the file (database), but isn't in the
            # input data then make sure the QC field lists it as all fills
            fileVar['qc_' + varName][:] |= 0b1


def write_global_attributes(nc_file, input_sources):
    # create global attributes
    nc_file.source = 'surface observation'
    nc_file.conventions = 'ARM-1.2 CF-1.6'
    nc_file.institution = 'University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)'
    nc_file.featureType = 'timeSeries'
    nc_file.data_level = 'b1'

    # monthly files end with .month.nc
    # these end with .day.nc
    nc_file.datastream = 'aoss.tower.nc-1d-1m.b1.v{}'.format(SOFTWARE_VERSION)
    nc_file.software_version = SOFTWARE_VERSION
    nc_file.command_line = " ".join(sys.argv)

    # generate history
    nc_file.history = ' '.join(platform.uname()) + " " + os.path.basename(__file__)
    nc_file.input_source = input_sources[0]
    nc_file.input_sources = ', '.join(input_sources)


def create_giant_netcdf(input_files, outputName, zlib, chunk_size,
                        start=None, end=None, interval_width=None,
                        database=schema.database):
    """Create a netCDF file from the input files over an optional date range.

    Arguments:
        input_files (list): input data filenames
        outputName (str): filename of the output netCDF file
        zlib (bool): whether to compress the netCDF file with zlib
        chunk_size (int): chunk size for the netCDF variables
        start (datetime): start of the time range to include
        end (datetime): end of the time range to include
        interval_width (str): pandas offset string to average the data over
        database (dict): variable schema to write to the file
    """
    frame = get_data(input_files)
    if frame.empty:
        raise ValueError("No data found from input files: {}".format(", ".join(input_files)))
    frame = minute_averages(frame)
    if interval_width:
        frame = average_over_interval(frame, interval_width)

    if start and end:
        frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')]
    if chunk_size and not isinstance(chunk_size, (list, tuple)):
        chunk_sizes = [chunk_size]
    else:
        chunk_sizes = [frame.shape[0]]
    first_stamp = dt.strptime(str(frame.index[0]), '%Y-%m-%d %H:%M:%S')
    nc_file = Dataset(outputName, 'w', format='NETCDF4_CLASSIC')
    write_dimensions(nc_file)
    create_variables(nc_file, first_stamp, database, chunk_sizes, zlib)
    write_vars(nc_file, frame, database)
    write_global_attributes(nc_file, input_files)
    nc_file.close()
    return nc_file
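# Example call (illustrative filenames):
#     create_giant_netcdf(['tower.2017-06-01.ascii'], 'tower.2017-06-01.day.nc',
#                         zlib=True, chunk_size=256)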

def create_multiple(filenames, output_filenames, zlib, chunkSize):
    if output_filenames and len(filenames) != len(output_filenames):
        raise ValueError(
            'Number of output filenames must equal number of input filenames when start and end times are not specified')

    success = False
    for idx, filename in enumerate(filenames):
        try:
            create_giant_netcdf([filename], output_filenames[idx], zlib, chunkSize, None, None)
            success = True
        except (ValueError, TypeError):
            LOG.error("Could not generate NetCDF file for {}".format(filename), exc_info=1)
    if not success:
        raise IOError('All ASCII files were empty or could not be read')

def _dt_convert(datetime_str):
    """Parse datetime string, return datetime object"""
    try:
        return dt.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S')
    except ValueError:
        return dt.strptime(datetime_str, '%Y-%m-%d')
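# Illustrative: _dt_convert('2017-06-01T12:30:00') and _dt_convert('2017-06-01')
# both return datetime objects; the latter defaults to midnight.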


def main():
    import argparse
    argparser = argparse.ArgumentParser(description="Convert level_00 aoss tower data to level_a0",
                                        fromfile_prefix_chars='@')

    argparser.add_argument('-v', '--verbose', action="count", default=int(os.environ.get("VERBOSITY", 2)),
                           dest='verbosity',
                           help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)')

    argparser.add_argument('-s', '--start-time', type=_dt_convert,
                           help="Start time of the output netCDF file. If only -s is given, a netCDF file " +
                                "for that day alone is created. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'")
    argparser.add_argument('-e', '--end-time', type=_dt_convert,
                           help="End time of the output netCDF file. Formats allowed: " +
                                "'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'")
    argparser.add_argument('-i', '--interval',
                           help="""Width of the interval to average input data
over in Pandas offset format. If not specified, 1 minute averages are used. If
specified then '_high', '_mean', and '_low' versions of the data fields are
written to the output NetCDF.
Use '1D' for daily or '5T' for 5 minute averages.
See this page for more details:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases""")
    argparser.add_argument('--chunk-size', type=int, help='chunk size for the netCDF file')
    argparser.add_argument('-z', '--zlib', action='store_true', help='compress netCDF file with zlib')

    argparser.add_argument("input_files", nargs="+",
                           help="aoss_tower level_00 paths. Use @filename to red a list of paths from that file.")

    argparser.add_argument('-o', '--output', nargs="+", help="NetCDF filename(s) to create from input")

    args = argparser.parse_args()

    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    level = levels[min(3, args.verbosity)]
    logging.basicConfig(level=level)

    if args.start_time and not args.end_time:
        args.end_time = args.start_time.replace(hour=23, minute=59, second=59)
    elif not args.start_time and args.end_time:
        raise ValueError('start time must be specified when end time is specified')

    database = MEAN_DATABASE if args.interval else schema.database
    if args.start_time and args.end_time:
        create_giant_netcdf(args.input_files, args.output[0], args.zlib, args.chunk_size, args.start_time,
                            args.end_time, args.interval, database)
    else:
        create_multiple(args.input_files, args.output, args.zlib, args.chunk_size)

if __name__ == "__main__":
    sys.exit(main())