Skip to content
Snippets Groups Projects
Commit a65f82ca authored by Paolo Veglio's avatar Paolo Veglio
Browse files

Merge branch 'sipsanc' into 'main'

merging sipsanc in main

See merge request !1
parents 23e2b92b a1d55eb3
Branches
No related tags found
1 merge request!1merging sipsanc in main
Pipeline #57670 failed
"""Set up version."""
__version__ = "0.3.3"
__version__ = "0.3.4"
......@@ -50,6 +50,7 @@ def main(
debug: bool = False,
hires_only: bool = False,
mvcm_threshold_file: str | None,
use_sipsanc: bool = False,
) -> None:
"""MVCM main function.
......@@ -97,7 +98,10 @@ def main(
Debug mode
hires_only: bool
Only run MVCM with high resolution
mvcm_threshold_file: str
Path to the MVCM threshold file
use_sipsanc: bool
Use sipsanc tool for ancillary files
Returns
-------
None
......@@ -130,6 +134,8 @@ def main(
if hires_only is False:
logger.info("Running regular MVCM before high resolution")
# run sipsanc V??03MOD before mvcm.
# this requires to modify the C code.
run_mvcm(file_names, mvcm_threshold_file, out_file, exe_path, satellite=satellite)
with open(threshold_file) as f:
......@@ -138,14 +144,14 @@ def main(
viirs_m_bands = SensorConstants.VIIRS_VIS_BANDS + SensorConstants.VIIRS_IR_BANDS
with Dataset(file_names["MOD03"]) as f:
with Dataset(file_names["IMG03"]) as f:
attrs = {attr: f.getncattr(attr) for attr in f.ncattrs()}
lat_shape = f["geolocation_data/latitude"][:].shape
fmt_in = "%Y-%m-%dT%H:%M:%S.%fZ"
fmt_out = "%Y-%m-%dT%H%M"
# We are not processing night granules
# We are not processing night granules for now
if use_hires is True:
with xr.open_dataset(file_names["IMG02"], group="observation_data") as vnp02:
for b in SensorConstants.VIIRS_IMG_BANDS:
......@@ -170,11 +176,10 @@ def main(
logger.info(f"All bands found in file {file_names['MOD02']}. The code will run.")
logger.info(f"Memory usage #2: {proc.memory_info().rss / 1e6} MB")
# rd.get_data(satellite, sensor, file_names, thresholds, hires=use_hires)
viirs_data, pixel_type = rd.get_data(satellite, sensor, file_names, thresholds, hires=use_hires)
# viirs_data = rd.get_data(satellite, sensor, file_names, thresholds, hires=use_hires)
viirs_data, pixel_type = rd.get_data(
satellite, sensor, file_names, thresholds, hires=use_hires, use_sipsanc=use_sipsanc
)
# viirs_data = xr.open_dataset("input_data.nc")
logger.info(f"Memory usage #3: {proc.memory_info().rss / 1e6} MB")
with xr.open_dataset(file_names["MOD02"]) as vnp02:
......@@ -245,6 +250,9 @@ if __name__ == "__main__":
parser.add_argument(
"--mvcm-thresholds", help="thresholds file name for operational MVCM", default=None
)
parser.add_argument(
"--sipsanc", action="store_true", help="use sipsanc tool for ancillary files"
)
args = parser.parse_args()
......@@ -267,6 +275,7 @@ if __name__ == "__main__":
out_file = args.out
verbose = args.verbose or False
debug = args.debug or False
sipsanc = args.sipsan or False
main(
satellite=satellite,
......@@ -291,6 +300,7 @@ if __name__ == "__main__":
exe_path=args.mvcm_exe_path,
hires_only=args.hires_only,
mvcm_threshold_file=args.mvcm_thresholds,
use_sipsanc=sipsanc,
)
# tracemalloc.stop()
......@@ -106,7 +106,6 @@ def main(
pixel_type = xr.open_dataset(pixel_data, chunks="auto")
viirs_data = xr.open_dataset(input_data, chunks="auto")
# cmin_3d = np.ones((18, viirs_data.M11.shape[0], viirs_data.M11.shape[1]), dtype=np.float32)
cmin = np.ones(viirs_data.latitude.shape, dtype=np.float32)
# cmin_3d = xr.ones_like(viirs_data.latitude, dtype=np.float32, chunks="auto")
......@@ -164,17 +163,12 @@ def main(
viirs_data["hicut"] = np.full_like(
viirs_data.latitude.shape, fill_value=np.nan, dtype=np.float32
)
# logger.info(f"viirs_data: {viirs_data}")
logger.info(f"Memory usage #5: {proc.memory_info().rss / 1e6} MB")
# scene_types = np.zeros(viirs_data.M11.shape, dtype=np.ubyte)
for scene_name in SceneConstants.SCENE_LIST:
# for scene_name in ["Land_Day"]:
# scene_types[viirs_data[scene_name].values == 1] = i
logger.info(f"Processing {scene_name}")
if np.all(viirs_data[scene_name].values == 0):
# if viirs_data[scene_name].all() is False:
logger.info("Skipping, no pixels in scene.")
continue
......
......@@ -3,6 +3,7 @@
import logging # noqa - pyright complains about imports not being sorted but pysort seem happy with the current sorting
import os
from datetime import datetime as dt
from subprocess import run
import ancillary_data as anc
import numpy as np
......@@ -715,6 +716,7 @@ def get_data(
thresholds: dict,
*,
hires: bool = False,
use_sipsanc: bool = False,
) -> tuple[xr.Dataset, xr.Dataset]:
"""Store every variable in one xarray dataset.
......@@ -728,12 +730,15 @@ def get_data(
dictionary containing all input file names
hires: bool
use high resolution data
use_sipsanc: bool
use sipsanc tool for ancillary files
Returns
-------
input_data: xr.Dataset
dataset containing all the satellite and ancillary data
"""
viirs = ReadData(
file_name_geo=file_names["MOD03"],
file_name_l1b=file_names["MOD02"],
......@@ -750,97 +755,95 @@ def get_data(
hires=True,
)
fmt_in = "%Y-%m-%dT%H:%M:%S.%fZ"
fmt_out = "%Y-%m-%dT%H%M"
geo_data_mod = viirs.read_viirs_geo()
viirs_data_mod = viirs.read_viirs_l1b(geo_data_mod.solar_zenith.values)
if hires is True:
geo_data = viirs_hires.read_viirs_geo()
viirs_data_img = viirs_hires.read_viirs_l1b(geo_data.solar_zenith.values)
viirs_data = viirs_hires.preprocess_viirs(geo_data, viirs_data_mod, viirs_data_img)
# viirs_data.update(geo_data)
res = 1
with xr.open_dataset(file_names["IMG02"]) as vnp02:
time_str = dt.strftime(dt.strptime(vnp02.time_coverage_start, fmt_in), fmt_out) # noqa
else:
viirs_data = viirs.preprocess_viirs(geo_data_mod, viirs_data_mod)
geo_data = geo_data_mod
res = 1
with xr.open_dataset(file_names["MOD02"]) as vnp02:
time_str = dt.strftime(dt.strptime(vnp02.time_coverage_start, fmt_in), fmt_out) # noqa
if use_sipsanc is True:
cmd_sipsanc = [
"sipsanc",
"regrid",
"all",
f"{file_names['IMG03']}",
"--output",
f"ancillary_{time_str}.nc",
]
ancillary = ReadAncillary(
file_name_l1b=file_names["MOD02"],
latitude=geo_data.latitude.values,
longitude=geo_data.longitude.values,
resolution=res,
ancillary_dir=file_names["ANC_DIR"],
geos_file_1=file_names["GEOS_atm_1"],
geos_file_2=file_names["GEOS_atm_2"],
geos_land=file_names["GEOS_land"],
geos_ocean=file_names["GEOS_ocean"],
geos_constants=file_names["GEOS_constants"],
ndvi_file=file_names["NDVI"],
sst_file=file_names["SST"],
eco_file=file_names["ECO"],
)
logger.info("Ancillary data read successfully")
logger.info("Running sipsanc")
run(cmd_sipsanc, shell=False, check=True)
ancillary_data = xr.open_dataset(f"ancillary_{time_str}.nc", chunks="auto")
logger.info("Ancillary data read successfully")
viirs_data.update(ancillary_data)
else:
ancillary = ReadAncillary(
file_name_l1b=file_names["MOD02"],
latitude=geo_data.latitude.values,
longitude=geo_data.longitude.values,
resolution=res,
ancillary_dir=file_names["ANC_DIR"],
geos_file_1=file_names["GEOS_atm_1"],
geos_file_2=file_names["GEOS_atm_2"],
geos_land=file_names["GEOS_land"],
geos_ocean=file_names["GEOS_ocean"],
geos_constants=file_names["GEOS_constants"],
ndvi_file=file_names["NDVI"],
sst_file=file_names["SST"],
eco_file=file_names["ECO"],
)
viirs_data.update(ancillary.pack_data())
logger.info(f"read_data: Memory usage: {proc.memory_info().rss / 1e6} MB")
# ancillary.pack_data()
# ancillary_data = xr.open_dataset("tmp_ancillary_data.nc", chunks="auto")
# viirs_data = xr.merge([viirs_data, ancillary_data])
viirs_data.update(ancillary.pack_data())
logger.info("Ancillary data added to the dataset")
logger.info(f"read_data: Memory usage: {proc.memory_info().rss / 1e6} MB")
viirs_data.to_netcdf("tmp_viirs_data.nc")
# scene_xr, pixels_flags = scn.scene_id(
# viirs_data, thresholds["Sun_Glint"]["bounds"][3], thresholds["Snow_Mask"]
# xdim = len(viirs_data.number_of_lines)
# # ydim = len(viirs_data.number_of_pixels)
#
# n_slices = 4
# scene_xr, pixel_flags = scn.scene_id(
# viirs_data.isel(
# number_of_lines=slice(0, int(xdim / n_slices)),
# ),
# thresholds["Sun_Glint"]["bounds"][3],
# thresholds["Snow_Mask"],
# )
xdim = len(viirs_data.number_of_lines)
ydim = len(viirs_data.number_of_pixels)
viirs_data = xr.open_dataset("tmp_viirs_data.nc", chunks="auto")
logger.info(f"pre_slicing: Memory usage: {proc.memory_info().rss / 1e6} MB")
scene_xr_1, pixels_flags_1 = scn.scene_id(
viirs_data.isel(
number_of_lines=slice(0, int(xdim / 2)), number_of_pixels=slice(0, int(ydim / 2))
),
thresholds["Sun_Glint"]["bounds"][3],
thresholds["Snow_Mask"],
)
logger.info(f"slice 1: Memory usage: {proc.memory_info().rss / 1e6} MB")
scene_xr_2, pixels_flags_2 = scn.scene_id(
viirs_data.isel(
number_of_lines=slice(int(xdim / 2), xdim), number_of_pixels=slice(0, int(ydim / 2))
),
thresholds["Sun_Glint"]["bounds"][3],
thresholds["Snow_Mask"],
)
logger.info(f"slice 2: Memory usage: {proc.memory_info().rss / 1e6} MB")
scene_xr_tmp_1 = xr.concat([scene_xr_1, scene_xr_2], dim="number_of_lines")
pixels_flags_tmp_1 = xr.concat([pixels_flags_1, pixels_flags_2], dim="number_of_lines")
logger.info(f"concat 1: Memory usage: {proc.memory_info().rss / 1e6} MB")
scene_xr_1, pixels_flags_1 = scn.scene_id(
viirs_data.isel(
number_of_lines=slice(0, int(xdim / 2)), number_of_pixels=slice(int(ydim / 2), ydim)
),
# for i in range(1, n_slices):
# slice_start = int(xdim / n_slices) * i
# slice_end = int(xdim / n_slices) * (i + 1)
# scene_xr_tmp, pixel_flags_tmp = scn.scene_id(
# viirs_data.isel(
# number_of_lines=slice(slice_start, slice_end),
# ),
# thresholds["Sun_Glint"]["bounds"][3],
# thresholds["Snow_Mask"],
# )
# logger.info(f"slice size: {scene_xr.Land_Day.shape}, {scene_xr_tmp.Land_Day.shape}")
# scene_xr = xr.concat([scene_xr, scene_xr_tmp], dim="number_of_lines")
# pixel_flags = xr.concat([pixel_flags, pixel_flags_tmp], dim="number_of_lines")
scene_xr, pixel_flags = scn.scene_id(
viirs_data,
thresholds["Sun_Glint"]["bounds"][3],
thresholds["Snow_Mask"],
)
logger.info(f"slice 3: Memory usage: {proc.memory_info().rss / 1e6} MB")
scene_xr_2, pixels_flags_2 = scn.scene_id(
viirs_data.isel(
number_of_lines=slice(int(xdim / 2), xdim), number_of_pixels=slice(int(ydim / 2), ydim)
),
thresholds["Sun_Glint"]["bounds"][3],
thresholds["Snow_Mask"],
)
logger.info(f"slice 4: Memory usage: {proc.memory_info().rss / 1e6} MB")
scene_xr_tmp_2 = xr.concat([scene_xr_1, scene_xr_2], dim="number_of_lines")
pixels_flags_tmp_2 = xr.concat([pixels_flags_1, pixels_flags_2], dim="number_of_lines")
logger.info(f"concat 2: Memory usage: {proc.memory_info().rss / 1e6} MB")
scene_xr = xr.concat([scene_xr_tmp_1, scene_xr_tmp_2], dim="number_of_pixels")
pixels_flags = xr.concat([pixels_flags_tmp_1, pixels_flags_tmp_2], dim="number_of_pixels")
logger.info(f"concat 3: Memory usage: {proc.memory_info().rss / 1e6} MB")
logger.info(f"read_data: Memory usage: {proc.memory_info().rss / 1e6} MB")
input_data = xr.Dataset(viirs_data, coords=scene_xr).chunk()
input_data.drop_vars(["latitude", "longitude"])
......@@ -848,6 +851,6 @@ def get_data(
logger.info(f"read_data: Memory usage: {proc.memory_info().rss / 1e6} MB")
# input_data.to_netcdf("input_data.nc")
# pixels_flags.to_netcdf("pixels_flags.nc")
# pixel_flags.to_netcdf("pixels_flags.nc")
return input_data, pixels_flags
return input_data, pixel_flags
"""Various utility functions."""
import logging
from typing import Dict
import numpy as np
import xarray as xr
......@@ -17,7 +16,7 @@ def cloud_mask_spi(data: xr.Dataset):
ydim = int(m05.shape[1] / 2)
spi = xr.DataArray(
np.empty((xdim, ydim, 2), dtype=np.float32),
dims=("SPI_nbands", "number_of_lines", "number_of_pixels"),
dims=("number_of_lines", "number_of_pixels", "SPI_nbands"),
)
spi[:, :, 0] = 100 * (
m05.coarsen(number_of_lines=2, number_of_pixels=2).std()
......@@ -77,7 +76,7 @@ def bt11_elevation_correction(height: np.ndarray) -> np.ndarray:
return 5 * height / 1000
def group_count(bits: Dict) -> np.ndarray:
def group_count(bits: dict) -> np.ndarray:
"""Compute number of groups with at least one test performed."""
g1 = np.clip(bits["01"] + bits["02"], 0, 1)
g2 = np.clip(bits["04"] + bits["05"] + bits["06"] + bits["07"] + bits["08"] + bits["09"], 0, 1)
......
......@@ -51,7 +51,7 @@ def save_output(
scan_line_attributes = {
"scan_start_time": {
"dims": ("number_of_scans"),
"data": np.random.rand(10),
"data": np.random.rand(203),
"attrs": {
"units": "seconds",
"_FillValue": -999.9,
......@@ -323,7 +323,7 @@ def save_output(
out_ds.to_netcdf(fname, mode="a")
logger.debug("output file created and global attributes written to netcdf file")
out_scanline.to_netcdf(fname, mode="a", group="scanline_data_hires")
out_scanline.to_netcdf(fname, mode="a", group="scan_line_attributes_hires")
logger.debug("scanline_data_hires written to netcdf file")
out_geolocation.to_netcdf(fname, mode="a", group="geolocation_data_hires")
......
......@@ -62,6 +62,9 @@ if __name__ == "__main__":
parser.add_argument(
"--mvcm-thresholds", help="thresholds file name for operational MVCM", default=None
)
parser.add_argument(
"--sipsanc", action="store_true", help="use sipsanc tool for ancillary files"
)
args = parser.parse_args()
......@@ -84,6 +87,7 @@ if __name__ == "__main__":
out_file = args.out
verbose = args.verbose or False
debug = args.debug or False
sipsanc = args.sipsanc or False
main(
satellite=satellite,
......@@ -107,4 +111,5 @@ if __name__ == "__main__":
exe_path=args.mvcm_exe_path,
hires_only=args.hires_only,
mvcm_threshold_file=args.mvcm_thresholds,
use_sipsanc=sipsanc,
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment