Skip to content
Snippets Groups Projects
nc.py 10.8 KiB
Newer Older
"""Generate AOSS Tower NetCDF4 files from Level 00 ASCII files."""
import logging
kgao's avatar
kgao committed
import os
kgao's avatar
kgao committed
import sys
from datetime import datetime
David Hoese's avatar
David Hoese committed
from pathlib import Path
import numpy as np
import pandas as pd
from metobscommon.util import calc
from metobscommon.util.nc import (
    calculate_wind_gust,
    create_variables,
    make_summary_dict,
    summary_over_interval,
    write_qc_for_unwritten,
    write_vars,
)
from netCDF4 import Dataset

from aosstower import schema
from aosstower.level_00 import parser
from aosstower.station import station_info
kgao's avatar
kgao committed

LOG = logging.getLogger(__name__)
SOFTWARE_VERSION = "00"
kgao's avatar
kgao committed

def _get_data(input_files):
    for filename in input_files:
        try:
            yield from parser.read_frames(filename)
        except FileNotFoundError:
            LOG.warning(f"Input data file was not found: {filename}")
            bad_files += 1
            continue
    if bad_files == len(input_files):
David Hoese's avatar
David Hoese committed
        msg = "No valid input data files found"
        raise ValueError(msg)
kgao's avatar
kgao committed

def get_data(input_files):
    frame = pd.DataFrame(_get_data(input_files))
    frame = frame.set_index("stamp")
    frame = frame.mask(frame == -99999.0).fillna(value=np.nan)

    for col_name in frame.columns:
        if col_name in schema.unit_conversions:
            conv_func = schema.unit_conversions[col_name]
            frame[col_name] = conv_func(frame[col_name])
kgao's avatar
kgao committed


def write_global_attributes(nc_file, input_sources, interval=None, datastream=None):
    # create global attributes
    nc_file.source = "surface observation"
    nc_file.Conventions = "ARM-1.2 CF-1.6"
    nc_file.institution = "University of Wisconsin - Madison (UW) Space Science and Engineering Center (SSEC)"
    nc_file.featureType = "timeSeries"
    nc_file.data_level = "b1"
kgao's avatar
kgao committed

    # monthly files end with .month.nc
    # these end with .day.nc
    if datastream:
        nc_file.datastream = datastream
    elif interval in ["1D"]:
        # assume this is a monthly file, averaged daily
        nc_file.datastream = f"aoss.tower.nc-1mo-1d.b1.v{SOFTWARE_VERSION}"
    elif interval in ["1T", "1min"]:
        # assume this is a daily file, averaged
        nc_file.datastream = f"aoss.tower.nc-1d-1m.b1.v{SOFTWARE_VERSION}"
    nc_file.software_version = SOFTWARE_VERSION
    nc_file.command_line = " ".join(sys.argv)

    # generate history
David Hoese's avatar
David Hoese committed
    nc_file.history = " ".join(platform.uname()) + " " + Path(__file__).name
    nc_file.input_source = input_sources[0]
    nc_file.input_sources = ", ".join(input_sources)


def create_giant_netcdf(
David Hoese's avatar
David Hoese committed
    input_files: list[Path],
    output_fn: Path,
    zlib,
    chunk_size,
    start=None,
    end=None,
    interval_width=None,
    summary=False,
    database=schema.database_dict,
    datastream=None,
):
    frame = get_data(input_files)
    if frame.empty:
David Hoese's avatar
David Hoese committed
        in_files_str = ", ".join(str(in_file) for in_file in input_files)
        msg = f"No data found from input files: {in_files_str}"
        raise ValueError(msg)
    # Add wind direction components so we can average wind direction properly
    frame["wind_east"], frame["wind_north"], _ = calc.wind_vector_components(frame["wind_speed"], frame["wind_dir"])
    if "air_temp" in frame and "rh" in frame and ("dewpoint" in database or "dewpoint_mean" in database):
David Hoese's avatar
David Hoese committed
        LOG.info("'dewpoint' is missing from the input file, will calculate it from air temp and relative humidity")
        frame["dewpoint"] = calc.dewpoint(frame["air_temp"], frame["rh"])
    # round up each 1 minute group so data at time T is the average of data
    # from T - 1 (exclusive) to T (inclusive).
    new_frame = frame.resample("5S", closed="right", loffset="5S").mean()

    # 2 minute rolling average of 5 second data (5 seconds * 24 = 120 seconds = 2 minutes)
    winds_frame_5s = new_frame[["wind_speed", "wind_east", "wind_north"]]
    winds_frame_2m = winds_frame_5s.rolling("2T").mean()
    winds_frame_2m["gust"] = calculate_wind_gust(winds_frame_5s["wind_speed"], winds_frame_2m["wind_speed"])

    # rolling average is used for mean output
    new_frame.update(winds_frame_2m)  # adds wind_speed, wind_east/north
    new_frame["gust"] = winds_frame_2m["gust"]
        frame = summary_over_interval(new_frame, interval_width)
        frame = new_frame.resample(interval_width, closed="right", loffset=interval_width).mean()
        frame["wind_dir"] = calc.wind_vector_degrees(frame["wind_east"], frame["wind_north"])
        frame["gust"] = new_frame["gust"].resample(interval_width, closed="right", loffset=interval_width).max()
    frame = frame.fillna(np.nan)
kgao's avatar
kgao committed

    if start and end:
        frame = frame[start.strftime("%Y-%m-%d %H:%M:%S") : end.strftime("%Y-%m-%d %H:%M:%S")]
David Hoese's avatar
David Hoese committed
    chunk_sizes = [chunk_size] if chunk_size and not isinstance(chunk_size, list | tuple) else [frame.shape[0]]
    first_stamp = datetime.strptime(str(frame.index[0]), "%Y-%m-%d %H:%M:%S")
    # NETCDF4_CLASSIC was chosen so that MFDataset reading would work. See:
    # http://unidata.github.io/netcdf4-python/#netCDF4.MFDataset
    nc_file = Dataset(output_fn, "w", format="NETCDF4_CLASSIC")
    nc_file.createDimension("time", None)
    nc_file.createDimension("max_len_station_name", 32)
    create_variables(nc_file, first_stamp, database, chunk_sizes, zlib)
    written_vars = write_vars(nc_file, frame, database, station_info)
    unwritten_vars = set(nc_file.variables.keys()) - set(written_vars)
    written_vars.extend(write_qc_for_unwritten(nc_file.variables, unwritten_vars))
    # remove any of the qc_ variables we just added
    unwritten_vars = set(nc_file.variables.keys()) - set(written_vars)
    for unwritten in unwritten_vars:
        LOG.warning(f"Variable created but no data written: {unwritten}")

    write_global_attributes(
        nc_file,
David Hoese's avatar
David Hoese committed
        [x.name for x in input_files],
        interval=interval_width,
        datastream=datastream,
    )
    nc_file.close()
    return nc_file
kgao's avatar
kgao committed

kgao's avatar
kgao committed
def _dt_convert(datetime_str):
    """Parse datetime string, return datetime object."""
        return datetime.strptime(datetime_str, "%Y%m%d")
    except ValueError:
            return datetime.strptime(datetime_str, "%Y-%m-%d")
        except ValueError:
            return datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M:%S")
kgao's avatar
kgao committed
def main():
    import argparse
David Hoese's avatar
David Hoese committed
    from metobscommon.archive import setup_logging

    parser = argparse.ArgumentParser(
        description="Convert level_00 aoss tower data to level_b1",
        fromfile_prefix_chars="@",
    )

    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=int(os.environ.get("VERBOSITY", 2)),
        dest="verbosity",
        help="each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG (default INFO)",
    )
    parser.add_argument(
        "-l",
        "--log-file",
        dest="log_filepath",
        help="Alternate name for log file, default is to not create a file",
    )
    parser.add_argument(
        "-s",
        "--start-time",
        type=_dt_convert,
        help="Start time of massive netcdf file, if only -s is given, a netcdf file for only that day is given"
David Hoese's avatar
David Hoese committed
        ". Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
    )
    parser.add_argument(
        "-e",
        "--end-time",
        type=_dt_convert,
David Hoese's avatar
David Hoese committed
        help="End time of massive netcdf file. Formats allowed: 'YYYY-MM-DDTHH:MM:SS', 'YYYY-MM-DD'",
    )
    parser.add_argument(
        "-n",
        "--interval",
        default="1T",
        help="""Width of the interval to average input data
over in Pandas offset format. If not specified, 1 minute averages are used.
Use '1D' for daily or '5T' for 5 minute averages.
See this page for more details:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases""",
    )
    parser.add_argument(
        "--summary",
        action="store_true",
        help="Create a file with _low, _mean, _high versions of every variable name",
    )
    parser.add_argument(
        "-f",
        "--fields",
        nargs="+",
        default=schema.met_vars,
        help="Variable names to include in the NetCDF file (base name, no suffixes)",
    )
    parser.add_argument("--chunk-size", type=int, help="chunk size for the netCDF file")
    parser.add_argument("-z", "--zlib", action="store_true", help="compress netCDF file with zlib")
    parser.add_argument("--data-stream", help="'datastream' global attribute to put in output file")

    parser.add_argument(
        "-i",
        "--input",
        dest="input_files",
        required=True,
        nargs="+",
        help="aoss_tower level_00 paths. Use @filename to read a list of paths from that filename.",
    )

    parser.add_argument(
        "-o",
        "--output",
        dest="output_files",
        required=True,
        nargs="+",
        help="""NetCDF filename(s) to create from input. If one
filename is specified then all input files are combined in to it. Otherwise
each input file is mapped to the corresponding output file.
    args = parser.parse_args()
kgao's avatar
kgao committed

    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
David Hoese's avatar
David Hoese committed
    setup_logging(args.log_filepath, level=levels[min(3, args.verbosity)])
kgao's avatar
kgao committed

    if args.start_time and not args.end_time:
        args.end_time = args.start_time.replace(hour=23, minute=59, second=59)
    elif not args.start_time and args.end_time:
David Hoese's avatar
David Hoese committed
        msg = "start time must be specified when end time is specified"
        raise ValueError(msg)
    mini_database = {k: schema.database_dict[k] for k in args.fields}
    if args.summary:
        mini_database = make_summary_dict(mini_database)

    # Case 1: All inputs to 1 output file
    # Case 2: Each input in to a separate output file
    if args.output_files and len(args.output_files) not in [1, len(args.input_files)]:
David Hoese's avatar
David Hoese committed
        msg = "Output filenames must be 1 or the same length as input files"
        raise ValueError(msg)
    if args.output_files and len(args.output_files) == len(args.input_files):
        args.input_files = [[i] for i in args.input_files]
kgao's avatar
kgao committed
    else:
        args.input_files = [args.input_files]

    success = False
David Hoese's avatar
David Hoese committed
    for in_files, out_fn in zip(args.input_files, args.output_files, strict=True):
David Hoese's avatar
David Hoese committed
                [Path(in_file) for in_file in in_files],
                Path(out_fn),
                args.zlib,
                args.chunk_size,
                args.start_time,
                args.end_time,
                args.interval,
                args.summary,
                mini_database,
                args.data_stream,
            )
            success = True
        except (ValueError, TypeError):
            LOG.error(f"Could not generate NetCDF file for {in_files}", exc_info=True)
David Hoese's avatar
David Hoese committed
        msg = "All ASCII files were empty or could not be read"
        raise OSError(msg)
kgao's avatar
kgao committed
if __name__ == "__main__":
    sys.exit(main())