import xarray as xr import numpy as np import ancillary_data as anc import scene as scn import os import logging from datetime import datetime as dt from typing import Dict from attrs import define, field, validators _DTR = np.pi/180. _RTD = 180./np.pi _bad_data = -999.0 logging.basicConfig(level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s') # logging.basicConfig(level=logging.INFO, filename='logfile.log', 'filemode='w', # format='%(name)s %(levelname)s %(message)s') @define(kw_only=True, slots=True) class CollectInputs(object): """Class that collects all input files Parameters ---------- file_name_geo: str name of VIIRS geolocation file file_name_l1b: str name of VIIRS L1b file ancillary_dir: str path for the ancillary files. (this is necessary because the C functions still require path and file name separately) sst_file: str file name for the Reynolds SST ndvi_file: str file name for the NDVI hdf4 file geos_file_before: str file name of the GEOS-5 containing atmospheric profiles before the VIIRS timestamp geos_file_after: str file name of the GEOS-5 containing atmospheric profiles after the VIIRS timestamp geos_land: str file name of the GEOS-5 file that has the land ice/snow information geos_ocean: str file name of the GEOS-5 file that has the ocean information geos_constants: str file name for the GEOS-5 constans dims [optional]: str name of the dimensions for the arrays in the xarray.Dataset output """ file_name_geo: str = field([validators.of_instance(str), ]) file_name_l1b: str = field([validators.of_instance(str), ]) ancillary_dir: str = field([validators.of_instance(str), ]) sst_file: str = field([validators.of_instance(str), ]) ndvi_file: str = field([validators.of_instance(str), ]) geos_file_1: str = field([validators.of_instance(str), ]) geos_file_2: str = field([validators.of_instance(str), ]) geos_land: str = field([validators.of_instance(str), ]) geos_ocean: str = field([validators.of_instance(str), ]) geos_constants: str = field([validators.of_instance(str), ]) dims: tuple = field(default=('number_of_lines', 'number_of_pixels'), validator=[validators.instance_of(tuple), ]) @define(slots=True, kw_only=True) class ReadData(CollectInputs): """Class that reads the L1b/geolocation data from VIIRS. Inherits file names from CollectInputs Parameters ---------- satellite: str satellite name. sensor: str sensor name """ satellite: str = field(validator=[validators.instance_of(str), validators.in_(['snpp'])]) sensor: str = field(validator=[validators.instance_of(str), validators.in_(['viirs'])]) logging.debug('Instance of ReadData created') def read_viirs_geo(self) -> xr.Dataset: """Read VIIRS geolocation data and generate additional angles Parameters ---------- self.file_name_geo: str file name of the geolocation file Returns ------- geo_data xarray.Dataset dataset containing all geolocation data """ logging.debug(f'Reading {self.file_name_geo}') geo_data = xr.open_dataset(self.file_name_geo, group='geolocation_data') relazi = self.relative_azimuth_angle(geo_data.sensor_azimuth.values, geo_data.solar_azimuth.values) sunglint = self.sun_glint_angle(geo_data.sensor_zenith.values, geo_data.solar_zenith.values, relazi) scatt_angle = self.scattering_angle(geo_data.solar_zenith.values, geo_data.sensor_zenith.values, relazi) geo_data['relative_azimuth'] = (self.dims, relazi) geo_data['sunglint_angle'] = (self.dims, sunglint) geo_data['scattering_angle'] = (self.dims, scatt_angle) logging.debug('Geolocation file read correctly') return geo_data def read_viirs_l1b(self, solar_zenith: np.ndarray(float)) -> xr.Dataset: """Read VIIRS L1b data Parameters ---------- self.file_name_l1b: str file name of the L1b file solar_zenith: np.ndarray solar zenith angle derived from the geolocation file """ logging.debug(f'Reading {self.file_name_l1b}') l1b_data = xr.open_dataset(self.file_name_l1b, group='observation_data', decode_cf=False) rad_data = xr.Dataset() for band in list(l1b_data.variables): if 'reflectance' in l1b_data[band].long_name: if hasattr(l1b_data[band], 'VCST_scale_factor'): scale_factor = l1b_data[band].VCST_scale_factor * l1b_data[band].bias_correction else: scale_factor = l1b_data[band].scale_factor rad_data[band] = (self.dims, l1b_data[band].values * scale_factor / np.cos(solar_zenith*_DTR)) elif 'radiance' in l1b_data[band].long_name: bt_lut = f'{band}_brightness_temperature_lut' rad_data[band] = (self.dims, l1b_data[bt_lut].values[l1b_data[band].values]) else: pass logging.debug('L1b file read correctly') return rad_data def relative_azimuth_angle(sensor_azimuth: np.ndarray(float), solar_azimuth: np.ndarray(float)) -> np.ndarray(float): """Computation of the relative azimuth angle Parameters ---------- sensor_azimuth: np.ndarray sensor azimuth angle from the geolocation file solar_azimuth: np.ndarray solar azimuth angle from the geolocation file Returns ------- relative_azimuth: np.ndarray """ rel_azimuth = np.abs(180. - np.abs(sensor_azimuth - solar_azimuth)) logging.debug('Relative azimuth calculated successfully.') return rel_azimuth def sun_glint_angle(sensor_zenith: np.ndarray(float), solar_zenith: np.ndarray(float), rel_azimuth: np.ndarray(float)) -> np.ndarray(float): """Computation of the sun glint angle Parameters ---------- sensor_zenith: np.ndarray sensor zenith angle from the geolocation file solar_zenith: np.ndarray solar zenith angle from the geolocation file relative_azimuth: np.ndarray relative azimuth computed from function relative_azimuth_angle() Returns ------- sunglint_angle: np.ndarray """ cossna = (np.sin(sensor_zenith*_DTR) * np.sin(solar_zenith*_DTR) * np.cos(rel_azimuth*_DTR) + np.cos(sensor_zenith*_DTR) * np.cos(solar_zenith*_DTR)) cossna[cossna > 1] = 1 sunglint_angle = np.arccos(cossna) * _RTD logging.debug('Sunglint generated') return sunglint_angle def scattering_angle(solar_zenith: np.ndarray(float), sensor_zenith: np.ndarray(float), relative_azimuth: np.ndarray(float)) -> np.ndarray(float): """Computation of the scattering angle Parameters ---------- solar_zenith: np.ndarray solar zenith angle from the geolocation file sensor_zenith: np.ndarray sensor zenith angle angle from the geolocation file relative_azimuth: np.ndarray relative azimuth computed from function relative_azimuth_angle() Returns ------- scattering_angle: np.ndarray """ cos_scatt_angle = -1. * (np.cos(solar_zenith*_DTR) * np.cos(sensor_zenith*_DTR) - np.sin(solar_zenith*_DTR) * np.sin(sensor_zenith*_DTR) * np.cos(relative_azimuth*_DTR)) scatt_angle = np.arccos(cos_scatt_angle) * _RTD logging.debug('Scattering angle calculated successfully') return scatt_angle @define(kw_only=True, slots=True) class ReadAncillary(CollectInputs): """Class tha processes all the ancillary files and makes them available for the MVCM" Parameters ---------- latitude: np.ndarray(float) array of latitudes for the granule that is being processed longitude: np.ndarray(float) array of longitudes for the granule that is being processed resolution: int flag to switch between MOD (1) and IMG (2) resolution """ latitude: np.ndarray(float) = field([validators.of_instance(np.ndarray), ]) longitude: np.ndarray(float) = field([validators.of_instance(np.ndarray), ]) resolution: int = field([validators.of_instance(int), validators.in_([1, 2]), ]) vnp_time = '.'.join(os.path.basename(CollectInputs.file_name_l1b).split('.')[1:3]) start_time = dt.strftime(dt.strptime(vnp_time, 'A%Y%j.%H%M'), '%Y-%m-%d %H:%M:%S.000') out_shape = latitude.shape def get_sst(self): sst = np.empty(self.out_shape, dtype=np.float32) sst = anc.py.get_Reynolds_SST(self.latitude.ravel(), self.longitude.ravel(), self.resolution, self.ancillary_dir, self.sst_file, sst) logging.debug('SST read successfully') return sst.reshape(self.out_shape) def get_ndvi(self): ndvi = np.empty(self.out_shape, dtype=np.float32) ndvi = anc.py_get_NDVI_background(self.latitude.ravel(), self.longitude.ravel(), self.resolution, self.ancillary_dir, self.ndvi_file, ndvi) logging.debug('NDVI read successfully') return ndvi.reshape(self.out_shape) def get_eco(self): pass def get_geos(self): geos_variables = ['tpw', 'snow_fraction', 'ice_fraction', 'ocean_fraction', 'land_ice_fraction', 'surface_temperature'] geos_data = {var: np.empty(self.out_shape, dtype=np.float32) for var in geos_variables} geos_data = anc.py_get_GEOS(self.latitude.ravel(), self.longitude.ravel(), self.resolution, self.start_time, self.ancillary_dir, self.geos_file_before, self.geos_file_after, self.geos_land, self.geos_ocean, self.geos_constants, geos_data) for var in list(geos_variables): geos_data[var] = geos_data[var].reshape(self.out_shape) logging.debug('GEOS data read successfully') return geos_data def correct_reflectances(): pass def read_ancillary_data(file_names: str, latitude: np.ndarray, longitude: np.ndarray, resolution: int = 1) -> xr.Dataset: """Read ancillary data using C functions from original MVCM Parameters ---------- file_names: str latitude: np.ndarray longitude: np.ndarray resolution: int Returns ------- data: xarray.Dataset """ # Ancillary files temporarily defined here. Eventually we will find a better way to pass these anc_dir = file_names['ANC_DIR'] sst_file = file_names['SST'] ndvi_file = file_names['NDVI'] geos_cnst = file_names['GEOS_constants'] geos_lnd = file_names['GEOS_land'] geos_ocn = file_names['GEOS_ocean'] geos1 = file_names['GEOS_atm_1'] geos2 = file_names['GEOS_atm_2'] vnptime = '.'.join(os.path.basename(file_names['MOD02']).split('.')[1:3]) start_time = dt.strftime(dt.strptime(vnptime, 'A%Y%j.%H%M'), '%Y-%m-%d %H:%M:%S.000') out_shape = latitude.shape sst = np.empty((np.prod(out_shape), ), dtype=np.float32) ndvi = np.empty((np.prod(out_shape), ), dtype=np.float32) eco = np.empty((np.prod(out_shape), ), dtype=np.ubyte) geos_data = {'tpw': np.empty((np.prod(out_shape), ), dtype=np.float32), 'snowfr': np.empty((np.prod(out_shape), ), dtype=np.float32), 'icefr': np.empty((np.prod(out_shape), ), dtype=np.float32), 'ocnfr': np.empty((np.prod(out_shape), ), dtype=np.float32), 'landicefr': np.empty((np.prod(out_shape), ), dtype=np.float32), 'sfct': np.empty((np.prod(out_shape), ), dtype=np.float32), } sst = anc.py_get_Reynolds_SST(latitude.ravel(), longitude.ravel(), resolution, anc_dir, sst_file, sst) logging.info('SST read successfully') ndvi = anc.py_get_NDVI_background(latitude.ravel(), longitude.ravel(), resolution, anc_dir, ndvi_file, ndvi) logging.info('NDVI read successfully') eco = anc.py_get_Olson_eco(latitude.ravel(), longitude.ravel(), resolution, anc_dir, eco) logging.info('Olson eco read successfully') geos = anc.py_get_GEOS(latitude.ravel(), longitude.ravel(), resolution, start_time, anc_dir, geos1, geos2, geos_lnd, geos_ocn, geos_cnst, geos_data) logging.info('GEOS5 data read successfully') ancillary = {'sst': sst.reshape(out_shape), 'ndvi': ndvi.reshape(out_shape), 'eco': eco.reshape(out_shape) } for var in list(geos): ancillary[f'geos_{var}'] = geos[var].reshape(out_shape) dims = ('number_of_lines', 'number_of_pixels') data = xr.Dataset.from_dict({var: {'dims': dims, 'data': ancillary[var]} for var in list(ancillary)}) return data def get_data(satellite: str, sensor: str, file_names: Dict[str, str], sunglint_angle: float, hires: bool = False) -> xr.Dataset: mod02 = file_names['MOD02'] mod03 = file_names['MOD03'] if hires is True: img02 = file_names['IMG02'] img03 = file_names['IMG03'] if hires is False: viirs_data = read_viirs_data(sensor, f'{mod02}', f'{mod03}') viirs_data = read_ancillary_data(file_names, viirs_data) if (('M05' in viirs_data) and ('M07' in viirs_data)): m01 = viirs_data.M05.values m02 = viirs_data.M07.values r1 = 2.0 * (np.power(m02, 2.0) - np.power(m01, 2.0)) + (1.5 * m02) + (0.5 * m01) r2 = m02 + m01 + 0.5 r3 = r1 / r2 gemi = r3 * (1.0 - 0.25*r3) - ((m01 - 0.125) / (1.0 - m01)) else: gemi = np.full((viirs_data.M15.shape), _bad_data) if 'M05' in viirs_data: idx = np.nonzero((viirs_data.M05.values < -99) | (viirs_data.M05.values > 2)) viirs_data['M05'].values[idx] = _bad_data else: viirs_data['M05'] = (('number_of_lines', 'number_of_pixels'), np.full(viirs_data.M15.shape, _bad_data)) if 'M07' in viirs_data: idx = np.nonzero((viirs_data.M07.values < -99) | (viirs_data.M07.values > 2)) viirs_data['M07'].values[idx] = _bad_data else: viirs_data['M07'] = (('number_of_lines', 'number_of_pixels'), np.full(viirs_data.M15.shape, _bad_data)) idx = np.nonzero((viirs_data.M12.values < 0) | (viirs_data.M12.values > 1000)) viirs_data['M12'].values[idx] = _bad_data idx = np.nonzero((viirs_data.M13.values < 0) | (viirs_data.M13.values > 1000)) viirs_data['M13'].values[idx] = _bad_data idx = np.nonzero((viirs_data.M14.values < 0) | (viirs_data.M14.values > 1000)) viirs_data['M14'].values[idx] = _bad_data idx = np.nonzero((viirs_data.M15.values < 0) | (viirs_data.M15.values > 1000)) viirs_data['M15'].values[idx] = _bad_data idx = np.nonzero((viirs_data.M16.values < 0) | (viirs_data.M16.values > 1000)) viirs_data['M16'].values[idx] = _bad_data # Compute channel differences and ratios that are used in the tests viirs_data['M15-M13'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M15.values - viirs_data.M13.values) viirs_data['M14-M15'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M14.values - viirs_data.M15.values) viirs_data['M15-M16'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M15.values - viirs_data.M16.values) viirs_data['M15-M12'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M15.values - viirs_data.M12.values) viirs_data['M13-M16'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M13.values - viirs_data.M16.values) viirs_data['M07-M05ratio'] = (('number_of_lines', 'number_of_pixels'), viirs_data.M07.values / viirs_data.M05.values) viirs_data['GEMI'] = (('number_of_lines', 'number_of_pixels'), gemi) # temp value to force the code to work viirs_data['M128'] = (('number_of_lines', 'number_of_pixels'), np.zeros(viirs_data.M15.shape)) else: viirs_data = read_viirs_data('viirs', f'{img02}', f'{img03}') viirs_data['M05'] = viirs_data.I01 viirs_data['M07'] = viirs_data.I02 idx = np.nonzero((viirs_data.M05.values < -99) | (viirs_data.M05.values > 2)) viirs_data['M05'].values[idx] = _bad_data idx = np.nonzero((viirs_data.M07.values < -99) | (viirs_data.M07.values > 2)) viirs_data['M07'].values[idx] = _bad_data idx = np.nonzero((viirs_data.I01.values < 0) | (viirs_data.I01.values > 1000)) viirs_data['I01'].values[idx] = _bad_data idx = np.nonzero((viirs_data.I02.values < 0) | (viirs_data.I02.values > 1000)) viirs_data['I02'].values[idx] = _bad_data idx = np.nonzero((viirs_data.I03.values < 0) | (viirs_data.I03.values > 1000)) viirs_data['I03'].values[idx] = _bad_data idx = np.nonzero((viirs_data.I04.values < 0) | (viirs_data.I04.values > 1000)) viirs_data['I04'].values[idx] = _bad_data idx = np.nonzero((viirs_data.I05.values < 0) | (viirs_data.I05.values > 1000)) viirs_data['I05'].values[idx] = _bad_data viirs_data = read_ancillary_data(file_names, viirs_data, resolution=2) viirs_data['I05-I04'] = (('number_of_lines_2', 'number_of_pixels_2'), viirs_data.I05.values - viirs_data.I04.values) viirs_data['I02-I01ratio'] = (('number_of_lines_2', 'number_of_pixels_2'), viirs_data.I02.values / viirs_data.I01.values) scene_flags = scn.find_scene(viirs_data, sunglint_angle) scene = scn.scene_id(scene_flags) scene_xr = xr.Dataset() for s in scn._scene_list: scene_xr[s] = (('number_of_lines', 'number_of_pixels'), scene[s]) scene['lat'] = viirs_data.latitude scene['lon'] = viirs_data.longitude data = xr.Dataset(viirs_data, coords=scene_xr) data.drop_vars(['latitude', 'longitude']) return data def get_data_new(satellite: str, sensor: str, file_names: Dict[str, str], sunglint_angle: float, hires: bool = False) -> xr.Dataset: """Reads satellite and ancillary data and packs everything in an xarray dataset for the main function """ mod02 = file_names['MOD02'] mod03 = file_names['MOD03'] viirs_data = read_viirs_data(sensor, f'{mod02}', f'{mod03}') ancillary_data = read_ancillary_data(file_names, viirs_data.latitude.values, viirs_data.longitude.values)