Newer
Older
import xarray as xr
import numpy as np
_DTR = np.pi/180.
_RTD = 180./np.pi
def read_data(sensor: str, l1b_filename: str, geo_filename: str):

Paolo Veglio
committed
in_data = xr.open_dataset(geo_filename, group='geolocation_data')
data = xr.open_dataset(l1b_filename, group='observation_data', decode_cf=False)

Paolo Veglio
committed
if sensor.lower() == 'viirs':
for band in list(data.variables):
if 'reflectance' in data[band].long_name:
if hasattr(data[band], 'VCST_scale_factor'):
scale_factor = data[band].VCST_scale_factor * data[band].bias_correction
else:

Paolo Veglio
committed
scale_factor = data[band].scale_factor
in_data[band] = (('number_of_lines', 'number_of_pixels'),

Paolo Veglio
committed
data[band].values * scale_factor / np.cos(in_data.solar_zenith.values*_DTR))
elif 'radiance' in data[band].long_name:
in_data[band] = (('number_of_lines', 'number_of_pixels'),
data[f'{band}_brightness_temperature_lut'].values[data[band].values])
else:
pass

Paolo Veglio
committed
relazi = relative_azimuth_angle(in_data.sensor_azimuth.values, in_data.solar_azimuth.values)
sunglint = sun_glint_angle(in_data.sensor_zenith.values, in_data.solar_zenith.values, relazi)
scatt_angle = scattering_angle(in_data.solar_zenith.values, in_data.sensor_zenith.values, relazi)
in_data['relative_azimuth'] = (('number_of_lines', 'number_of_pixels'), relazi)
in_data['sunglint_angle'] = (('number_of_lines', 'number_of_pixels'), sunglint)
in_data['scattering_angle'] = (('number_of_lines', 'number_of_pixels'), scatt_angle)
return in_data
def relative_azimuth_angle(sensor_azimuth: float, solar_azimuth: float) -> float:
rel_azimuth = np.abs(180. - np.abs(sensor_azimuth - solar_azimuth))
return rel_azimuth
def sun_glint_angle(sensor_zenith: float, solar_zenith: float, rel_azimuth: float) -> float:
cossna = (np.sin(sensor_zenith*_DTR) * np.sin(solar_zenith*_DTR) * np.cos(rel_azimuth*_DTR) +
np.cos(sensor_zenith*_DTR) * np.cos(solar_zenith*_DTR))
cossna[cossna > 1] = 1
sunglint_angle = np.arccos(cossna) * _RTD
return sunglint_angle
def scattering_angle(solar_zenith, sensor_zenith, relative_azimuth):
cos_scatt_angle = -1. * (np.cos(solar_zenith*_DTR) * np.cos(sensor_zenith*_DTR) -
np.sin(solar_zenith*_DTR) * np.sin(sensor_zenith*_DTR) *
np.cos(relative_azimuth*_DTR))
scatt_angle = np.arccos(cos_scatt_angle) * _RTD
return scatt_angle
def correct_reflectances():
pass
def read_ancillary_data(file_names: str, viirs_data: float, resolution=1):
# Ancillary files temporarily defined here. Eventually we will find a better way to pass these
start_time = '2022-06-22 14:54:00.000'

Paolo Veglio
committed
anc_dir = file_names['ANC_DIR']
sst_file = file_names['SST']
ndvi_file = file_names['NDVI']
geos_cnst = file_names['GEOS_constants']
geos_lnd = file_names['GEOS_land']
geos_ocn = file_names['GEOS_ocean']
geos1 = file_names['GEOS_atm_1']
geos2 = file_names['GEOS_atm_2']
dim1 = viirs_data.latitude.shape[0]
dim2 = viirs_data.latitude.shape[1]

Paolo Veglio
committed
print(dim1, dim2)

Paolo Veglio
committed
sst = np.empty((dim1*dim2, ), dtype=np.float32)
ndvi = np.empty((dim1*dim2, ), dtype=np.float32)
eco = np.empty((dim1*dim2, ), dtype=np.ubyte)
geos_data = {'tpw': np.empty((dim1*dim2, ), dtype=np.float32),
'snowfr': np.empty((dim1*dim2, ), dtype=np.float32),
'icefr': np.empty((dim1*dim2, ), dtype=np.float32),
'ocnfr': np.empty((dim1*dim2, ), dtype=np.float32),
'landicefr': np.empty((dim1*dim2, ), dtype=np.float32),
'sfct': np.empty((dim1*dim2, ), dtype=np.float32),

Paolo Veglio
committed
sst = anc.py_get_Reynolds_SST(viirs_data.latitude.values.reshape((dim1*dim2, )),
viirs_data.longitude.values.reshape((dim1*dim2, )),

Paolo Veglio
committed
resolution, anc_dir, sst_file, sst)
print('sst')

Paolo Veglio
committed
ndvi = anc.py_get_NDVI_background(viirs_data.latitude.values.reshape((dim1*dim2, )),
viirs_data.longitude.values.reshape((dim1*dim2, )),

Paolo Veglio
committed
resolution, anc_dir, ndvi_file, ndvi)

Paolo Veglio
committed
print('ndvi')

Paolo Veglio
committed
eco = anc.py_get_Olson_eco(viirs_data.latitude.values.reshape((dim1*dim2, )),
viirs_data.longitude.values.reshape((dim1*dim2, )),

Paolo Veglio
committed
resolution, anc_dir, eco)

Paolo Veglio
committed
print('eco')

Paolo Veglio
committed
geos_data = anc.py_get_GEOS(viirs_data.latitude.values.reshape((dim1*dim2, )),
viirs_data.longitude.values.reshape((dim1*dim2, )),

Paolo Veglio
committed
resolution, start_time, anc_dir, geos1, geos2, geos_lnd, geos_ocn, geos_cnst, geos_data)
print('geos')

Paolo Veglio
committed
data['sst'] = (('number_of_lines', 'number_of_pixels'), sst.reshape((dim1, dim2)))
data['ndvi'] = (('number_of_lines', 'number_of_pixels'), ndvi.reshape((dim1, dim2)))
data['eco'] = (('number_of_lines', 'number_of_pixels'), eco.reshape((dim1, dim2)))
data['geos_tpw'] = (('number_of_lines', 'number_of_pixels'), geos_data['tpw'].reshape((dim1, dim2)))
data['geos_snowfr'] = (('number_of_lines', 'number_of_pixels'), geos_data['snowfr'].reshape((dim1, dim2)))
data['geos_icefr'] = (('number_of_lines', 'number_of_pixels'), geos_data['icefr'].reshape((dim1, dim2)))
data['geos_ocnfr'] = (('number_of_lines', 'number_of_pixels'), geos_data['ocnfr'].reshape((dim1, dim2)))
data['geos_landicefr'] = (('number_of_lines', 'number_of_pixels'), geos_data['landicefr'].reshape((dim1, dim2)))
data['geos_sfct'] = (('number_of_lines', 'number_of_pixels'), geos_data['sfct'].reshape((dim1, dim2)))
def get_data(file_names, sunglint_angle):
mod02 = file_names['MOD02']
mod03 = file_names['MOD03']
viirs_data = read_data('viirs', f'{mod02}', f'{mod03}')
viirs_data = read_ancillary_data(file_names, viirs_data)
# Include channel differences and ratios that are used in the tests
viirs_data['M15-M14'] = (('number_of_lines', 'number_of_pixels'),
viirs_data.M15.values - viirs_data.M14.values)
viirs_data['M15-M16'] = (('number_of_lines', 'number_of_pixels'),
viirs_data.M15.values - viirs_data.M16.values)
viirs_data['M15-M13'] = (('number_of_lines', 'number_of_pixels'),
viirs_data.M15.values - viirs_data.M13.values)
scene_flags = scn.find_scene(viirs_data, sunglint_angle)
scene = scn.scene_id(scene_flags)
scene_xr = xr.Dataset()
for s in scn._scene_list:
scene_xr[s] = (('number_of_lines', 'number_of_pixels'), scene[s])
scene['lat'] = viirs_data.latitude
scene['lon'] = viirs_data.longitude
data = xr.Dataset(viirs_data, coords=scene_xr)
data.drop_vars(['latitude', 'longitude'])
return data