Skip to content
Snippets Groups Projects
nc.py 17.1 KiB
Newer Older
kgao's avatar
kgao committed
import os
import sys
import logging
import pandas as pd
from datetime import datetime as dt
from aosstower.level_00 import parser
kgao's avatar
kgao committed
from netCDF4 import Dataset
import numpy as np
kgao's avatar
kgao committed
import platform
from aosstower import station
from datetime import timedelta as delta
from aosstower.level_b1 import calc
kgao's avatar
kgao committed

LOG = logging.getLogger(__name__)


def make_mean_dict(source_dict):
    """Create the '_mean','_low','_high' file structure."""
    dest_dict = {}
    for key in source_dict:
        dest_dict[key + '_high'] = source_dict[key]
        dest_dict[key + '_mean'] = source_dict[key]
        dest_dict[key + '_low'] = source_dict[key]
mean_database = make_mean_dict(parser.database)

kgao's avatar
kgao committed

def filter_array(array, valid_min, valid_max):
kgao's avatar
kgao committed
    qcControl = []

    for value in array:
kgao's avatar
kgao committed
        if value == float(-99999):
kgao's avatar
kgao committed
            qcControl.append(np.byte(0b1))

kgao's avatar
kgao committed
        elif valid_min != '' and value < float(valid_min):
kgao's avatar
kgao committed
            qcControl.append(np.byte(0b10))
kgao's avatar
kgao committed
        elif valid_max != '' and value > float(valid_max):
kgao's avatar
kgao committed
            qcControl.append(np.byte(0b100))
kgao's avatar
kgao committed
        else:
            qcControl.append(np.byte(0b0))

    return np.array(qcControl)

kgao's avatar
kgao committed

def write_dimensions(ncFile):
kgao's avatar
kgao committed
    ncFile.createDimension('time', None)
    ncFile.createDimension('max_len_station_name', 32)
kgao's avatar
kgao committed

kgao's avatar
kgao committed
    return ncFile


def create_variables(ncFile, firstStamp, chunksizes, zlib, database=parser.database):
    # base_time long name
kgao's avatar
kgao committed
    btln = 'base time as unix timestamp'

    # base time units
kgao's avatar
kgao committed
    btu = 'seconds since 1970-01-01 00:00:00'

    # base time string
kgao's avatar
kgao committed
    bts = firstStamp.strftime('%Y-%m-%d 00:00:00Z')

    # time long name
    tln = 'time offset from base_time'
kgao's avatar
kgao committed

    # time units
kgao's avatar
kgao committed
    tu = 'seconds since ' + firstStamp.strftime('%Y-%m-%d 00:00:00Z')

    coordinates = {
        # fields: type, dimension, fill, valid_min, std_name, longname, units, valid_max, cf_role, axis
        'lon': [np.float32, None, float(-999), '-180L', 'longitude', None, 'degrees_east', '180L', None],
        'lat': [np.float32, None, float(-999), '-90L', 'latitude', None, 'degrees_north', '90L', None],
        'alt': [np.float32, None, float(-999), None, 'height', 'vertical distance', 'm', None, None],
        'base_time': [np.int32, None, float(-999), None, 'time', btln, btu, None, None],
        'time_offset': [np.float64, 'time', float(-999), None, 'time', tln, tu, None, None],
        'station_name': ['c', 'max_len_station_name', '-', None, None, 'station name', None, None, 'timeseries_id'],
        'time': [np.float32, 'time', float(-999), None, None, "Time offset from epoch",
                 "seconds since 1970-01-01 00:00:00Z", None, None, None]
    }

    for key, attr in coordinates.items():
        if attr[1]:
            if attr[1] == 'max_len_station_name':
                if chunksizes and chunksizes[0] > 32:
                    variable = ncFile.createVariable(key, attr[0], dimensions=(attr[1]), fill_value=attr[2], zlib=zlib,
                                                     chunksizes=[32])
kgao's avatar
kgao committed
                else:
                    variable = ncFile.createVariable(key, attr[0], dimensions=(attr[1]), fill_value=attr[2], zlib=zlib,
                                                     chunksizes=chunksizes)
kgao's avatar
kgao committed
            else:
                variable = ncFile.createVariable(key, attr[0], dimensions=(attr[1]), fill_value=attr[2], zlib=zlib,
                                                 chunksizes=chunksizes)
kgao's avatar
kgao committed
        else:
kgao's avatar
kgao committed
            variable = ncFile.createVariable(key, attr[0], fill_value=attr[1], zlib=zlib, chunksizes=chunksizes)
kgao's avatar
kgao committed

        # create var attributes
kgao's avatar
kgao committed
        if key == 'alt':
            variable.positive = 'up'
            variable.axis = 'Z'

kgao's avatar
kgao committed
            variable.valid_min = attr[3]
            variable.valid_max = attr[7]

kgao's avatar
kgao committed
            variable.standard_name = attr[4]

kgao's avatar
kgao committed
            variable.long_name = attr[5]

kgao's avatar
kgao committed
            variable.units = attr[6]

        if attr[8]:
            variable.cf_role = attr[8]
kgao's avatar
kgao committed

        if key == 'base_time':
            variable.string = bts

        if 'time' in key:
            variable.calendar = 'gregorian'
        if entry == 'stamp':
kgao's avatar
kgao committed
            continue

kgao's avatar
kgao committed
        variable = ncFile.createVariable(entry, np.float32,
                                         dimensions=('time'), fill_value=float(-99999), zlib=zlib,
                                         chunksizes=chunksizes)
kgao's avatar
kgao committed
        variable.standard_name = varTup[1]
        variable.description = varTup[3]
        variable.units = varTup[4]

        if varTup[5] != '':
            variable.valid_min = float(varTup[5])
            variable.valid_max = float(varTup[6])

kgao's avatar
kgao committed
        qcVariable = ncFile.createVariable('qc_' + entry, 'b',
                                           dimensions=('time'), fill_value=0b0, zlib=zlib, chunksizes=chunksizes)
kgao's avatar
kgao committed

        qcVariable.long_name = 'data quality'
        qcVariable.valid_range = np.byte(0b1), np.byte(0b1111)
        qcVariable.flag_masks = np.byte(0b1), np.byte(0b10), np.byte(0b100), np.byte(0b1000)

        flagMeaning = ['value is equal to missing_value',
                       'value is less than the valid min',
kgao's avatar
kgao committed
                       'value is greater than the valid max',
                       'difference between current and previous values exceeds valid_delta.']

        qcVariable.flag_meaning = ', '.join(flagMeaning)

    # create global attributes
kgao's avatar
kgao committed
    ncFile.source = 'surface observation'
kgao's avatar
kgao committed
    ncFile.conventions = 'ARM-1.2 CF-1.6'
kgao's avatar
kgao committed
    ncFile.institution = 'UW SSEC'
    ncFile.featureType = 'timeSeries'
kgao's avatar
kgao committed
    ncFile.data_level = 'b1'

    # monthly files end with .month.nc
    # these end with .day.nc
kgao's avatar
kgao committed

kgao's avatar
kgao committed
    ncFile.datastream = 'aoss.tower.nc-1d-1m.b1.v00'
    ncFile.software_version = '00'
kgao's avatar
kgao committed

    # generate history
kgao's avatar
kgao committed
    ncFile.history = ' '.join(platform.uname()) + " " + os.path.basename(__file__)
kgao's avatar
kgao committed
    return ncFile


def get_gust(rollingAvg, speeds):
kgao's avatar
kgao committed
    averages = rollingAvg.tolist()
    maxSpeed = speeds['wind_speed'].tolist()

    gust = []
    for idx, average in enumerate(averages):
        if not average:
            gust.append(np.nan)
            continue
kgao's avatar
kgao committed
        elif average >= 4.63 and maxSpeed[idx] > average + 2.573:
            gust.append(maxSpeed[idx])
kgao's avatar
kgao committed
        else:
            gust.append(np.nan)
            continue

    return gust

kgao's avatar
kgao committed

def get_rolling(series, minutes):
    """Get the rolling mean closest to the nearest minute"""
    returnSeries = series.rolling(25, win_type='boxcar').mean()
kgao's avatar
kgao committed
    data = {}
    for minute in minutes:
        # doesn't go past the minute
kgao's avatar
kgao committed
        data[minute] = returnSeries[returnSeries.index.asof(minute)]

    return pd.Series(data)

kgao's avatar
kgao committed

def get_new_wind_direction(wind_dir, wind_speed, stamps):
    newWindDir = {}

    for stamp in stamps:
        before = stamp - delta(minutes=1)

        if before not in wind_speed.index:
            newWindDir[stamp] = None
        else:
            speed = wind_speed[before: stamp].tolist()
            dire = wind_dir[before: stamp].tolist()

            wind_dire = calc.mean_wind_vector(speed, dire)[0]
            newWindDir[stamp] = wind_dire
    return pd.Series(newWindDir)


def minute_averages(frame):
    frame['minute'] = [(ts + delta(minutes=1)).replace(second=0) for ts in frame.index]
    newFrame = frame.groupby('minute').mean()
    newFrame.index.names = ['']
    columns = list(newFrame.columns.values)
    if 'wind_speed' in columns:
        del newFrame['wind_speed']

        windSeries = frame['wind_speed']
        windSeries = get_rolling(windSeries, list(newFrame.index))
        newFrame['wind_speed'] = windSeries
kgao's avatar
kgao committed
        rollingAvg = newFrame['wind_speed']
        maxSpeed = pd.DataFrame()
        maxSpeed['minute'] = frame['minute']
        maxSpeed['speed'] = frame['wind_speed']
        maxSpeed = frame.groupby('minute').max()
        gust = get_gust(rollingAvg, maxSpeed)
kgao's avatar
kgao committed
        newFrame['gust'] = gust
    if 'wind_dir' in columns:
        del newFrame['wind_dir']

        dupFrame = frame.set_index('minute')
        stamps = newFrame.index
        windDirSeries = dupFrame['wind_dir']
        windSeries = dupFrame['wind_speed']
        windDirSeries = get_new_wind_direction(windDirSeries, windSeries, stamps)
        newFrame['wind_dir'] = windDirSeries
    del frame['minute']
    return newFrame.fillna(-99999)


def average_over_interval(frame, interval_width):
    """takes a frame and an interval to average it over, and returns a minimum,
    maximum, and average dataframe for that interval"""
    ts = frame.index
    # round each timestamp to the nearest n minutes
    frame['interval'] = (ts.astype(int) - ts.astype(int) % (interval_width * 60e9)).astype('datetime64[ns]')
    out_frames = {
        'low': frame.groupby('interval').min(),
        'high': frame.groupby('interval').max(),
        'mean': frame.groupby('interval').mean(),
    }
    for key in out_frames:
        # append the appropriate suffix to each column
        columns = out_frames[key].columns
        out_frames[key].columns = ['_'.join([col, key]) for col in columns]
    out_frames = pd.concat(out_frames.values(), axis=1)
    return out_frames
kgao's avatar
kgao committed


def get_data(inputFiles):
    dictData = {}
kgao's avatar
kgao committed
    for filename in inputFiles:
kgao's avatar
kgao committed
        getFrames = list(parser.read_frames(filename))

        for frame in getFrames:
kgao's avatar
kgao committed
            if 'stamp' not in frame:
                continue

            stamp = frame['stamp']
            del frame['stamp']

            dictData[stamp] = frame

    return pd.DataFrame(dictData).transpose().replace(-99999, np.nan)
kgao's avatar
kgao committed


def write_vars(ncFile, frame, database=parser.database):
kgao's avatar
kgao committed
    stamps = list(frame.index)
    baseDTObj = dt.strptime(str(stamps[0]).split(' ')[0], '%Y-%m-%d')

    # find out how much time elapsed
    # since the origin to the start of the day
    # in seconds
    baseTimeValue = baseDTObj - dt(1970, 1, 1)
kgao's avatar
kgao committed
    baseTimeValue = baseTimeValue.total_seconds()

    # create time numpy
    timeNumpy = np.empty(len(stamps), dtype='float64')
kgao's avatar
kgao committed

    counter = 0

    # write stamps in, yo
kgao's avatar
kgao committed

    for stamp in stamps:
        stampObj = dt.strptime(str(stamp), '%Y-%m-%d %H:%M:%S')
        timeValue = (stampObj - baseDTObj).total_seconds()

        timeNumpy[counter] = timeValue
        counter += 1

    fileVar = ncFile.variables
    fileVar['base_time'].assignValue(baseTimeValue)
    fileVar['time_offset'][:] = timeNumpy
    fileVar['time'][:] = timeNumpy
kgao's avatar
kgao committed

    # write coordinate var values to file
    # alt might not be right, need to verify
kgao's avatar
kgao committed
    fileVar['lon'].assignValue(station.LONGITUDE)
    fileVar['lat'].assignValue(station.LATITUDE)
    fileVar['alt'].assignValue(328)

    # might change
kgao's avatar
kgao committed
    stationName = ("AOSS Tower")

    # transfer station name into array of chars
kgao's avatar
kgao committed
    statChars = list(stationName)
    statNumpy = np.asarray(statChars)

    # write station name to file
kgao's avatar
kgao committed
    fileVar['station_name'][0:len(statNumpy)] = statNumpy
kgao's avatar
kgao committed

    # writes data into file
kgao's avatar
kgao committed
    for varName in frame:
            logging.warn('Extraneous key: %s in frame' % varName)
kgao's avatar
kgao committed
        dataList = frame[varName].tolist()
kgao's avatar
kgao committed

kgao's avatar
kgao committed
        dataArray = np.asarray(dataList)
        fileVar[varName][:] = dataArray
kgao's avatar
kgao committed

        valid_min = database[varName][5]
        valid_max = database[varName][6]
kgao's avatar
kgao committed

        fileVar['qc_' + varName][:] = filter_array(dataArray, valid_min, valid_max)
kgao's avatar
kgao committed

    coordinates = ['lon', 'lat', 'alt', 'base_time', 'time_offset', 'station_name', 'time']

    for varName in fileVar:
        if varName.startswith('qc_'):
            continue

        elif varName in frame:
            continue

        elif varName in coordinates:
            continue

        else:
            fileVar['qc_' + varName][:] = np.full(len(list(frame.index)), np.byte(0b1), dtype='b')
kgao's avatar
kgao committed
    return ncFile


# The purpose of this method is to take a begin date, and end date
kgao's avatar
kgao committed
# input filenames and output filename and create a netCDF file 
# based upon that
# @param start time - a start datetime object
# @param end time - an end datetime object
# @param input filenames - list of filenames
# @param output filename - filename of the netcdf file

def create_giant_netCDF(start, end, inputFiles, outputName, zlib, chunkSize,
                        interval_width=None, database=parser.database):
    default = False

    if (chunkSize):
        chunksizes = [chunkSize]

    else:
        default = True
    frame = get_data(inputFiles)
kgao's avatar
kgao committed

    if (frame.empty):
kgao's avatar
kgao committed
        return False
kgao's avatar
kgao committed
    else:
        frame = minute_averages(frame)
            frame = average_over_interval(frame, interval_width)
        if (start and end):
kgao's avatar
kgao committed
            frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')]

        if (default):
            chunksizes = [len(list(frame.index))]

kgao's avatar
kgao committed
        firstStamp = dt.strptime(str(list(frame.index)[0]), '%Y-%m-%d %H:%M:%S')

        ncFile = Dataset(outputName, 'w', format='NETCDF4_CLASSIC')
kgao's avatar
kgao committed

        ncFile = write_dimensions(ncFile)

        ncFile = create_variables(ncFile, firstStamp, chunksizes, zlib, database)
kgao's avatar
kgao committed

        ncFile.inputFiles = ', '.join(inputFiles)
kgao's avatar
kgao committed

        ncFile = write_vars(ncFile, frame, database)
kgao's avatar
kgao committed

kgao's avatar
kgao committed
        ncFile.close()
def create_multiple(filenames, outputFilenames, zlib, chunkSize):
    if outputFilenames and len(filenames) != len(outputFilenames):
        raise ValueError(
            'Number of output filenames must equal number of input filenames when start and end times are not specified')
kgao's avatar
kgao committed

    results = []
    for idx, filename in enumerate(filenames):
        results.append(create_giant_netCDF(None, None, [filename], outputFilenames[idx], zlib, chunkSize))
kgao's avatar
kgao committed

    if not any(results):
kgao's avatar
kgao committed
        raise IOError('All ASCII files were empty')

# The purpose of this method is to take a string in the format
kgao's avatar
kgao committed
# YYYY-mm-ddTHH:MM:SS and convert that to a datetime object
# used in coordination with argparse -s and -e params
# @param datetime string
# @return datetime object

def _dt_convert(datetime_str):
    # parse datetime string, return datetime object
    try:
kgao's avatar
kgao committed
        return dt.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S')
    except:
        return dt.strptime(datetime_str, '%Y-%m-%d')
kgao's avatar
kgao committed
def main():
    import argparse
Matthew Westphall's avatar
Matthew Westphall committed
    argparser = argparse.ArgumentParser(description="Convert level_00 aoss tower data to level_a0",
                                        fromfile_prefix_chars='@')
kgao's avatar
kgao committed

    argparser.add_argument('-v', '--verbose', action="count", default=int(os.environ.get("VERBOSITY", 2)),
                           dest='verbosity',
                           help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)')

    argparser.add_argument('-s', '--start-time', type=_dt_convert,
                           help="Start time of massive netcdf file, if only -s is given, a netcdf file for only that day is given" +
                                ". Formats allowed: \'YYYY-MM-DDTHH:MM:SS\', \'YYYY-MM-DD\'")
    argparser.add_argument('-e', '--end-time', type=_dt_convert,
                           help='End time of massive netcdf file. Formats allowed:' +
                                "\'YYYY-MM-DDTHH:MM:SS\', \'YYYY-MM-DD\'")
    argparser.add_argument('-i', '--interval', type=float,
                           help='Width of the interval to average input data over in minutes.' +
                                " If not specified, 1 is assumed. (Use 60 for one hour and 1440 for 1 day)")
    argparser.add_argument('--chunk-size', type=int, help='chunk size for the netCDF file')
    argparser.add_argument('-z', '--zlib', action='store_true', help='compress netCDF file with zlib')
kgao's avatar
kgao committed

    argparser.add_argument("input_files", nargs="+",
                           help="aoss_tower level_00 paths. Use @filename to red a list of paths from that file.")
kgao's avatar
kgao committed

    argparser.add_argument('-o', '--output', required=True, nargs="+", help="filename pattern or filename. " +
                                                                            "Should be along the lines of <filepath>/aoss_tower.YYYY-MM-DD.nc")
kgao's avatar
kgao committed

    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    level = levels[min(3, args.verbosity)]
kgao's avatar
kgao committed
    logging.basicConfig(level=level)

    database = mean_database if args.interval else parser.database
    if args.start_time and args.end_time:
        result = create_giant_netCDF(args.start_time, args.end_time, args.input_files, args.output[0], args.zlib,
                                     args.chunk_size,
                                     args.interval, database)
        if not result:
kgao's avatar
kgao committed
            raise IOError('An empty ASCII file was found')
    elif args.start_time:
kgao's avatar
kgao committed
        end_time = args.start_time.replace(hour=23, minute=59, second=59)
        result = create_giant_netCDF(args.start_time, end_time, args.input_files, args.output[0], args.zlib,
                                     args.chunk_size,
                                     args.interval, database)
        if not result:
kgao's avatar
kgao committed
            raise IOError('An empty ASCII file was found')
    elif args.end_time:
        raise ValueError('start time must be specified when end time is specified')
kgao's avatar
kgao committed
    else:
        create_multiple(args.input_files, args.output, args.zlib, args.chunk_size)

kgao's avatar
kgao committed
if __name__ == "__main__":
    sys.exit(main())