"""Generate AOSS Tower NetCDF4 files from Level 00 ASCII files."""
import argparse
import logging
import os
import platform
import sys
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
from metobscommon.util import calc
from metobscommon.util.nc import (
calculate_wind_gust,
create_variables,
make_summary_dict,
summary_over_interval,
write_qc_for_unwritten,
write_vars,
)
from netCDF4 import Dataset
from aosstower import schema
from aosstower.level_00 import parser
from aosstower.station import station_info
LOG = logging.getLogger(__name__)
SOFTWARE_VERSION = "00"
def _get_data(input_files):
    bad_files = 0
    for filename in input_files:
        try:
            yield from parser.read_frames(filename)
        except FileNotFoundError:
            LOG.warning(f"Input data file was not found: {filename}")
            bad_files += 1
            continue
    if bad_files == len(input_files):
        msg = "No valid input data files found"
        raise ValueError(msg)
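# Read all input files into one DataFrame indexed by timestamp, masking the
# -99999.0 fill value and applying the unit conversions defined in the schema.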
def get_data(input_files):
    frame = pd.DataFrame(_get_data(input_files))
    frame = frame.set_index("stamp")
    frame = frame.mask(frame == -99999.0).fillna(value=np.nan)
    for col_name in frame.columns:
        if col_name in schema.unit_conversions:
            conv_func = schema.unit_conversions[col_name]
            frame[col_name] = conv_func(frame[col_name])
    return frame
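# Attach ARM/CF style global attributes, deriving the datastream name from the
# averaging interval unless an explicit datastream is provided.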
def write_global_attributes(nc_file, input_sources, interval=None, datastream=None):
nc_file.source = "surface observation"
nc_file.Conventions = "ARM-1.2 CF-1.6"
nc_file.institution = "University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)"
nc_file.featureType = "timeSeries"
nc_file.data_level = "b1"
# monthly files end with .month.nc
# these end with .day.nc
if datastream:
nc_file.datastream = datastream
elif interval in ["1D"]:
# assume this is a monthly file, averaged daily
nc_file.datastream = f"aoss.tower.nc-1mo-1d.b1.v{SOFTWARE_VERSION}"
elif interval in ["1T", "1min"]:
# assume this is a daily file, averaged
nc_file.datastream = f"aoss.tower.nc-1d-1m.b1.v{SOFTWARE_VERSION}"
nc_file.software_version = SOFTWARE_VERSION
nc_file.command_line = " ".join(sys.argv)
# generate history
nc_file.history = " ".join(platform.uname()) + " " + Path(__file__).name
nc_file.input_source = input_sources[0]
nc_file.input_sources = ", ".join(input_sources)
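# Read, average, and write one output NetCDF file: derive wind components and
# dewpoint, resample to the requested interval, then write variables, QC
# flags, and global attributes.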
def create_giant_netcdf(
    input_files,
    output_fn,
    zlib,
chunk_size,
start=None,
end=None,
interval_width=None,
summary=False,
database=schema.database_dict,
datastream=None,
):
frame = get_data(input_files)
if frame.empty:
in_files_str = ", ".join(str(in_file) for in_file in input_files)
msg = f"No data found from input files: {in_files_str}"
raise ValueError(msg)
# Add wind direction components so we can average wind direction properly
frame["wind_east"], frame["wind_north"], _ = calc.wind_vector_components(frame["wind_speed"], frame["wind_dir"])
if "air_temp" in frame and "rh" in frame and ("dewpoint" in database or "dewpoint_mean" in database):
LOG.info("'dewpoint' is missing from the input file, will calculate it from air temp and relative humidity")
frame["dewpoint"] = calc.dewpoint(frame["air_temp"], frame["rh"])
    # resample onto a regular 5 second grid so that the value at time T is the
    # average of data from T - 5s (exclusive) to T (inclusive)
    new_frame = frame.resample("5S", closed="right", loffset="5S").mean()
# 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes)
winds_frame_5s = new_frame[["wind_speed", "wind_east", "wind_north"]]
winds_frame_2m = winds_frame_5s.rolling("2T").mean()
winds_frame_2m["gust"] = calculate_wind_gust(winds_frame_5s["wind_speed"], winds_frame_2m["wind_speed"])
# rolling average is used for mean output
new_frame.update(winds_frame_2m) # adds wind_speed, wind_east/north
new_frame["gust"] = winds_frame_2m["gust"]
# average the values
if summary:
frame = summary_over_interval(new_frame, interval_width)
else:
frame = new_frame.resample(interval_width, closed="right", loffset=interval_width).mean()
frame["wind_dir"] = calc.wind_vector_degrees(frame["wind_east"], frame["wind_north"])
frame["gust"] = new_frame["gust"].resample(interval_width, closed="right", loffset=interval_width).max()
    if start and end:
        frame = frame[start.strftime("%Y-%m-%d %H:%M:%S") : end.strftime("%Y-%m-%d %H:%M:%S")]
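    # chunk the time dimension with the requested size; otherwise use a single
    # chunk covering the whole time series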
chunk_sizes = [chunk_size] if chunk_size and not isinstance(chunk_size, list | tuple) else [frame.shape[0]]
first_stamp = datetime.strptime(str(frame.index[0]), "%Y-%m-%d %H:%M:%S")
# NETCDF4_CLASSIC was chosen so that MFDataset reading would work. See:
# http://unidata.github.io/netcdf4-python/#netCDF4.MFDataset
nc_file = Dataset(output_fn, "w", format="NETCDF4_CLASSIC")
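    # unlimited time dimension plus a fixed-width dimension for the station name string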
nc_file.createDimension("time", None)
nc_file.createDimension("max_len_station_name", 32)
create_variables(nc_file, first_stamp, database, chunk_sizes, zlib)
written_vars = write_vars(nc_file, frame, database, station_info)
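    # determine which of the created variables never received any data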
unwritten_vars = set(nc_file.variables.keys()) - set(written_vars)
written_vars.extend(write_qc_for_unwritten(nc_file.variables, unwritten_vars))
# remove any of the qc_ variables we just added
unwritten_vars = set(nc_file.variables.keys()) - set(written_vars)
for unwritten in unwritten_vars:
LOG.warning(f"Variable created but no data written: {unwritten}")
    write_global_attributes(
        nc_file,
        [str(in_file) for in_file in input_files],
        interval=interval_width,
        datastream=datastream,
    )
nc_file.close()
return nc_file
"""Parse datetime string, return datetime object."""
return datetime.strptime(datetime_str, "%Y%m%d")
return datetime.strptime(datetime_str, "%Y-%m-%d")
return datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M:%S")
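# Command line interface: parse arguments, group input and output files, and
# create one NetCDF file per requested output path.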
def main():
    parser = argparse.ArgumentParser(
description="Convert level_00 aoss tower data to level_b1",
fromfile_prefix_chars="@",
)
parser.add_argument(
"-v",
"--verbose",
action="count",
default=int(os.environ.get("VERBOSITY", 2)),
dest="verbosity",
help="each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)",
)
parser.add_argument(
"-l",
"--log-file",
dest="log_filepath",
help="Alternate name for log file, default is to not create a file",
)
parser.add_argument(
"-s",
"--start-time",
type=_dt_convert,
help="Start time of massive netcdf file, if only -s is given, a netcdf file for only that day is given"
". Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
)
parser.add_argument(
"-e",
"--end-time",
type=_dt_convert,
help="End time of massive netcdf file. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
)
parser.add_argument(
"-n",
"--interval",
default="1T",
help="""Width of the interval to average input data
over in Pandas offset format. If not specified, 1 minute averages are used.
Use '1D' for daily or '5T' for 5 minute averages.
See this page for more details:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases""",
)
parser.add_argument(
"--summary",
action="store_true",
help="Create a file with _low, _mean, _high versions of every variable name",
)
parser.add_argument(
"-f",
"--fields",
nargs="+",
default=schema.met_vars,
help="Variable names to include in the NetCDF file (base name, no suffixes)",
)
parser.add_argument("--chunk-size", type=int, help="chunk size for the netCDF file")
parser.add_argument("-z", "--zlib", action="store_true", help="compress netCDF file with zlib")
parser.add_argument("--data-stream", help="'datastream' global attribute to put in output file")
parser.add_argument(
"-i",
"--input",
dest="input_files",
required=True,
nargs="+",
help="aoss_tower level_00 paths. Use @filename to read a list of paths from that filename.",
)
parser.add_argument(
"-o",
"--output",
dest="output_files",
required=True,
nargs="+",
help="""NetCDF filename(s) to create from input. If one
filename is specified then all input files are combined in to it. Otherwise
each input file is mapped to the corresponding output file.
""",
    )
    args = parser.parse_args()
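    # map the -v/--verbose count onto a logging level, capped at DEBUG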
levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
setup_logging(args.log_filepath, level=levels[min(3, args.verbosity)])
if args.start_time and not args.end_time:
args.end_time = args.start_time.replace(hour=23, minute=59, second=59)
elif not args.start_time and args.end_time:
msg = "start time must be specified when end time is specified"
raise ValueError(msg)
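    # restrict the output schema to the requested fields, expanding each to
    # _low/_mean/_high variants when a summary file was requested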
mini_database = {k: schema.database_dict[k] for k in args.fields}
if args.summary:
mini_database = make_summary_dict(mini_database)
# Case 1: All inputs to 1 output file
# Case 2: Each input in to a separate output file
if args.output_files and len(args.output_files) not in [1, len(args.input_files)]:
msg = "Output filenames must be 1 or the same length as input files"
raise ValueError(msg)
if args.output_files and len(args.output_files) == len(args.input_files):
        args.input_files = [[i] for i in args.input_files]
    else:
        args.input_files = [args.input_files]
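    # process each input group separately; a failure on one output file should
    # not prevent the remaining files from being written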
success = False
for in_files, out_fn in zip(args.input_files, args.output_files, strict=True):
        try:
            create_giant_netcdf(
                [Path(in_file) for in_file in in_files],
                Path(out_fn),
                args.zlib,
                args.chunk_size,
                args.start_time,
                args.end_time,
                args.interval,
                args.summary,
                mini_database,
                args.data_stream,
            )
            success = True
        except (ValueError, TypeError):
            LOG.error(f"Could not generate NetCDF file for {in_files}", exc_info=True)
if not success:
msg = "All ASCII files were empty or could not be read"
raise OSError(msg)
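# Entry point so the converter can be run directly as a script (assumed; the
# original module may define this differently).
if __name__ == "__main__":
    sys.exit(main())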