import xarray as xr import numpy as np import ancillary_data as anc import scene as scn from typing import Dict _DTR = np.pi/180. _RTD = 180./np.pi def read_data(sensor: str, l1b_filename: str, geo_filename: str) -> xr.Dataset: in_data = xr.open_dataset(geo_filename, group='geolocation_data') data = xr.open_dataset(l1b_filename, group='observation_data', decode_cf=False) if sensor.lower() == 'viirs': for band in list(data.variables): if 'reflectance' in data[band].long_name: if hasattr(data[band], 'VCST_scale_factor'): scale_factor = data[band].VCST_scale_factor * data[band].bias_correction else: scale_factor = data[band].scale_factor in_data[band] = (('number_of_lines', 'number_of_pixels'), data[band].values * scale_factor / np.cos(in_data.solar_zenith.values*_DTR)) elif 'radiance' in data[band].long_name: in_data[band] = (('number_of_lines', 'number_of_pixels'), data[f'{band}_brightness_temperature_lut'].values[data[band].values]) else: pass relazi = relative_azimuth_angle(in_data.sensor_azimuth.values, in_data.solar_azimuth.values) sunglint = sun_glint_angle(in_data.sensor_zenith.values, in_data.solar_zenith.values, relazi) scatt_angle = scattering_angle(in_data.solar_zenith.values, in_data.sensor_zenith.values, relazi) in_data['relative_azimuth'] = (('number_of_lines', 'number_of_pixels'), relazi) in_data['sunglint_angle'] = (('number_of_lines', 'number_of_pixels'), sunglint) in_data['scattering_angle'] = (('number_of_lines', 'number_of_pixels'), scatt_angle) return in_data def relative_azimuth_angle(sensor_azimuth: np.ndarray, solar_azimuth: np.ndarray) -> np.ndarray: rel_azimuth = np.abs(180. - np.abs(sensor_azimuth - solar_azimuth)) return rel_azimuth def sun_glint_angle(sensor_zenith: np.ndarray, solar_zenith: np.ndarray, rel_azimuth: np.ndarray) -> np.ndarray: cossna = (np.sin(sensor_zenith*_DTR) * np.sin(solar_zenith*_DTR) * np.cos(rel_azimuth*_DTR) + np.cos(sensor_zenith*_DTR) * np.cos(solar_zenith*_DTR)) cossna[cossna > 1] = 1 sunglint_angle = np.arccos(cossna) * _RTD return sunglint_angle def scattering_angle(solar_zenith: np.ndarray, sensor_zenith: np.ndarray, relative_azimuth: np.ndarray) -> np.ndarray: cos_scatt_angle = -1. * (np.cos(solar_zenith*_DTR) * np.cos(sensor_zenith*_DTR) - np.sin(solar_zenith*_DTR) * np.sin(sensor_zenith*_DTR) * np.cos(relative_azimuth*_DTR)) scatt_angle = np.arccos(cos_scatt_angle) * _RTD return scatt_angle def correct_reflectances(): pass def read_ancillary_data(file_names: str, viirs_data: xr.Dataset, resolution: int = 1) -> xr.Dataset: # Ancillary files temporarily defined here. Eventually we will find a better way to pass these start_time = '2022-06-22 14:54:00.000' anc_dir = file_names['ANC_DIR'] sst_file = file_names['SST'] ndvi_file = file_names['NDVI'] geos_cnst = file_names['GEOS_constants'] geos_lnd = file_names['GEOS_land'] geos_ocn = file_names['GEOS_ocean'] geos1 = file_names['GEOS_atm_1'] geos2 = file_names['GEOS_atm_2'] dim1 = viirs_data.latitude.shape[0] dim2 = viirs_data.latitude.shape[1] sst = np.empty((dim1*dim2, ), dtype=np.float32) ndvi = np.empty((dim1*dim2, ), dtype=np.float32) eco = np.empty((dim1*dim2, ), dtype=np.ubyte) geos_data = {'tpw': np.empty((dim1*dim2, ), dtype=np.float32), 'snowfr': np.empty((dim1*dim2, ), dtype=np.float32), 'icefr': np.empty((dim1*dim2, ), dtype=np.float32), 'ocnfr': np.empty((dim1*dim2, ), dtype=np.float32), 'landicefr': np.empty((dim1*dim2, ), dtype=np.float32), 'sfct': np.empty((dim1*dim2, ), dtype=np.float32), } sst = anc.py_get_Reynolds_SST(viirs_data.latitude.values.reshape((dim1*dim2, )), viirs_data.longitude.values.reshape((dim1*dim2, )), resolution, anc_dir, sst_file, sst) print('SST read successfully') ndvi = anc.py_get_NDVI_background(viirs_data.latitude.values.reshape((dim1*dim2, )), viirs_data.longitude.values.reshape((dim1*dim2, )), resolution, anc_dir, ndvi_file, ndvi) print('NDVI read successfully') eco = anc.py_get_Olson_eco(viirs_data.latitude.values.reshape((dim1*dim2, )), viirs_data.longitude.values.reshape((dim1*dim2, )), resolution, anc_dir, eco) print('Olson eco read successfully') geos_data = anc.py_get_GEOS(viirs_data.latitude.values.reshape((dim1*dim2, )), viirs_data.longitude.values.reshape((dim1*dim2, )), resolution, start_time, anc_dir, geos1, geos2, geos_lnd, geos_ocn, geos_cnst, geos_data) print('GEOS5 data read successfully') data = viirs_data data['sst'] = (('number_of_lines', 'number_of_pixels'), sst.reshape((dim1, dim2))) data['ndvi'] = (('number_of_lines', 'number_of_pixels'), ndvi.reshape((dim1, dim2))) data['eco'] = (('number_of_lines', 'number_of_pixels'), eco.reshape((dim1, dim2))) data['geos_tpw'] = (('number_of_lines', 'number_of_pixels'), geos_data['tpw'].reshape((dim1, dim2))) data['geos_snowfr'] = (('number_of_lines', 'number_of_pixels'), geos_data['snowfr'].reshape((dim1, dim2))) data['geos_icefr'] = (('number_of_lines', 'number_of_pixels'), geos_data['icefr'].reshape((dim1, dim2))) data['geos_ocnfr'] = (('number_of_lines', 'number_of_pixels'), geos_data['ocnfr'].reshape((dim1, dim2))) data['geos_landicefr'] = (('number_of_lines', 'number_of_pixels'), geos_data['landicefr'].reshape((dim1, dim2))) data['geos_sfct'] = (('number_of_lines', 'number_of_pixels'), geos_data['sfct'].reshape((dim1, dim2))) return data def get_data(file_names: Dict[str, str], sunglint_angle: float, hires: bool = False) -> xr.Dataset: mod02 = file_names['MOD02'] mod03 = file_names['MOD03'] img02 = file_names['IMG02'] img03 = file_names['IMG03'] if hires is False: viirs_data = read_data('viirs', f'{mod02}', f'{mod03}') viirs_data = read_ancillary_data(file_names, viirs_data) m01 = viirs_data.M05.values m02 = viirs_data.M07.values r1 = 2.0 * (np.power(m02, 2.0) - np.power(m01, 2.0)) + (1.5 * m02) + (0.5 * m01) r2 = m02 + m01 + 0.5 r3 = r1 / r2 gemi = r3 * (1.0 - 0.25*r3) - ((m01 - 0.125) / (1.0 - m01)) # Compute channel differences and ratios that are used in the tests viirs_data['M14-M15'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M14.values - viirs_data.M15.values) viirs_data['M15-M16'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M15.values - viirs_data.M16.values) viirs_data['M15-M12'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M15.values - viirs_data.M12.values) viirs_data['M13-M16'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M13.values - viirs_data.M16.values) viirs_data['M07-M05ratio'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M07.values / viirs_data.M05.values) viirs_data['GEMI'] = (('number_of_lines', 'number_of_pixels'), gemi) # temp value to force the code to work viirs_data['M128'] = (('number_of_lines', 'number_of_pixels'), np.zeros(viirs_data.M01.shape)) else: viirs_data = read_data('viirs', f'{img02}', f'{img03}') viirs_data['M05'] = viirs_data.I01 viirs_data['M07'] = viirs_data.I02 viirs_data = read_ancillary_data(file_names, viirs_data) viirs_data['I05-I04'] = (('number_of_lines_2', 'number_of_pixels_2'), viirs_data.I05.values - viirs_data.I04.values) viirs_data['I02-I01ratio'] = (('number_of_lines_2', 'number_of_pixels_2'), viirs_data.I02.values / viirs_data.I01.values) scene_flags = scn.find_scene(viirs_data, sunglint_angle) scene = scn.scene_id(scene_flags) scene_xr = xr.Dataset() for s in scn._scene_list: scene_xr[s] = (('number_of_lines', 'number_of_pixels'), scene[s]) scene['lat'] = viirs_data.latitude scene['lon'] = viirs_data.longitude data = xr.Dataset(viirs_data, coords=scene_xr) data.drop_vars(['latitude', 'longitude']) return data