Skip to content
Snippets Groups Projects
create_test_file.py 6.55 KiB
Newer Older
"""Create test files."""
from os.path import exists

import numpy as np
import numpy.typing as npt
import xarray as xr
from rich.logging import RichHandler
from ruamel.yaml import YAML

import mvcm.read_data as rd

_LOG_FORMAT = "%(message)s"
logging.basicConfig(level="NOTSET", datefmt="[%X]", format=_LOG_FORMAT, handlers=[RichHandler()])
logger = logging.getLogger(__name__)

# #################################################################### #
# TEST CASES
# data:
_datapath = "/ships19/hercules/pveglio/mvcm_viirs_hires"
_outpath = "/ships19/hercules/pveglio/mvcm_git_tests"
_thresholds_file = f"{_outpath}/thresholds.mvcm.snpp.v0.0.1.yaml"

# The following sets of files try to cover all the possible scenes found
# in the mvcm. the list of scenes per granule is reported on top of every
# VNP02/VNP03 group

########################
# Scene List:
# - Land_Day_Coast
# - Land_Day_Desert_Coast
# - Land_Day_Desert
# - Land_Day
# - Ocean_Day
# _fname_mod02 = f"{_datapath}/VNP02MOD.A2022173.1312.001.2022174011547.uwssec_bowtie_corrected.nc"
# _fname_mod03 = f"{_datapath}/VNP03MOD.A2022173.1312.001.2022174012746.uwssec.nc"
# _fname_img02 = f"{_datapath}/VNP02IMG.A2022173.1312.001.2022174011547.uwssec_bowtie_corrected.nc"
# _fname_img03 = f"{_datapath}/VNP03IMG.A2022173.1312.001.2022174012746.uwssec.nc"

########################
# Scene List:
# - Polar_Day_Coast
# - Polar_Day_Desert_Coast
# - Polar_Day_Desert
# - Polar_Day_Ocean
_fname_mod02 = f"{_datapath}/VNP02MOD.A2022173.1324.001.2022174014257.uwssec_bowtie_corrected.nc"
_fname_mod03 = f"{_datapath}/VNP03MOD.A2022173.1324.001.2022174012743.uwssec.nc"
# _fname_img02 = f"{_datapath}/VNP02IMG.A2022173.1324.001.2022174014257.uwssec_bowtie_corrected.nc"
# _fname_img03 = f"{_datapath}/VNP03IMG.A2022173.1324.001.2022174012743.uwssec.nc"

########################
# Scene List:
# -
#

# output filename root
_out_fname = f"{_outpath}/test_scene"

# ancillary files:
_geos_atm_1 = f"{_datapath}/GEOS.fpit.asm.inst3_2d_asm_Nx.GEOS5124.20220622_1200.V01.nc4"
_geos_atm_2 = f"{_datapath}/GEOS.fpit.asm.inst3_2d_asm_Nx.GEOS5124.20220622_1500.V01.nc4"
_geos_land = f"{_datapath}/GEOS.fpit.asm.tavg1_2d_lnd_Nx.GEOS5124.20220622_1330.V01.nc4"
_geos_ocean = f"{_datapath}/GEOS.fpit.asm.tavg1_2d_ocn_Nx.GEOS5124.20220622_1330.V01.nc4"
_geos_constants = f"{_datapath}/GEOS.fp.asm.const_2d_asm_Nx.00000000_0000.V01.nc4"
_ndvi_file = f"{_datapath}/NDVI.FM.c004.v2.0.WS.00-04.177.hdf"
_sst_file = f"{_datapath}/oisst.20220622"
_eco_file = f"{_datapath}/goge1_2_img.v1"

# #################################################################### #

# Scenes still unaccounted for:
# "Ocean_Night",
# "Polar_Night_Ocean",
# "Polar_Day_Land",
# "Polar_Day_Snow",
# "Land_Night",
# "Polar_Night_Land",
# "Polar_Night_Snow",
# "Day_Snow",
# "Night_Snow",

_scene_list = [
    "Land_Day",
    "Land_Day_Coast",
    "Land_Day_Desert",
    "Land_Day_Desert_Coast",
    "Ocean_Day",
    "Ocean_Night",
    "Polar_Day_Ocean",
    "Polar_Night_Ocean",
    "Polar_Day_Land",
    "Polar_Day_Coast",
    "Polar_Day_Desert",
    "Polar_Day_Desert_Coast",
    "Polar_Day_Snow",
    "Land_Night",
    "Polar_Night_Land",
    "Polar_Night_Snow",
    "Day_Snow",
    "Night_Snow",
]

_default_scene_size = 10_000


def find_scene(scene_arr: npt.NDArray[np.float64], x: int = 100, y: int = 100) -> tuple:
    """Find X by Y area with same scene type."""
    i, j = None, None
    flag = False
    for i in range(scene_arr.shape[0] - x):
        for j in range(scene_arr.shape[1] - y):
            if (scene_arr[i : i + x, j : j + y] == np.ones((x, y))).all():
                flag = True
                break
        if flag is True:
            break
    if flag is False:
        i = None
        j = None

    return (i, j)


def write_test_file() -> None:
    """Write test file."""
    for scene in _scene_list:
        test_file = f"{_outpath}/test_scene_{scene}.nc"
        if exists(test_file):
            ds = xr.open_dataset(test_file)
            ds.to_netcdf(f"{_outpath}/full_test_file.nc", mode="a", group=scene)


def main(
    mod02: str,
    mod03: str,
    geos_atm_1: str,
    geos_atm_2: str,
    geos_land: str,
    geos_ocean: str,
    geos_constants: str,
    ndvi_file: str,
    sst_file: str,
    eco_file: str,
    data_path: str,
    out_fname: str,
) -> None:
    """Create test files."""
    file_names = {
        "MOD02": f"{mod02}",
        "MOD03": f"{mod03}",
        "IMG02": None,
        "IMG03": None,
        "GEOS_atm_1": f"{geos_atm_1}",
        "GEOS_atm_2": f"{geos_atm_2}",
        "GEOS_land": f"{geos_land}",
        "GEOS_ocean": f"{geos_ocean}",
        "GEOS_constants": f"{geos_constants}",
        "NDVI": f"{ndvi_file}",
        "SST": f"{sst_file}",
        "ECO": f"{eco_file}",
        "ANC_DIR": f"{data_path}/ancillary",
    }

    with open(thresholds_file) as f:
        text = f.read()
    thresholds = YAML(typ="safe").load(text)
    viirs_data = rd.get_data("snpp", "viirs", file_names, thresholds, hires=False)

    out_ds = xr.Dataset()

    for scene in _scene_list:
        if exists(f"{out_fname}_{scene}.nc"):
            continue
        if len(viirs_data[scene].values[viirs_data[scene].values == 1]) < _default_scene_size:
            continue
        if scene in [
            "Land_Day_Coast",
            "Land_Day_Desert_Coast",
            "Polar_Day_Coast",
            "Polar_Day_Desert_Coast",
        ]:
            scn_idx = list(np.nonzero(viirs_data[scene].values == 1))
            scn_idx = [scn_idx[0][:100], scn_idx[1][:100]]
        else:
            scn_tpl = find_scene(viirs_data[scene].values)
            if scn_tpl[0] is None or scn_tpl[1] is None:
                continue
            scn_idx = [
                np.arange(scn_tpl[0], scn_tpl[0] + 100),
                np.arange(scn_tpl[1], scn_tpl[1] + 100),
            ]
        if len(scn_idx[0]) == 0:
            continue

        sample_data = xr.Dataset()

        for var in viirs_data:
            sample_data[var] = viirs_data[var][scn_idx[0], scn_idx[1]]

        sample_data.attrs["scene_name"] = scene

        out_ds = sample_data
        out_ds.to_netcdf(f"{out_fname}_{scene}.nc")


if __name__ == "__main__":
    main(
        mod02=_fname_mod02,
        mod03=_fname_mod03,
        thresholds_file=_thresholds_file,
        geos_atm_1=_geos_atm_1,
        geos_atm_2=_geos_atm_2,
        geos_land=_geos_land,
        geos_ocean=_geos_ocean,
        geos_constants=_geos_constants,
        ndvi_file=_ndvi_file,
        sst_file=_sst_file,
        eco_file=_eco_file,
        data_path=_datapath,
        out_fname=_out_fname,
    )