#!/usr/bin/env python
# encoding: utf8
"""Generate AOSS Tower NetCDF4 files from Level 00 ASCII files.
"""
import os
import sys
import logging
import pandas as pd
from datetime import datetime
from netCDF4 import Dataset
import numpy as np
import platform
from aosstower import schema
from aosstower.station import station_info
from aosstower.level_00 import parser
from metobscommon.util import calc
from metobscommon.util.nc import (make_summary_dict, write_vars,
                                  calculate_wind_gust, summary_over_interval,
                                  create_variables, write_qc_for_unwritten)

LOG = logging.getLogger(__name__)
SOFTWARE_VERSION = '00'

def _get_data(input_files):
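    """Yield records parsed from each input file.

    Missing files are skipped with a warning; if every input file is
    missing, a ValueError is raised.
    """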
    bad_files = 0
    for filename in input_files:
        try:
            yield from parser.read_frames(filename)
        except FileNotFoundError:
            LOG.warning("Input data file was not found: {}".format(filename))
            bad_files += 1
            continue
    if bad_files == len(input_files):
        raise ValueError("No valid input data files found")

def get_data(input_files):
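    """Combine all input files into a single DataFrame indexed by timestamp.

    The fill value -99999 is replaced with NaN.
    """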
    frame = pd.DataFrame(_get_data(input_files))
    frame = frame.set_index('stamp').mask(frame == -99999.).fillna(value=np.nan)
    return frame


def write_global_attributes(nc_file, input_sources, interval=None, datastream=None):
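    """Write ARM/CF-style global attributes, including the datastream name
    and a history entry built from the platform and command line."""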
    # create global attributes
    nc_file.source = 'surface observation'
    nc_file.Conventions = 'ARM-1.2 CF-1.6'
    nc_file.institution = 'University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)'
    nc_file.featureType = 'timeSeries'
    nc_file.data_level = 'b1'

    # use the datastream name if given, otherwise guess it from the
    # averaging interval (monthly files are averaged daily, daily files
    # are averaged to 1 minute)
    if datastream:
        nc_file.datastream = datastream
    elif interval in ['1D']:
        # assume this is a monthly file, averaged daily
        nc_file.datastream = 'aoss.tower.nc-1mo-1d.b1.v{software_version}'.format(software_version=SOFTWARE_VERSION)
    elif interval in ['1T', '1min']:
        # assume this is a daily file, averaged
        nc_file.datastream = 'aoss.tower.nc-1d-1m.b1.v{software_version}'.format(software_version=SOFTWARE_VERSION)
    nc_file.software_version = SOFTWARE_VERSION
    nc_file.command_line = " ".join(sys.argv)

    # generate history
    nc_file.history = ' '.join(platform.uname()) + " " + os.path.basename(__file__)
    nc_file.input_source = input_sources[0]
    nc_file.input_sources = ', '.join(input_sources)

def create_giant_netcdf(input_files, output_fn, zlib, chunk_size,
                        start=None, end=None, interval_width=None,
                        summary=False, database=schema.database_dict,
                        datastream=None):
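    """Create one NetCDF file from the given Level 00 input files.

    Input data are averaged into 5 second bins, winds get a 2 minute
    rolling mean and gust estimate, and everything is resampled to
    ``interval_width``. If ``summary`` is True, _low/_mean/_high versions
    of each variable are written instead of simple means.
    """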
    frame = get_data(input_files)
    if frame.empty:
        raise ValueError("No data found from input files: {}".format(", ".join(input_files)))
    # Add wind direction components so we can average wind direction properly
    frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir'])
    if 'dewpoint' not in frame and 'air_temp' in frame and 'rh' in frame and \
            ('dewpoint' in database or 'dewpoint_mean' in database):
        LOG.info("'dewpoint' is missing from the input file, will calculate "
                 "it from air temp and relative humidity")
        frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh'])

    # average each 5 second group so data at time T is the average of data
    # from T - 5 seconds (exclusive) to T (inclusive).
    new_frame = frame.resample('5S', closed='right', loffset='5S').mean()

    # 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes)
    winds_frame_5s = new_frame[['wind_speed', 'wind_east', 'wind_north']]
    winds_frame_2m = winds_frame_5s.rolling('2T').mean()
    winds_frame_2m['gust'] = calculate_wind_gust(winds_frame_5s['wind_speed'], winds_frame_2m['wind_speed'])

    # rolling average is used for mean output
    new_frame.update(winds_frame_2m)  # replaces wind_speed, wind_east/north with rolling means
    new_frame['gust'] = winds_frame_2m['gust']

    # average the values over the output interval
    if summary:
        frame = summary_over_interval(new_frame, interval_width).fillna(np.nan)
    else:
        frame = new_frame.resample(interval_width, closed='right', loffset=interval_width).mean().fillna(np.nan)
        frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north'])
        frame['gust'] = new_frame['gust'].resample(interval_width, closed='right', loffset=interval_width).max()

    if start and end:
        frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')]
    if chunk_size and not isinstance(chunk_size, (list, tuple)):
        chunk_sizes = [chunk_size]
    else:
        chunk_sizes = [frame.shape[0]]
    first_stamp = datetime.strptime(str(frame.index[0]), '%Y-%m-%d %H:%M:%S')
    # NETCDF4_CLASSIC was chosen so that MFDataset reading would work. See:
    # http://unidata.github.io/netcdf4-python/#netCDF4.MFDataset
    nc_file = Dataset(output_fn, 'w', format='NETCDF4_CLASSIC')
    nc_file.createDimension('time', None)
    nc_file.createDimension('max_len_station_name', 32)
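    # NETCDF4_CLASSIC does not support variable-length strings, so the
    # station name is stored as a fixed-width character array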
    create_variables(nc_file, first_stamp, database, chunk_sizes, zlib)
    written_vars = write_vars(nc_file, frame, database, station_info)
    unwritten_vars = set(nc_file.variables.keys()) - set(written_vars)
    written_vars.extend(write_qc_for_unwritten(nc_file.variables,
                                               unwritten_vars))
    # remove any of the qc_ variables we just added
    unwritten_vars = set(nc_file.variables.keys()) - set(written_vars)
    for unwritten in unwritten_vars:
        LOG.warning("Variable created but no data written: {}".format(unwritten))

    write_global_attributes(nc_file,
                            [os.path.basename(x) for x in input_files],
                            interval=interval_width,
                            datastream=datastream)
    nc_file.close()
    return nc_file

def _dt_convert(datetime_str):
    """Parse a date/datetime string in YYYYMMDD, YYYY-MM-DD, or
    YYYY-MM-DDTHH:MM:SS format and return a datetime object."""
    try:
        return datetime.strptime(datetime_str, '%Y%m%d')
    except ValueError:
        try:
            return datetime.strptime(datetime_str, '%Y-%m-%d')
        except ValueError:
            return datetime.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S')
def main():
    import argparse
    from metobscommon.archive import setup_logging
    parser = argparse.ArgumentParser(description="Convert level_00 aoss tower data to level_b1",
                                     fromfile_prefix_chars='@')

    parser.add_argument('-v', '--verbose', action="count", default=int(os.environ.get("VERBOSITY", 2)),
                        dest='verbosity',
                        help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)')
    parser.add_argument('-l', '--log-file', dest="log_filepath",
                        help="Alternate name for log file, default is to not create a file")
    parser.add_argument('-s', '--start-time', type=_dt_convert,
                        help="Start time of the output file. If only -s is given, a file "
                             "covering just that day is created. Formats allowed: "
                             "'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD', 'YYYYMMDD'")
    parser.add_argument('-e', '--end-time', type=_dt_convert,
                        help="End time of the output file. Formats allowed: "
                             "'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD', 'YYYYMMDD'")
    parser.add_argument('-n', '--interval', default='1T',
                        help="""Width of the interval to average input data
over in Pandas offset format. If not specified, 1 minute averages are used.
Use '1D' for daily or '5T' for 5 minute averages.
See this page for more details:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases""")
    parser.add_argument('--summary', action='store_true',
                        help="Create a file with _low, _mean, _high versions of every variable name")
    parser.add_argument('-f', '--fields', nargs='+', default=schema.met_vars,
                        help="Variable names to include in the NetCDF file (base name, no suffixes)")
    parser.add_argument('--chunk-size', type=int, help='chunk size for the netCDF file')
    parser.add_argument('-z', '--zlib', action='store_true', help='compress netCDF file with zlib')
    parser.add_argument('--data-stream', help="'datastream' global attribute to put in output file")

    parser.add_argument('-i', '--input', dest='input_files', required=True, nargs="+",
                        help="aoss_tower level_00 paths. Use @filename to read a list of paths from that filename.")

    parser.add_argument('-o', '--output', dest='output_files', required=True, nargs="+",
                        help="""NetCDF filename(s) to create from input. If one
filename is specified then all input files are combined into it. Otherwise
each input file is mapped to the corresponding output file.
""")
    args = parser.parse_args()

    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
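    # verbosity beyond -vvv is clamped to DEBUG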
    setup_logging(args.log_filepath, level=levels[min(3, args.verbosity)])

    if args.start_time and not args.end_time:
        args.end_time = args.start_time.replace(hour=23, minute=59, second=59)
    elif not args.start_time and args.end_time:
        raise ValueError('start time must be specified when end time is specified')
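    # limit the output schema to the requested variables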
    mini_database = {k: schema.database_dict[k] for k in args.fields}
    if args.summary:
        mini_database = make_summary_dict(mini_database)

    # Case 1: All inputs to 1 output file
    # Case 2: Each input in to a separate output file
    if args.output_files and len(args.output_files) not in [1, len(args.input_files)]:
        raise ValueError('Number of output files must be 1 or equal to the number of input files')
    elif args.output_files and len(args.output_files) == len(args.input_files):
        args.input_files = [[i] for i in args.input_files]
    else:
        args.input_files = [args.input_files]

    success = False
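    # a failure on one file should not stop the remaining conversions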
    for in_files, out_fn in zip(args.input_files, args.output_files):
        try:
            create_giant_netcdf(in_files, out_fn, args.zlib,
                                args.chunk_size, args.start_time,
                                args.end_time, args.interval, args.summary,
                                mini_database, args.data_stream)
            success = True
        except (ValueError, TypeError):
            LOG.error("Could not generate NetCDF file for {}".format(in_files), exc_info=True)
    if not success:
        raise IOError('All ASCII files were empty or could not be read')
if __name__ == "__main__":
    sys.exit(main())