-
Paolo Veglio authoredPaolo Veglio authored
read_data.py 23.00 KiB
import xarray as xr
import numpy as np
import numpy.typing as npt
import ancillary_data as anc
import scene as scn
import os
import logging
from datetime import datetime as dt
from typing import Dict
from attrs import define, field, validators, Factory
_DTR = np.pi/180.
_RTD = 180./np.pi
_bad_data = -999.0
logging.basicConfig(level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s')
# logging.basicConfig(level=logging.INFO, filename='logfile.log', 'filemode='w',
# format='%(name)s %(levelname)s %(message)s')
@define(kw_only=True, slots=True)
class CollectInputs(object):
"""Class that collects all input files
Parameters
----------
file_name_geo: str
name of VIIRS geolocation file
file_name_l1b: str
name of VIIRS L1b file
ancillary_dir: str
path for the ancillary files. (this is necessary because the C functions
still require path and file name separately)
sst_file: str
file name for the Reynolds SST
ndvi_file: str
file name for the NDVI hdf4 file
geos_file_before: str
file name of the GEOS-5 containing atmospheric profiles before the VIIRS timestamp
geos_file_after: str
file name of the GEOS-5 containing atmospheric profiles after the VIIRS timestamp
geos_land: str
file name of the GEOS-5 file that has the land ice/snow information
geos_ocean: str
file name of the GEOS-5 file that has the ocean information
geos_constants: str
file name for the GEOS-5 constans
dims [optional]: str
name of the dimensions for the arrays in the xarray.Dataset output
"""
file_name_geo: str = field(default='', validator=[validators.instance_of(str), ])
file_name_l1b: str = field(default='', validator=[validators.instance_of(str), ])
ancillary_dir: str = field(default='', validator=[validators.instance_of(str), ])
sst_file: str = field(default='', validator=[validators.instance_of(str), ])
ndvi_file: str = field(default='', validator=[validators.instance_of(str), ])
geos_file_1: str = field(default='', validator=[validators.instance_of(str), ])
geos_file_2: str = field(default='', validator=[validators.instance_of(str), ])
geos_land: str = field(default='', validator=[validators.instance_of(str), ])
geos_ocean: str = field(default='', validator=[validators.instance_of(str), ])
geos_constants: str = field(default='', validator=[validators.instance_of(str), ])
dims: tuple = field(default=('number_of_lines', 'number_of_pixels'),
validator=[validators.instance_of(tuple), ])
@define(slots=True, kw_only=True)
class ReadData(CollectInputs):
"""Class that reads the L1b/geolocation data from VIIRS. Inherits file names from CollectInputs
Parameters
----------
satellite: str
satellite name.
sensor: str
sensor name
"""
satellite: str = field(validator=[validators.instance_of(str),
validators.in_(['snpp'])])
sensor: str = field(validator=[validators.instance_of(str),
validators.in_(['viirs'])])
logging.debug('Instance of ReadData created')
def read_viirs_geo(self) -> xr.Dataset:
"""Read VIIRS geolocation data and generate additional angles
Parameters
----------
self.file_name_geo: str
file name of the geolocation file
Returns
-------
geo_data xarray.Dataset
dataset containing all geolocation data
"""
logging.debug(f'Reading {self.file_name_geo}')
geo_data = xr.open_dataset(self.file_name_geo, group='geolocation_data')
relazi = self.relative_azimuth_angle(geo_data.sensor_azimuth.values, geo_data.solar_azimuth.values)
sunglint = self.sun_glint_angle(geo_data.sensor_zenith.values, geo_data.solar_zenith.values, relazi)
scatt_angle = self.scattering_angle(geo_data.solar_zenith.values, geo_data.sensor_zenith.values,
relazi)
geo_data['relative_azimuth'] = (self.dims, relazi)
geo_data['sunglint_angle'] = (self.dims, sunglint)
geo_data['scattering_angle'] = (self.dims, scatt_angle)
logging.debug('Geolocation file read correctly')
return geo_data
def read_viirs_l1b(self,
solar_zenith: npt.NDArray[float]) -> xr.Dataset:
"""Read VIIRS L1b data
Parameters
----------
self.file_name_l1b: str
file name of the L1b file
solar_zenith: np.ndarray
solar zenith angle derived from the geolocation file
"""
logging.debug(f'Reading {self.file_name_l1b}')
l1b_data = xr.open_dataset(self.file_name_l1b, group='observation_data', decode_cf=False)
rad_data = xr.Dataset()
for band in list(l1b_data.variables):
if 'reflectance' in l1b_data[band].long_name:
if hasattr(l1b_data[band], 'VCST_scale_factor'):
scale_factor = l1b_data[band].VCST_scale_factor * l1b_data[band].bias_correction
else:
scale_factor = l1b_data[band].scale_factor
rad_data[band] = (self.dims, l1b_data[band].values * scale_factor / np.cos(solar_zenith*_DTR))
elif 'radiance' in l1b_data[band].long_name:
bt_lut = f'{band}_brightness_temperature_lut'
rad_data[band] = (self.dims,
l1b_data[bt_lut].values[l1b_data[band].values])
else:
pass
logging.debug('L1b file read correctly')
return rad_data
def preprocess_viirs(self,
viirs: xr.Dataset) -> xr.Dataset:
"""Create band combinations (e.g. ratios, differences) that are used by some spectral tests.
Parameters
----------
viirs: xr.Dataset
VIIRS L1b data
Returns
-------
viirs: xr.Dataset
VIIRS L1b data plus band combinations for the spectral tests
"""
if (('M05' in viirs) and ('M07' in viirs)):
m01 = viirs.M05.values
m02 = viirs.M07.values
r1 = 2.0 * (np.power(m02, 2.0) - np.power(m01, 2.0)) + (1.5 * m02) + (0.5 * m01)
r2 = m01 + m02 + 0.5
r3 = r1 / r2
gemi = r3 * (1.0 - 0.25*r3) - ((m01 - 0.125) / (1.0 - m01))
else:
gemi = np.full((viirs.M15.shape), _bad_data)
"""
if 'M05' in viirs:
idx = np.nonzero((viirs.M05.values < -99) | (viirs.M05.values > 2))
viirs['M05'].values[idx] = _bad_data
else:
viirs['M05'] = (self.dims, np.full(viirs.M15.shape, _bad_data))
if 'M07' in viirs:
idx = np.nonzero((viirs.M07.values < -99) | (viirs.M07.values > 2))
viirs['M07'].values[idx] = _bad_data
else:
viirs['M07'] = (self.dims, np.full(viirs.M15.shape, _bad_data))
idx = np.nonzero((viirs.M12.values < 0) | (viirs.M12.values > 1000))
viirs['M12'].values[idx] = _bad_data
idx = np.nonzero((viirs.M13.values < 0) | (viirs.M13.values > 1000))
viirs['M13'].values[idx] = _bad_data
idx = np.nonzero((viirs.M14.values < 0) | (viirs.M14.values > 1000))
viirs['M14'].values[idx] = _bad_data
idx = np.nonzero((viirs.M15.values < 0) | (viirs.M15.values > 1000))
viirs['M15'].values[idx] = _bad_data
idx = np.nonzero((viirs.M16.values < 0) | (viirs.M16.values > 1000))
viirs['M16'].values[idx] = _bad_data
"""
# Compute channel differences and ratios that are used in the tests
if (('M05' in viirs) and ('M07' in viirs)):
viirs['M07-M05ratio'] = (self.dims, viirs.M07.values / viirs.M05.values)
else:
viirs['M07-M05ratio'] = (self.dims, np.full(viirs.M15.shape, _bad_data))
viirs['M13-M16'] = (self.dims, viirs.M13.values - viirs.M16.values)
viirs['M14-M15'] = (self.dims, viirs.M14.values - viirs.M15.values)
viirs['M15-M12'] = (self.dims, viirs.M15.values - viirs.M12.values)
viirs['M15-M13'] = (self.dims, viirs.M15.values - viirs.M13.values)
viirs['M15-M16'] = (self.dims, viirs.M15.values - viirs.M16.values)
viirs['GEMI'] = (self.dims, gemi)
# temp value to force the code to work
viirs['M128'] = (self.dims, np.zeros(viirs.M15.shape))
logging.debug('Viirs preprocessing completed successfully.')
return viirs
def relative_azimuth_angle(self,
sensor_azimuth: npt.NDArray[float],
solar_azimuth: npt.NDArray[float]) -> npt.NDArray[float]:
"""Computation of the relative azimuth angle
Parameters
----------
sensor_azimuth: np.ndarray
sensor azimuth angle from the geolocation file
solar_azimuth: np.ndarray
solar azimuth angle from the geolocation file
Returns
-------
relative_azimuth: np.ndarray
"""
rel_azimuth = np.abs(180. - np.abs(sensor_azimuth - solar_azimuth))
logging.debug('Relative azimuth calculated successfully.')
return rel_azimuth
def sun_glint_angle(self,
sensor_zenith: npt.NDArray[float],
solar_zenith: npt.NDArray[float],
rel_azimuth: npt.NDArray[float]) -> npt.NDArray[float]:
"""Computation of the sun glint angle
Parameters
----------
sensor_zenith: np.ndarray
sensor zenith angle from the geolocation file
solar_zenith: np.ndarray
solar zenith angle from the geolocation file
relative_azimuth: np.ndarray
relative azimuth computed from function relative_azimuth_angle()
Returns
-------
sunglint_angle: np.ndarray
"""
cossna = (np.sin(sensor_zenith*_DTR) * np.sin(solar_zenith*_DTR) * np.cos(rel_azimuth*_DTR) +
np.cos(sensor_zenith*_DTR) * np.cos(solar_zenith*_DTR))
cossna[cossna > 1] = 1
sunglint_angle = np.arccos(cossna) * _RTD
logging.debug('Sunglint generated')
return sunglint_angle
def scattering_angle(self,
solar_zenith: npt.NDArray[float],
sensor_zenith: npt.NDArray[float],
relative_azimuth: npt.NDArray[float]) -> npt.NDArray[float]:
"""Computation of the scattering angle
Parameters
----------
solar_zenith: np.ndarray
solar zenith angle from the geolocation file
sensor_zenith: np.ndarray
sensor zenith angle angle from the geolocation file
relative_azimuth: np.ndarray
relative azimuth computed from function relative_azimuth_angle()
Returns
-------
scattering_angle: np.ndarray
"""
cos_scatt_angle = -1. * (np.cos(solar_zenith*_DTR) * np.cos(sensor_zenith*_DTR) -
np.sin(solar_zenith*_DTR) * np.sin(sensor_zenith*_DTR) *
np.cos(relative_azimuth*_DTR))
scatt_angle = np.arccos(cos_scatt_angle) * _RTD
logging.debug('Scattering angle calculated successfully')
return scatt_angle
@define(kw_only=True, slots=True)
class ReadAncillary(CollectInputs):
"""Class tha processes all the ancillary files and makes them available for the MVCM"
Parameters
----------
latitude: npt.NDArray[float]
array of latitudes for the granule that is being processed
longitude: npt.NDArray[float]
array of longitudes for the granule that is being processed
resolution: int
flag to switch between MOD (1) and IMG (2) resolution
"""
latitude: npt.NDArray[float] = field(validator=[validators.instance_of(np.ndarray), ])
longitude: npt.NDArray[float] = field(validator=[validators.instance_of(np.ndarray), ])
resolution: int = field(validator=[validators.instance_of(int),
validators.in_([1, 2]), ])
out_shape: tuple = field(init=False,
default=Factory(lambda self: self.latitude.shape, takes_self=True))
def get_granule_time(self):
"""Get granule timestamp and format it for MVCM"""
vnp_time = '.'.join(os.path.basename(self.file_name_l1b).split('.')[1:3])
return dt.strftime(dt.strptime(vnp_time, 'A%Y%j.%H%M'), '%Y-%m-%d %H:%M:%S.000')
def get_sst(self) -> npt.NDArray[float]:
"""Read Reynolds SST file
Parameters
----------
Returns
-------
sst: npt.NDArray[float]
array containing the Reynolds SST interpolated at the sensor's resolution
"""
sst = np.empty(self.out_shape, dtype=np.float32).ravel()
sst = anc.py_get_Reynolds_SST(self.latitude.ravel(), self.longitude.ravel(), self.resolution,
self.ancillary_dir, self.sst_file, sst)
logging.debug('SST read successfully')
return sst.reshape(self.out_shape)
def get_ndvi(self) -> npt.NDArray[float]:
"""Read NDVI file
Parameters
----------
Returns
-------
ndvi: npt.NDArray[float]
NDVI interpolated at the sensor's resolution
"""
ndvi = np.empty(self.out_shape, dtype=np.float32).ravel()
ndvi = anc.py_get_NDVI_background(self.latitude.ravel(), self.longitude.ravel(), self.resolution,
self.ancillary_dir, self.ndvi_file, ndvi)
logging.debug('NDVI read successfully')
return ndvi.reshape(self.out_shape)
def get_eco(self):
pass
def get_geos(self) -> Dict:
"""Read GEOS-5 data and interpolate the fields to the sensor resolution
Parameters
----------
Returns
-------
geos_data: Dict
dictionary containing all quantities required by MVCM (see geos_variables here below)
"""
geos_variables = ['tpw', 'snow_fraction', 'ice_fraction', 'ocean_fraction',
'land_ice_fraction', 'surface_temperature']
geos_data = {var: np.empty(self.out_shape, dtype=np.float32) for var in geos_variables}
geos_data = anc.py_get_GEOS(self.latitude.ravel(), self.longitude.ravel(), self.resolution,
self.get_granule_time(), self.ancillary_dir, self.geos_file_before,
self.geos_file_after, self.geos_land, self.geos_ocean,
self.geos_constants, geos_data)
for var in list(geos_variables):
geos_data[var] = geos_data[var].reshape(self.out_shape)
logging.debug('GEOS data read successfully')
return geos_data
def correct_reflectances():
pass
def read_ancillary_data(file_names: str,
latitude: np.ndarray,
longitude: np.ndarray,
resolution: int = 1) -> xr.Dataset:
"""Read ancillary data using C functions from original MVCM
Parameters
----------
file_names: str
latitude: np.ndarray
longitude: np.ndarray
resolution: int
Returns
-------
data: xarray.Dataset
"""
# Ancillary files temporarily defined here. Eventually we will find a better way to pass these
anc_dir = file_names['ANC_DIR']
sst_file = file_names['SST']
ndvi_file = file_names['NDVI']
geos_cnst = file_names['GEOS_constants']
geos_lnd = file_names['GEOS_land']
geos_ocn = file_names['GEOS_ocean']
geos1 = file_names['GEOS_atm_1']
geos2 = file_names['GEOS_atm_2']
vnptime = '.'.join(os.path.basename(file_names['MOD02']).split('.')[1:3])
start_time = dt.strftime(dt.strptime(vnptime, 'A%Y%j.%H%M'), '%Y-%m-%d %H:%M:%S.000')
out_shape = latitude.shape
sst = np.empty((np.prod(out_shape), ), dtype=np.float32)
ndvi = np.empty((np.prod(out_shape), ), dtype=np.float32)
eco = np.empty((np.prod(out_shape), ), dtype=np.ubyte)
geos_data = {'tpw': np.empty((np.prod(out_shape), ), dtype=np.float32),
'snowfr': np.empty((np.prod(out_shape), ), dtype=np.float32),
'icefr': np.empty((np.prod(out_shape), ), dtype=np.float32),
'ocnfr': np.empty((np.prod(out_shape), ), dtype=np.float32),
'landicefr': np.empty((np.prod(out_shape), ), dtype=np.float32),
'sfct': np.empty((np.prod(out_shape), ), dtype=np.float32),
}
sst = anc.py_get_Reynolds_SST(latitude.ravel(), longitude.ravel(), resolution, anc_dir, sst_file, sst)
logging.info('SST read successfully')
ndvi = anc.py_get_NDVI_background(latitude.ravel(), longitude.ravel(),
resolution, anc_dir, ndvi_file, ndvi)
logging.info('NDVI read successfully')
eco = anc.py_get_Olson_eco(latitude.ravel(), longitude.ravel(), resolution, anc_dir, eco)
logging.info('Olson eco read successfully')
geos = anc.py_get_GEOS(latitude.ravel(), longitude.ravel(), resolution, start_time, anc_dir,
geos1, geos2, geos_lnd, geos_ocn, geos_cnst, geos_data)
logging.info('GEOS5 data read successfully')
ancillary = {'sst': sst.reshape(out_shape),
'ndvi': ndvi.reshape(out_shape),
'eco': eco.reshape(out_shape)
}
for var in list(geos):
ancillary[f'geos_{var}'] = geos[var].reshape(out_shape)
dims = ('number_of_lines', 'number_of_pixels')
data = xr.Dataset.from_dict({var: {'dims': dims, 'data': ancillary[var]} for var in list(ancillary)})
return data
def get_data(satellite: str,
sensor: str,
file_names: Dict[str, str],
sunglint_angle: float,
hires: bool = False) -> xr.Dataset:
mod02 = file_names['MOD02']
mod03 = file_names['MOD03']
if hires is True:
img02 = file_names['IMG02']
img03 = file_names['IMG03']
if hires is False:
viirs = read_viirs(sensor, f'{mod02}', f'{mod03}')
viirs = read_ancillary_data(file_names, viirs)
if (('M05' in viirs) and ('M07' in viirs)):
m01 = viirs.M05.values
m02 = viirs.M07.values
r1 = 2.0 * (np.power(m02, 2.0) - np.power(m01, 2.0)) + (1.5 * m02) + (0.5 * m01)
r2 = m02 + m01 + 0.5
r3 = r1 / r2
gemi = r3 * (1.0 - 0.25*r3) - ((m01 - 0.125) / (1.0 - m01))
else:
gemi = np.full((viirs.M15.shape), _bad_data)
if 'M05' in viirs:
idx = np.nonzero((viirs.M05.values < -99) | (viirs.M05.values > 2))
viirs['M05'].values[idx] = _bad_data
else:
viirs['M05'] = (('number_of_lines', 'number_of_pixels'),
np.full(viirs.M15.shape, _bad_data))
if 'M07' in viirs:
idx = np.nonzero((viirs.M07.values < -99) | (viirs.M07.values > 2))
viirs['M07'].values[idx] = _bad_data
else:
viirs['M07'] = (('number_of_lines', 'number_of_pixels'),
np.full(viirs.M15.shape, _bad_data))
idx = np.nonzero((viirs.M12.values < 0) | (viirs.M12.values > 1000))
viirs['M12'].values[idx] = _bad_data
idx = np.nonzero((viirs.M13.values < 0) | (viirs.M13.values > 1000))
viirs['M13'].values[idx] = _bad_data
idx = np.nonzero((viirs.M14.values < 0) | (viirs.M14.values > 1000))
viirs['M14'].values[idx] = _bad_data
idx = np.nonzero((viirs.M15.values < 0) | (viirs.M15.values > 1000))
viirs['M15'].values[idx] = _bad_data
idx = np.nonzero((viirs.M16.values < 0) | (viirs.M16.values > 1000))
viirs['M16'].values[idx] = _bad_data
# Compute channel differences and ratios that are used in the tests
viirs['M15-M13'] = (('number_of_lines', 'number_of_pixels'),
viirs.M15.values - viirs.M13.values)
viirs['M14-M15'] = (('number_of_lines', 'number_of_pixels'),
viirs.M14.values - viirs.M15.values)
viirs['M15-M16'] = (('number_of_lines', 'number_of_pixels'),
viirs.M15.values - viirs.M16.values)
viirs['M15-M12'] = (('number_of_lines', 'number_of_pixels'),
viirs.M15.values - viirs.M12.values)
viirs['M13-M16'] = (('number_of_lines', 'number_of_pixels'),
viirs.M13.values - viirs.M16.values)
viirs['M07-M05ratio'] = (('number_of_lines', 'number_of_pixels'),
viirs.M07.values / viirs.M05.values)
viirs['GEMI'] = (('number_of_lines', 'number_of_pixels'), gemi)
# temp value to force the code to work
viirs['M128'] = (('number_of_lines', 'number_of_pixels'), np.zeros(viirs.M15.shape))
else:
viirs = read_viirs('viirs', f'{img02}', f'{img03}')
viirs['M05'] = viirs.I01
viirs['M07'] = viirs.I02
idx = np.nonzero((viirs.M05.values < -99) | (viirs.M05.values > 2))
viirs['M05'].values[idx] = _bad_data
idx = np.nonzero((viirs.M07.values < -99) | (viirs.M07.values > 2))
viirs['M07'].values[idx] = _bad_data
idx = np.nonzero((viirs.I01.values < 0) | (viirs.I01.values > 1000))
viirs['I01'].values[idx] = _bad_data
idx = np.nonzero((viirs.I02.values < 0) | (viirs.I02.values > 1000))
viirs['I02'].values[idx] = _bad_data
idx = np.nonzero((viirs.I03.values < 0) | (viirs.I03.values > 1000))
viirs['I03'].values[idx] = _bad_data
idx = np.nonzero((viirs.I04.values < 0) | (viirs.I04.values > 1000))
viirs['I04'].values[idx] = _bad_data
idx = np.nonzero((viirs.I05.values < 0) | (viirs.I05.values > 1000))
viirs['I05'].values[idx] = _bad_data
viirs = read_ancillary_data(file_names, viirs, resolution=2)
viirs['I05-I04'] = (('number_of_lines_2', 'number_of_pixels_2'),
viirs.I05.values - viirs.I04.values)
viirs['I02-I01ratio'] = (('number_of_lines_2', 'number_of_pixels_2'),
viirs.I02.values / viirs.I01.values)
scene_flags = scn.find_scene(viirs, sunglint_angle)
scene = scn.scene_id(scene_flags)
scene_xr = xr.Dataset()
for s in scn._scene_list:
scene_xr[s] = (('number_of_lines', 'number_of_pixels'), scene[s])
scene['lat'] = viirs.latitude
scene['lon'] = viirs.longitude
data = xr.Dataset(viirs, coords=scene_xr)
data.drop_vars(['latitude', 'longitude'])
return data
def get_data_new(satellite: str,
sensor: str,
file_names: Dict[str, str],
sunglint_angle: float,
hires: bool = False) -> xr.Dataset:
"""Reads satellite and ancillary data and packs everything in an xarray dataset for the main function
"""
mod02 = file_names['MOD02']
mod03 = file_names['MOD03']
viirs = read_viirs(sensor, f'{mod02}', f'{mod03}')
ancillary_data = read_ancillary_data(file_names, viirs.latitude.values, viirs.longitude.values)