#!/usr/bin/env python # encoding: utf8 """Generate AOSS Tower NetCDF4 files from Level 00 ASCII files. """ import os import sys import logging import pandas as pd from datetime import datetime from netCDF4 import Dataset import numpy as np import platform from aosstower import schema from aosstower.station import station_info from aosstower.level_00 import parser from metobscommon.util import calc from metobscommon.util.nc import (make_summary_dict, write_vars, calculate_wind_gust, summary_over_interval, create_variables) LOG = logging.getLogger(__name__) SOFTWARE_VERSION = '00' def _get_data(input_files): bad_files = 0 for filename in input_files: try: yield from parser.read_frames(filename) except FileNotFoundError: LOG.warning("Input data file was not found: {}".format(filename)) bad_files += 1 continue if bad_files == len(input_files): raise ValueError("No valid input data files found") def get_data(input_files): frame = pd.DataFrame(_get_data(input_files)) frame.set_index('stamp', inplace=True) frame.mask(frame == -99999., inplace=True) frame.fillna(value=np.nan, inplace=True) return frame def write_global_attributes(nc_file, input_sources, interval=None, datastream=None): # create global attributes nc_file.source = 'surface observation' nc_file.Conventions = 'ARM-1.2 CF-1.6' nc_file.institution = 'University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)' nc_file.featureType = 'timeSeries' nc_file.data_level = 'b1' # monthly files end with .month.nc # these end with .day.nc if datastream: nc_file.datastream = datastream elif interval in ['1D']: # assume this is a monthly file, averaged daily nc_file.datastream = 'aoss.tower.nc-1mo-1d.b1.v{software_version}'.format(software_version=SOFTWARE_VERSION) elif interval in ['1T', '1min']: # assume this is a daily file, averaged nc_file.datastream = 'aoss.tower.nc-1d-1m.b1.v{software_version}'.format(software_version=SOFTWARE_VERSION) nc_file.software_version = SOFTWARE_VERSION nc_file.command_line = " ".join(sys.argv) # generate history nc_file.history = ' '.join(platform.uname()) + " " + os.path.basename(__file__) nc_file.input_source = input_sources[0] nc_file.input_sources = ', '.join(input_sources) def create_giant_netcdf(input_files, output_fn, zlib, chunk_size, start=None, end=None, interval_width=None, summary=False, database=schema.database_dict, datastream=None): frame = get_data(input_files) if frame.empty: raise ValueError("No data found from input files: {}".format(", ".join(input_files))) # Add wind direction components so we can average wind direction properly frame['wind_east'], frame['wind_north'], _ = calc.wind_vector_components(frame['wind_speed'], frame['wind_dir']) if 'air_temp' in frame and 'rh' in frame and \ ('dewpoint' in database or 'dewpoint_mean' in database): LOG.info("'dewpoint' is missing from the input file, will calculate " "it from air temp and relative humidity") frame['dewpoint'] = calc.dewpoint(frame['air_temp'], frame['rh']) # round up each 1 minute group so data at time T is the average of data # from T - 1 (exclusive) to T (inclusive). new_frame = frame.resample('5S', closed='right', loffset='5S').mean() # 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes) winds_frame_5s = new_frame[['wind_speed', 'wind_east', 'wind_north']] winds_frame_2m = winds_frame_5s.rolling('2T').mean() winds_frame_2m['gust'] = calculate_wind_gust(winds_frame_5s['wind_speed'], winds_frame_2m['wind_speed']) # rolling average is used for mean output new_frame.update(winds_frame_2m) # adds wind_speed, wind_east/north new_frame['gust'] = winds_frame_2m['gust'] # average the values if summary: frame = summary_over_interval(new_frame, interval_width) else: frame = new_frame.resample(interval_width, closed='right', loffset=interval_width).mean() frame['wind_dir'] = calc.wind_vector_degrees(frame['wind_east'], frame['wind_north']) frame['gust'] = new_frame['gust'].resample(interval_width, closed='right', loffset=interval_width).max() frame.fillna(np.nan, inplace=True) if start and end: frame = frame[start.strftime('%Y-%m-%d %H:%M:%S'): end.strftime('%Y-%m-%d %H:%M:%S')] if chunk_size and not isinstance(chunk_size, (list, tuple)): chunk_sizes = [chunk_size] else: chunk_sizes = [frame.shape[0]] first_stamp = datetime.strptime(str(frame.index[0]), '%Y-%m-%d %H:%M:%S') # NETCDF4_CLASSIC was chosen so that MFDataset reading would work. See: # http://unidata.github.io/netcdf4-python/#netCDF4.MFDataset nc_file = Dataset(output_fn, 'w', format='NETCDF4_CLASSIC') nc_file.createDimension('time', None) nc_file.createDimension('max_len_station_name', 32) create_variables(nc_file, first_stamp, database, chunk_sizes, zlib) write_vars(nc_file, frame, database, station_info) write_global_attributes(nc_file, [os.path.basename(x) for x in input_files], interval=interval_width, datastream=datastream) nc_file.close() return nc_file def _dt_convert(datetime_str): """Parse datetime string, return datetime object""" try: return datetime.strptime(datetime_str, '%Y%m%d') except ValueError: try: return datetime.strptime(datetime_str, '%Y-%m-%d') except ValueError: return datetime.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S') def main(): import argparse from metobscommon.archive import setup_logging parser = argparse.ArgumentParser(description="Convert level_00 aoss tower data to level_b1", fromfile_prefix_chars='@') parser.add_argument('-v', '--verbose', action="count", default=int(os.environ.get("VERBOSITY", 2)), dest='verbosity', help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)') parser.add_argument('-l', '--log-file', dest="log_filepath", help="Alternate name for log file, default is to not create a file") parser.add_argument('-s', '--start-time', type=_dt_convert, help="Start time of massive netcdf file, if only -s is given, a netcdf file for only that day is given" + ". Formats allowed: \'YYYY-MM-DDTHH:MM:SS\', \'YYYY-MM-DD\'") parser.add_argument('-e', '--end-time', type=_dt_convert, help='End time of massive netcdf file. Formats allowed:' + "\'YYYY-MM-DDTHH:MM:SS\', \'YYYY-MM-DD\'") parser.add_argument('-n', '--interval', default='1T', help="""Width of the interval to average input data over in Pandas offset format. If not specified, 1 minute averages are used. Use '1D' for daily or '5T' for 5 minute averages. See this page for more details: http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases""") parser.add_argument('--summary', action='store_true', help="Create a file with _low, _mean, _high versions of every variable name") parser.add_argument('-f', '--fields', nargs='+', default=schema.met_vars, help="Variable names to include in the NetCDF file (base name, no suffixes)") parser.add_argument('--chunk-size', type=int, help='chunk size for the netCDF file') parser.add_argument('-z', '--zlib', action='store_true', help='compress netCDF file with zlib') parser.add_argument('--data-stream', help="'datastream' global attribute to put in output file") parser.add_argument('-i', '--input', dest='input_files', required=True, nargs="+", help="aoss_tower level_00 paths. Use @filename to read a list of paths from that filename.") parser.add_argument('-o', '--output', dest='output_files', required=True, nargs="+", help="""NetCDF filename(s) to create from input. If one filename is specified then all input files are combined in to it. Otherwise each input file is mapped to the corresponding output file. """) args = parser.parse_args() levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] setup_logging(args.log_filepath, level=levels[min(3, args.verbosity)]) if args.start_time and not args.end_time: args.end_time = args.start_time.replace(hour=23, minute=59, second=59) elif not args.start_time and args.end_time: raise ValueError('start time must be specified when end time is specified') mini_database = {k: schema.database_dict[k] for k in args.fields} if args.summary: mini_database = make_summary_dict(mini_database) # Case 1: All inputs to 1 output file # Case 2: Each input in to a separate output file if args.output_files and len(args.output_files) not in [1, len(args.input_files)]: raise ValueError('Output filenames must be 1 or the same length as input files') elif args.output_files and len(args.output_files) == len(args.input_files): args.input_files = [[i] for i in args.input_files] else: args.input_files = [args.input_files] success = False for in_files, out_fn in zip(args.input_files, args.output_files): try: create_giant_netcdf(in_files, out_fn, args.zlib, args.chunk_size, args.start_time, args.end_time, args.interval, args.summary, mini_database, args.data_stream) success = True except (ValueError, TypeError): LOG.error("Could not generate NetCDF file for {}".format(in_files), exc_info=True) if not success: raise IOError('All ASCII files were empty or could not be read') if __name__ == "__main__": sys.exit(main())