Newer
Older
import xarray as xr
import numpy as np
import os

Paolo Veglio
committed
import logging
from datetime import datetime as dt
from attrs import define, field, validators, Factory
_DTR = np.pi/180.
_RTD = 180./np.pi

Paolo Veglio
committed
_bad_data = -999.0

Paolo Veglio
committed
logging.basicConfig(level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s')
# logging.basicConfig(level=logging.INFO, filename='logfile.log', 'filemode='w',
# format='%(name)s %(levelname)s %(message)s')

Paolo Veglio
committed
@define(kw_only=True, slots=True)
class CollectInputs(object):
"""Class that collects all input files

Paolo Veglio
committed
Parameters
----------
file_name_geo: str
name of VIIRS geolocation file
file_name_l1b: str
name of VIIRS L1b file
ancillary_dir: str
path for the ancillary files. (this is necessary because the C functions
still require path and file name separately)
sst_file: str
file name for the Reynolds SST
ndvi_file: str
file name for the NDVI hdf4 file
geos_file_before: str
file name of the GEOS-5 containing atmospheric profiles before the VIIRS timestamp
geos_file_after: str
file name of the GEOS-5 containing atmospheric profiles after the VIIRS timestamp
geos_land: str
file name of the GEOS-5 file that has the land ice/snow information
geos_ocean: str
file name of the GEOS-5 file that has the ocean information
geos_constants: str
file name for the GEOS-5 constans
dims [optional]: str
name of the dimensions for the arrays in the xarray.Dataset output

Paolo Veglio
committed
"""
file_name_geo: str = field(default='', validator=[validators.instance_of(str), ])
file_name_l1b: str = field(default='', validator=[validators.instance_of(str), ])
ancillary_dir: str = field(default='', validator=[validators.instance_of(str), ])
sst_file: str = field(default='', validator=[validators.instance_of(str), ])
ndvi_file: str = field(default='', validator=[validators.instance_of(str), ])
geos_file_before: str = field(default='', validator=[validators.instance_of(str), ])
geos_file_after: str = field(default='', validator=[validators.instance_of(str), ])
geos_land: str = field(default='', validator=[validators.instance_of(str), ])
geos_ocean: str = field(default='', validator=[validators.instance_of(str), ])
geos_constants: str = field(default='', validator=[validators.instance_of(str), ])
dims: tuple = field(default=('number_of_lines', 'number_of_pixels'),
validator=[validators.instance_of(tuple), ])

Paolo Veglio
committed
@define(slots=True, kw_only=True)
class ReadData(CollectInputs):
"""Class that reads the L1b/geolocation data from VIIRS. Inherits file names from CollectInputs

Paolo Veglio
committed
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
Parameters
----------
satellite: str
satellite name.
sensor: str
sensor name
"""
satellite: str = field(validator=[validators.instance_of(str),
validators.in_(['snpp'])])
sensor: str = field(validator=[validators.instance_of(str),
validators.in_(['viirs'])])
logging.debug('Instance of ReadData created')
def read_viirs_geo(self) -> xr.Dataset:
"""Read VIIRS geolocation data and generate additional angles
Parameters
----------
self.file_name_geo: str
file name of the geolocation file
Returns
-------
geo_data xarray.Dataset
dataset containing all geolocation data
"""
logging.debug(f'Reading {self.file_name_geo}')
geo_data = xr.open_dataset(self.file_name_geo, group='geolocation_data')
relazi = self.relative_azimuth_angle(geo_data.sensor_azimuth.values, geo_data.solar_azimuth.values)
sunglint = self.sun_glint_angle(geo_data.sensor_zenith.values, geo_data.solar_zenith.values, relazi)
scatt_angle = self.scattering_angle(geo_data.solar_zenith.values, geo_data.sensor_zenith.values,
relazi)
geo_data['relative_azimuth'] = (self.dims, relazi)
geo_data['sunglint_angle'] = (self.dims, sunglint)
geo_data['scattering_angle'] = (self.dims, scatt_angle)
logging.debug('Geolocation file read correctly')
return geo_data
def read_viirs_l1b(self,
solar_zenith: npt.NDArray[float]) -> xr.Dataset:
"""Read VIIRS L1b data
Parameters
----------
self.file_name_l1b: str
file name of the L1b file
solar_zenith: np.ndarray
solar zenith angle derived from the geolocation file
"""
logging.debug(f'Reading {self.file_name_l1b}')
l1b_data = xr.open_dataset(self.file_name_l1b, group='observation_data', decode_cf=False)
rad_data = xr.Dataset()
for band in list(l1b_data.variables):
if 'reflectance' in l1b_data[band].long_name:
if hasattr(l1b_data[band], 'VCST_scale_factor'):
scale_factor = l1b_data[band].VCST_scale_factor * l1b_data[band].bias_correction
else:
scale_factor = l1b_data[band].scale_factor
rad_data[band] = (self.dims, l1b_data[band].values * scale_factor / np.cos(solar_zenith*_DTR))
elif 'radiance' in l1b_data[band].long_name:
bt_lut = f'{band}_brightness_temperature_lut'
rad_data[band] = (self.dims,
l1b_data[bt_lut].values[l1b_data[band].values])

Paolo Veglio
committed
logging.debug('L1b file read correctly')
viirs: xr.Dataset) -> xr.Dataset:
"""Create band combinations (e.g. ratios, differences) that are used by some spectral tests.
Parameters
----------
geo_data: xarray.Dataset
dataset containing all geolocation data
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
viirs: xr.Dataset
VIIRS L1b data
Returns
-------
viirs: xr.Dataset
VIIRS L1b data plus band combinations for the spectral tests
"""
if (('M05' in viirs) and ('M07' in viirs)):
m01 = viirs.M05.values
m02 = viirs.M07.values
r1 = 2.0 * (np.power(m02, 2.0) - np.power(m01, 2.0)) + (1.5 * m02) + (0.5 * m01)
r2 = m01 + m02 + 0.5
r3 = r1 / r2
gemi = r3 * (1.0 - 0.25*r3) - ((m01 - 0.125) / (1.0 - m01))
else:
gemi = np.full((viirs.M15.shape), _bad_data)
"""
if 'M05' in viirs:
idx = np.nonzero((viirs.M05.values < -99) | (viirs.M05.values > 2))
viirs['M05'].values[idx] = _bad_data
else:
viirs['M05'] = (self.dims, np.full(viirs.M15.shape, _bad_data))
if 'M07' in viirs:
idx = np.nonzero((viirs.M07.values < -99) | (viirs.M07.values > 2))
viirs['M07'].values[idx] = _bad_data
else:
viirs['M07'] = (self.dims, np.full(viirs.M15.shape, _bad_data))
idx = np.nonzero((viirs.M12.values < 0) | (viirs.M12.values > 1000))
viirs['M12'].values[idx] = _bad_data
idx = np.nonzero((viirs.M13.values < 0) | (viirs.M13.values > 1000))
viirs['M13'].values[idx] = _bad_data
idx = np.nonzero((viirs.M14.values < 0) | (viirs.M14.values > 1000))
viirs['M14'].values[idx] = _bad_data
idx = np.nonzero((viirs.M15.values < 0) | (viirs.M15.values > 1000))
viirs['M15'].values[idx] = _bad_data
idx = np.nonzero((viirs.M16.values < 0) | (viirs.M16.values > 1000))
viirs['M16'].values[idx] = _bad_data
"""
# Compute channel differences and ratios that are used in the tests
if (('M05' in viirs) and ('M07' in viirs)):
viirs['M07-M05ratio'] = (self.dims, viirs.M07.values / viirs.M05.values)
else:
viirs['M07-M05ratio'] = (self.dims, np.full(viirs.M15.shape, _bad_data))
viirs['M13-M16'] = (self.dims, viirs.M13.values - viirs.M16.values)
viirs['M14-M15'] = (self.dims, viirs.M14.values - viirs.M15.values)
viirs['M15-M12'] = (self.dims, viirs.M15.values - viirs.M12.values)
viirs['M15-M13'] = (self.dims, viirs.M15.values - viirs.M13.values)
viirs['M15-M16'] = (self.dims, viirs.M15.values - viirs.M16.values)
viirs['GEMI'] = (self.dims, gemi)
# temp value to force the code to work
viirs['M128'] = (self.dims, np.zeros(viirs.M15.shape))
viirs.update(geo_data)
viirs = viirs.set_coords(['latitude', 'longitude'])
logging.debug('Viirs preprocessing completed successfully.')
return viirs
def relative_azimuth_angle(self,
sensor_azimuth: npt.NDArray[float],
solar_azimuth: npt.NDArray[float]) -> npt.NDArray[float]:
"""Computation of the relative azimuth angle
Parameters
----------
sensor_azimuth: np.ndarray
sensor azimuth angle from the geolocation file
solar_azimuth: np.ndarray
solar azimuth angle from the geolocation file
Returns
-------
relative_azimuth: np.ndarray
"""
rel_azimuth = np.abs(180. - np.abs(sensor_azimuth - solar_azimuth))
logging.debug('Relative azimuth calculated successfully.')

Paolo Veglio
committed

Paolo Veglio
committed
def sun_glint_angle(self,
sensor_zenith: npt.NDArray[float],
solar_zenith: npt.NDArray[float],
rel_azimuth: npt.NDArray[float]) -> npt.NDArray[float]:
"""Computation of the sun glint angle
Parameters
----------
sensor_zenith: np.ndarray
sensor zenith angle from the geolocation file
solar_zenith: np.ndarray
solar zenith angle from the geolocation file
relative_azimuth: np.ndarray
relative azimuth computed from function relative_azimuth_angle()
Returns
-------
sunglint_angle: np.ndarray
"""
cossna = (np.sin(sensor_zenith*_DTR) * np.sin(solar_zenith*_DTR) * np.cos(rel_azimuth*_DTR) +
np.cos(sensor_zenith*_DTR) * np.cos(solar_zenith*_DTR))
cossna[cossna > 1] = 1
sunglint_angle = np.arccos(cossna) * _RTD

Paolo Veglio
committed
logging.debug('Sunglint generated')

Paolo Veglio
committed
def scattering_angle(self,
solar_zenith: npt.NDArray[float],
sensor_zenith: npt.NDArray[float],
relative_azimuth: npt.NDArray[float]) -> npt.NDArray[float]:
"""Computation of the scattering angle
Parameters
----------
solar_zenith: np.ndarray
solar zenith angle from the geolocation file
sensor_zenith: np.ndarray
sensor zenith angle angle from the geolocation file
relative_azimuth: np.ndarray
relative azimuth computed from function relative_azimuth_angle()
Returns
-------
scattering_angle: np.ndarray
"""
cos_scatt_angle = -1. * (np.cos(solar_zenith*_DTR) * np.cos(sensor_zenith*_DTR) -
np.sin(solar_zenith*_DTR) * np.sin(sensor_zenith*_DTR) *
np.cos(relative_azimuth*_DTR))
scatt_angle = np.arccos(cos_scatt_angle) * _RTD
logging.debug('Scattering angle calculated successfully')
return scatt_angle
@define(kw_only=True, slots=True)
class ReadAncillary(CollectInputs):
"""Class tha processes all the ancillary files and makes them available for the MVCM"

Paolo Veglio
committed
Parameters
----------
array of latitudes for the granule that is being processed
array of longitudes for the granule that is being processed
resolution: int
flag to switch between MOD (1) and IMG (2) resolution

Paolo Veglio
committed
"""
latitude: npt.NDArray[float] = field(validator=[validators.instance_of(np.ndarray), ])
longitude: npt.NDArray[float] = field(validator=[validators.instance_of(np.ndarray), ])
resolution: int = field(validator=[validators.instance_of(int),
validators.in_([1, 2]), ])
out_shape: tuple = field(init=False,
default=Factory(lambda self: self.latitude.shape, takes_self=True))
def get_granule_time(self):
"""Get granule timestamp and format it for MVCM"""
vnp_time = '.'.join(os.path.basename(self.file_name_l1b).split('.')[1:3])
return dt.strftime(dt.strptime(vnp_time, 'A%Y%j.%H%M'), '%Y-%m-%d %H:%M:%S.000')
def get_sst(self) -> npt.NDArray[float]:
"""Read Reynolds SST file
Parameters
----------
Returns
-------
sst: npt.NDArray[float]
array containing the Reynolds SST interpolated at the sensor's resolution
"""
sst = np.empty(self.out_shape, dtype=np.float32).ravel()
sst = anc.py_get_Reynolds_SST(self.latitude.ravel(), self.longitude.ravel(), self.resolution,
self.ancillary_dir, self.sst_file, sst)
logging.debug('SST read successfully')
return sst.reshape(self.out_shape)
def get_ndvi(self) -> npt.NDArray[float]:
"""Read NDVI file
Parameters
----------
Returns
-------
ndvi: npt.NDArray[float]
NDVI interpolated at the sensor's resolution
"""
ndvi = np.empty(self.out_shape, dtype=np.float32).ravel()
ndvi = anc.py_get_NDVI_background(self.latitude.ravel(), self.longitude.ravel(), self.resolution,
self.ancillary_dir, self.ndvi_file, ndvi)
logging.debug('NDVI read successfully')
return ndvi.reshape(self.out_shape)
def get_eco(self):
pass
def get_geos(self) -> Dict:
"""Read GEOS-5 data and interpolate the fields to the sensor resolution
Parameters
----------
Returns
-------
geos_data: Dict
dictionary containing all quantities required by MVCM (see geos_variables here below)
"""
geos_variables = ['tpw', 'snow_fraction', 'ice_fraction', 'ocean_fraction',
'land_ice_fraction', 'surface_temperature']
geos_data = {var: np.empty(self.out_shape, dtype=np.float32).ravel() for var in geos_variables}
geos_data = anc.py_get_GEOS(self.latitude.ravel(), self.longitude.ravel(), self.resolution,
self.get_granule_time(), self.ancillary_dir, self.geos_file_before,
self.geos_file_after, self.geos_land, self.geos_ocean,
self.geos_constants, geos_data)
for var in list(geos_variables):
geos_data[var] = geos_data[var].reshape(self.out_shape)
logging.debug('GEOS data read successfully')
return geos_data
def pack_data(self) -> xr.Dataset:
"""Pack all the ancillary data into a single dataset
Returns
-------
ancillary_data: xr.Dataset
dataset containing all the ancillary data
"""
ancillary_data = xr.Dataset()
ancillary_data['sst'] = (self.dims, self.get_sst())
ancillary_data['ndvi'] = (self.dims, self.get_ndvi())
geos_tmp = self.get_geos()
for var in list(geos_tmp.keys()):
ancillary_data[f'geos_{var}'] = (self.dims, geos_tmp[var])

Paolo Veglio
committed

Paolo Veglio
committed
scene_flags = scn.find_scene(viirs, sunglint_angle)
scene = scn.scene_id(scene_flags)
scene_xr = xr.Dataset()
for s in scn._scene_list:
scene_xr[s] = (('number_of_lines', 'number_of_pixels'), scene[s])
scene['lat'] = viirs.latitude
scene['lon'] = viirs.longitude
data = xr.Dataset(viirs, coords=scene_xr)
data.drop_vars(['latitude', 'longitude'])

Paolo Veglio
committed
def get_data(satellite: str,
sensor: str,
file_names: Dict[str, str],
sunglint_angle: float,
hires: bool = False) -> xr.Dataset:
mod02 = file_names['MOD02']
mod03 = file_names['MOD03']
if hires is True:
img02 = file_names['IMG02']
img03 = file_names['IMG03']
viirs = read_viirs(sensor, f'{mod02}', f'{mod03}')
viirs = read_ancillary_data(file_names, viirs)
if (('M05' in viirs) and ('M07' in viirs)):
m01 = viirs.M05.values
m02 = viirs.M07.values
r1 = 2.0 * (np.power(m02, 2.0) - np.power(m01, 2.0)) + (1.5 * m02) + (0.5 * m01)
r2 = m02 + m01 + 0.5
r3 = r1 / r2
gemi = r3 * (1.0 - 0.25*r3) - ((m01 - 0.125) / (1.0 - m01))
else:
gemi = np.full((viirs.M15.shape), _bad_data)
if 'M05' in viirs:
idx = np.nonzero((viirs.M05.values < -99) | (viirs.M05.values > 2))
viirs['M05'].values[idx] = _bad_data
viirs['M05'] = (('number_of_lines', 'number_of_pixels'),
np.full(viirs.M15.shape, _bad_data))
if 'M07' in viirs:
idx = np.nonzero((viirs.M07.values < -99) | (viirs.M07.values > 2))
viirs['M07'].values[idx] = _bad_data
viirs['M07'] = (('number_of_lines', 'number_of_pixels'),
np.full(viirs.M15.shape, _bad_data))
idx = np.nonzero((viirs.M12.values < 0) | (viirs.M12.values > 1000))
viirs['M12'].values[idx] = _bad_data
idx = np.nonzero((viirs.M13.values < 0) | (viirs.M13.values > 1000))
viirs['M13'].values[idx] = _bad_data
idx = np.nonzero((viirs.M14.values < 0) | (viirs.M14.values > 1000))
viirs['M14'].values[idx] = _bad_data
idx = np.nonzero((viirs.M15.values < 0) | (viirs.M15.values > 1000))
viirs['M15'].values[idx] = _bad_data
idx = np.nonzero((viirs.M16.values < 0) | (viirs.M16.values > 1000))
viirs['M16'].values[idx] = _bad_data

Paolo Veglio
committed
# Compute channel differences and ratios that are used in the tests
viirs['M15-M13'] = (('number_of_lines', 'number_of_pixels'),
viirs.M15.values - viirs.M13.values)
viirs['M14-M15'] = (('number_of_lines', 'number_of_pixels'),
viirs.M14.values - viirs.M15.values)
viirs['M15-M16'] = (('number_of_lines', 'number_of_pixels'),
viirs.M15.values - viirs.M16.values)
viirs['M15-M12'] = (('number_of_lines', 'number_of_pixels'),
viirs.M15.values - viirs.M12.values)
viirs['M13-M16'] = (('number_of_lines', 'number_of_pixels'),
viirs.M13.values - viirs.M16.values)
viirs['M07-M05ratio'] = (('number_of_lines', 'number_of_pixels'),
viirs.M07.values / viirs.M05.values)
viirs['GEMI'] = (('number_of_lines', 'number_of_pixels'), gemi)
# temp value to force the code to work
viirs['M128'] = (('number_of_lines', 'number_of_pixels'), np.zeros(viirs.M15.shape))
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
viirs = read_viirs('viirs', f'{img02}', f'{img03}')
viirs['M05'] = viirs.I01
viirs['M07'] = viirs.I02
idx = np.nonzero((viirs.M05.values < -99) | (viirs.M05.values > 2))
viirs['M05'].values[idx] = _bad_data
idx = np.nonzero((viirs.M07.values < -99) | (viirs.M07.values > 2))
viirs['M07'].values[idx] = _bad_data
idx = np.nonzero((viirs.I01.values < 0) | (viirs.I01.values > 1000))
viirs['I01'].values[idx] = _bad_data
idx = np.nonzero((viirs.I02.values < 0) | (viirs.I02.values > 1000))
viirs['I02'].values[idx] = _bad_data
idx = np.nonzero((viirs.I03.values < 0) | (viirs.I03.values > 1000))
viirs['I03'].values[idx] = _bad_data
idx = np.nonzero((viirs.I04.values < 0) | (viirs.I04.values > 1000))
viirs['I04'].values[idx] = _bad_data
idx = np.nonzero((viirs.I05.values < 0) | (viirs.I05.values > 1000))
viirs['I05'].values[idx] = _bad_data
viirs = read_ancillary_data(file_names, viirs, resolution=2)
viirs['I05-I04'] = (('number_of_lines_2', 'number_of_pixels_2'),
viirs.I05.values - viirs.I04.values)
viirs['I02-I01ratio'] = (('number_of_lines_2', 'number_of_pixels_2'),
viirs.I02.values / viirs.I01.values)
scene_flags = scn.find_scene(viirs, sunglint_angle)
scene = scn.scene_id(scene_flags)
scene_xr = xr.Dataset()
for s in scn._scene_list:
scene_xr[s] = (('number_of_lines', 'number_of_pixels'), scene[s])
scene['lat'] = viirs.latitude
scene['lon'] = viirs.longitude
data = xr.Dataset(viirs, coords=scene_xr)
data.drop_vars(['latitude', 'longitude'])
return data