import numpy as np import xarray as xr from numpy.lib.stride_tricks import sliding_window_view from typing import Dict import functools import utils import conf import conf_xr import preprocess_thresholds as pt import importlib _DTR = np.pi/180 # this is used for testing, eventually we want to remove it importlib.reload(pt) # ############## GROUP 1 TESTS ############## # def test_11um_old(rad, threshold): radshape = rad.shape rad = rad.reshape( thr = np.array(threshold['bt11']) confidence = np.zeros(rad.shape) if thr[4] == 1: print("11um test running") # the C code has the line below that I don't quite understand the purpose of. # It seems to be setting the bit to 0 if the BT value is greater than the midpoint # # if (m31 >= dobt11[1]) (void) set_bit(13, pxout.testbits); # confidence = utils.conf_test(rad, thr) confidence = conf.conf_test(rad, thr) return confidence.reshape(radshape) def test_11um_var(rad, threshold, var_threshold): print("11um variability test running") thr = np.array(threshold['11um_var']) radshape = rad.shape var = np.zeros((radshape[0], radshape[1], 9)) # chk_spatial2() need to figure out what this is # np = rg_var.num_small_diffs * 1.0 test = sliding_window_view(np.pad(rad, [1, 1], mode='constant'), (3, 3)) - np.expand_dims(rad, (2, 3)) var[np.abs(test).reshape(radshape[0], radshape[1], 9) < var_threshold['dovar11']] = 1 var = var.sum(axis=2).reshape( rad = rad.reshape( confidence = np.zeros(rad.shape) confidence[var == 9] = conf.conf_test(rad[var == 9], thr) return confidence.reshape(radshape) def test_11_4diff(rad1, rad2, threshold, viirs_data, sg_thresh): print("11um - 4um difference test running") radshape = rad1.shape raddiff = (rad1 - rad2).reshape( day = np.zeros(radshape) day[viirs_data.solar_zenith <= 85] = 1 day = day.reshape(raddiff.shape) sunglint = np.zeros(rad1.shape) sunglint[viirs_data.sunglint_angle <= sg_thresh] = 1 sunglint = sunglint.reshape(raddiff.shape) thr = np.array(threshold['test11_4lo']) confidence = np.zeros(raddiff.shape) # confidence[(day == 1) & (sunglint == 0)] = utils.conf_test(raddiff[(day == 1) & (sunglint == 0)], thr) confidence[(day == 1) & (sunglint == 0)] = conf.conf_test(raddiff[(day == 1) & (sunglint == 0)], thr) return confidence.reshape(radshape) def vir_refl_test(rad, threshold, viirs_data): print('Visible reflectance test running') thr = threshold['vis_refl_test'] radshape = rad.shape() rad = rad.reshape( confidence = np.zeros(radshape) vzcpow = 0.75 # THIS NEEDS TO BE READ FROM THE THRESHOLDS FILE vza = viirs_data.sensor_zenith.values dtr = np.pi/180 cosvza = np.cos(vza*dtr) coeffs = utils.get_b1_thresholds() coeffs[:, :3] = coeffs[:, :3] * threshold['b1_bias_adj'] # this quantity is the return of get_b1_thresholds() in the C code # it's defined here to keep a consistent logic with the original source, for now irtn = 0 if irtn != 0: coeffs = thr coeffs[:, :3] = coeffs[:, :3] * 1/np.power(cosvza, vzcpow) confidence = conf.conf_test(rad, coeffs) return confidence.reshape(radshape) def nir_refl_test(rad, threshold, sunglint_thresholds, viirs_data): print("NIR reflectance test running") sza = viirs_data.solar_zenith.values refang = viirs_data.sunglint_angle.values vza = viirs_data.sensor_zenith.values dtr = np.pi/180 # Keep in mind that band_n uses MODIS band numbers (i.e. 2=0.86um and 7=2.1um) # For VIIRS would be 2=M07 (0.865um) and 7=M11 (2.25um) band_n = 2 vzcpow = 0.75 # THIS NEEDS TO BE READ FROM THE THRESHOLDS FILE radshape = rad.shape rad = rad.reshape( confidence = np.zeros(rad.shape) sza = sza.reshape(rad.shape) vza = vza.reshape(rad.shape) refang = refang.reshape(rad.shape) sunglint_flag = utils.sunglint_scene(refang, sunglint_thresholds).reshape(rad.shape) # ref2 [5] # b2coeffs [4] # b2mid [1] # b2bias_adj [1] # b2lo [1] # vzcpow [3] (in different place) cosvza = np.cos(vza*dtr) coeffs = threshold['b2coeffs'] hicut0 = np.array(coeffs[0] + coeffs[1]*sza + coeffs[2]*np.power(sza, 2) + coeffs[3]*np.power(sza, 3)) hicut0 = (hicut0 * 0.01) + threshold['b2adj'] hicut0 = hicut0 * threshold['b2bias_adj'] midpt0 = hicut0 + (threshold['b2mid'] * threshold['b2bias_adj']) locut0 = midpt0 + (threshold['b2lo'] * threshold['b2bias_adj']) thr = np.array([locut0, midpt0, hicut0, threshold['ref2'][3]*np.ones(rad.shape)]) # corr_thr = np.zeros((4, 4)) corr_thr = np.zeros((4, rad.shape[0])) corr_thr[:3, sunglint_flag == 0] = thr[:3, sunglint_flag == 0] * (1./np.power(cosvza[sunglint_flag == 0], vzcpow)) corr_thr[3, sunglint_flag == 0] = thr[3, sunglint_flag == 0] for flag in range(1, 4): if len(refang[sunglint_flag == flag]) > 0: sunglint_thr = utils.get_sunglint_thresholds(refang, sunglint_thresholds, band_n, flag, thr) corr_thr[:3, sunglint_flag == flag] = sunglint_thr[:3, sunglint_flag == flag] * (1./np.power(cosvza[sunglint_flag == flag], vzcpow)) corr_thr[3, sunglint_flag == flag] = sunglint_thr[3, sunglint_flag == flag] confidence = conf.conf_test(rad, corr_thr) return confidence.reshape(radshape) def vis_nir_ratio_test(rad1, rad2, threshold, sg_threshold): print("NIR-Visible ratio test running") if threshold['vis_nir_ratio'][6] == 1: radshape = rad1.shape rad1 = rad1.reshape( rad2 = rad2.reshape( vrat = rad2/rad1 thresh = np.zeros((7,)) # temp value to avoid linter bitching at me # eventually we would have the test run in two blocks as: # confidence[sunglint == 1] = conf.conf_test_dble(vrat[sunglint == 1], sg_threshold['snglnt']) # confidence[sunglint == 0] = conf.conf_test_dble(vrat[sunglint == 0], threshold['vis_nir_ratio']) # sunglint needs to be defined somewhere sunglint = 0 if sunglint: thresh = threshold['snglnt'] else: thresh = threshold['vis_nir_ratio'] confidence = conf.conf_test_dble(vrat, thresh) return confidence.reshape(radshape) # old class, doesn't use xarray much class CloudTests_old: def __init__(self, scene_ids, scene_name, thresholds): self.scene = scene_ids self.scene_name = scene_name self.idx = np.where(scene_ids[scene_name] == 1) self.threshold = thresholds[scene_name] def single_threshold_test(self, test_name, rad, cmin): radshape = rad.shape rad = rad.reshape( thr = np.array(self.threshold[test_name]) confidence = np.zeros(radshape) if thr[4] == 1: print('test running') confidence[self.idx] = conf.conf_test(rad[self.idx], thr) cmin[self.idx] = np.minimum(cmin[self.idx], confidence[self.idx]) return cmin def double_threshold_test(self): pass # new class to try to use xarray more extensively class CloudTests(object): def __init__(self, data: xr.Dataset, scene_name: str, thresholds: Dict) -> None: = data self.scene_name = scene_name self.thresholds = thresholds self.scene_idx = tuple(np.nonzero(data[scene_name] == 1)) def run_if_test_exists_for_scene(func): @functools.wraps(func) def wrapper(self, *args, test_name, **kwargs): if test_name not in self.thresholds[self.scene_name]: print('Not running test for this scene') # returns cmin. This could be changed into a keyworded argument for readability return args[-1] return func(self, *args, test_name, **kwargs) return wrapper @run_if_test_exists_for_scene def test_11um(self, band: str, cmin: np.ndarray, test_name: str = '11um_Test') -> np.ndarray: confidence = np.ones([band].shape) threshold = self.thresholds[self.scene_name][test_name] if threshold['perform'] is True: print(f'Testing "{self.scene_name}"\n') rad =[band].values[self.scene_idx] confidence[self.scene_idx] = conf.conf_test_new(rad, threshold['thr']) cmin = np.fmin(cmin, confidence) return cmin @run_if_test_exists_for_scene def sst_test(self, band31: str, band32: str, cmin: np.ndarray, test_name: str = 'SST_Test') -> np.ndarray: confidence = np.ones( threshold = self.thresholds[self.scene_name][test_name] if threshold['perform'] is True: m31 =[band31].values - 273.16 bt_diff =[band31].values -[band32].values sst = - 273.16 cosvza = np.cos(*_DTR) c = threshold['coeffs'] modsst = 273.16 + c[0] + c[1]*m31 + c[2]*bt_diff*sst + c[3]*bt_diff*((1/cosvza) - 1) sfcdif = - modsst print(f'Testing "{self.scene_name}"\n') confidence[self.scene_idx] = conf.conf_test_new(sfcdif[self.scene_idx], threshold['thr']) cmin = np.fmin(cmin, confidence) return cmin @run_if_test_exists_for_scene def bt_diff_86_11um(self, band: str, cmin: np.ndarray, test_name: str = '8.6-11um_Test') -> np.ndarray: confidence = np.ones( threshold = self.thresholds[self.scene_name][test_name] if threshold['perform'] is True: print(f'Testing "{self.scene_name}"\n') rad =[band].values[self.scene_idx] confidence[self.scene_idx] = conf.conf_test_new(rad, threshold['thr']) cmin = np.fmin(cmin, confidence) return cmin def run_tests_temp(self): cmin_G1 = np.ones( cmin_G2 = np.ones( # Group 1 cmin_G1 = self.test_11um('M15', cmin_G1, test_name='11um_Test') cmin_G1 = self.sst_test('M15', 'M16', cmin_G1, test_name='SST_Test') # Group 2 cmin_G2 = self.bt_diff_86_11um('M14-M15', cmin_G2, '8.6-11um_Test') cmin = cmin_G1 * cmin_G2 return cmin def single_threshold_test(self, test_name, band, cmin): if band == 'bad_data': return cmin print(f'Running test "{test_name}" for "{self.scene_name}"') # preproc_thresholds() if 'thr' in self.thresholds[self.scene_name][test_name]: thr = np.array(self.thresholds[self.scene_name][test_name]['thr']) else: thr = np.array(self.thresholds[self.scene_name][test_name]) thr_xr = xr.Dataset() if test_name == '11-12um_Cirrus_Test': thr_xr['threshold'] = pt.preproc(, self.thresholds[self.scene_name], self.scene_name) thr = np.ones((5,)) # This is only temporary to force the logic of the code # I need to find a better solution at some point elif test_name == 'SST_Test': thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'), np.ones(([band].shape[0],[band].shape[1], 5))*thr) elif test_name == '7.3-11um_BTD_Mid_Level_Cloud_Test': thr_xr['threshold'] = pt.get_pn_thresholds(, self.thresholds, self.scene_name, '7.3-11um_BTD_Mid_Level_Cloud_Test') thr = np.ones((5,)) elif test_name == 'Surface_Temperature_Test': thr_xr['threshold'] = pt.preproc_surf_temp(, self.thresholds[self.scene_name]) thr = np.ones((5,)) elif (test_name == '11-4um_Oceanic_Stratus_Test' and self.scene_name in ['Land_Day_Desert', 'Land_Day_Desert_Coast', 'Polar_Day_Desert', 'Polar_Day_Desert_Coast']): thr = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)]) elif test_name == 'NIR_Reflectance_Test': corr_thr = pt.preproc_nir(, self.thresholds, self.scene_name) thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'), corr_thr) elif test_name == 'Visible_Reflectance_Test': thr_xr['threshold'],['M128'] = pt.vis_refl_thresholds(, self.thresholds, self.scene_name) elif test_name == '1.6_2.1um_NIR_Reflectance_Test': corr_thr = pt.nir_refl(, self.thresholds, self.scene_name) thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'), corr_thr) thr = np.ones((5,)) elif test_name == '4-12um_BTD_Thin_Cirrus_Test': thr_xr['threshold'] = pt.get_pn_thresholds(, self.thresholds, self.scene_name, '4-12um_BTD_Thin_Cirrus_Test') thr = np.ones((5,)) elif test_name == 'GEMI_Test': thr_xr['threshold'] = pt.GEMI_test(, self.thresholds, self.scene_name) thr = np.ones((5,)) elif (test_name == '1.38um_High_Cloud_Test' and self.scene_name in ['Ocean_Day', 'Polar_Ocean_Day']): thr_xr['threshold'] = pt.test_1_38um_preproc(, self.thresholds, self.scene_name) thr = np.ones((5,)) else: thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'), np.ones(([band].shape[0],[band].shape[1], 5))*thr) data = xr.Dataset(, coords=thr_xr) if test_name == 'SST_Test': data['sfcdif'] = (('number_of_lines', 'number_of_pixels'), pt.preproc_sst(data, self.thresholds[self.scene_name][test_name]).values) band = 'sfcdif' if test_name == '11um_Variability_Test': var = pt.var_11um(, self.thresholds) data['11um_var'] = data.M15 data['11um_var'].values[var != 9] = np.nan if thr[4] == 1: print('test running...') confidence = conf_xr.conf_test(data, band) cmin = np.fmin(cmin, confidence) return cmin def double_threshold_test(self, test_name, band, cmin): data = if test_name == '11-4um_BT_Difference_Test': thr = pt.bt11_4um_preproc(, self.thresholds, self.scene_name) print('test running...') confidence = conf.conf_test_dble(data['M15-M13'].values, thr) confidence = confidence.reshape(data.M01.shape) if test_name == 'Vis/NIR_Ratio_Test': print('test running...') thr_no_sunglint = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)]) thr_sunglint = np.array([self.thresholds['Sun_Glint']['snglnt'][i] for i in range(8)]) vrat = data.M07.values/data.M05.values _dtr = np.pi/180.0 sza = data.sensor_zenith.values raz = data.relative_azimuth.values vza = data.sensor_zenith.values cos_refang = np.sin(vza*_dtr) * np.sin(sza*_dtr) * np.cos(raz*_dtr) + \ np.cos(vza*_dtr) * np.cos(sza*_dtr) refang = np.arccos(cos_refang) * 180./np.pi idx = np.nonzero((data.solar_zenith <= 85) & (refang <= data.sunglint_angle)) confidence = conf.conf_test_dble(vrat, thr_no_sunglint) confidence = confidence.reshape(data.M01.shape) confidence[idx] = conf.conf_test_dble(vrat[idx], thr_sunglint) confidence = confidence.reshape(data.M01.shape) if (test_name == '11-4um_Oceanic_Stratus_Test' and self.scene_name in ['Land_Day_Desert', 'Land_Day_Desert_Coast', 'Polar_Day_Desert', 'Polar_Day_Desert_Coast']): thr = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)]) print('test running...') confidence = conf.conf_test_dble(data['M15-M16'].values, thr) confidence = confidence.reshape(data.M01.shape) cmin = np.fmin(cmin, confidence) return cmin class ComputeTests(CloudTests): def __init__(self, data: xr.Dataset, scene_name: str, thresholds: Dict) -> None: super().__init__(data, scene_name, thresholds) def run_tests(self): cmin_G1 = np.ones( cmin_G2 = np.ones( # Group 1 cmin_G1 = self.test_11um('M15', cmin_G1, test_name='11um_Test') cmin_G1 = self.sst_test('M15', 'M16', cmin_G1, test_name='SST_Test') # Group 2 cmin_G2 = self.bt_diff_86_11um('M14-M15', cmin_G2, test_name='8.6-11um_Test') cmin = cmin_G1 * cmin_G2 return cmin def preproc_thresholds(thresholds, data): thr = np.array(thresholds) thr_xr = xr.Dataset() thr_xr['tresholds'] = (('number_of_lines', 'number_of_pixels', 'z'), np.ones((data['M01'].shape[0], data['M01'].shape[1], 5))*thr) nl_sfct1 = thresholds['Land_Night']['Surface_Temperature_Test'][0] # nl_sfct2 = thresholds['Land_Night']['Surface_Temperature_Test'][1] # nlsfct_pfm = thresholds['Land_Night']['Surface_Temperature_Test'][2] nl_df1 = thresholds['Land_Night']['Surface_Temperature_Test_difference'][0:2] nl_df2 = thresholds['Land_Night']['Surface_Temperature_Test_difference'][2:] # df1 = data.M15 - data.M16 # df2 = data.M15 - data.M13 thr_xr = thr_xr.where(data.desert != 1, nl_sfct1) thr_xr = thr_xr.where((data['M15-M16'] > nl_df1[0]) | ((data['M15-M16'] < nl_df1[0]) & ((data['M15-M13'] <= nl_df2[0]) | (data['M15-M13'] >= nl_df2[1]))), nl_sfct1[0]) data = xr.Dataset(data, coords=thr_xr) return data def single_threshold_test(test, rad, threshold): radshape = rad.shape rad = rad.reshape( thr = np.array(threshold[test]) confidence = np.zeros(rad.shape) if thr[4] == 1: print(f"{test} test running") # the C code has the line below that I don't quite understand the purpose of. # It seems to be setting the bit to 0 if the BT value is greater than the midpoint # # if (m31 >= dobt11[1]) (void) set_bit(13, pxout.testbits); # confidence = utils.conf_test(rad, thr) confidence = conf.conf_test(rad, thr) return confidence.reshape(radshape) def test(): rad = np.random.randint(50, size=[4, 8]) # coeffs = [5, 42, 20, 28, 15, 35, 1] # coeffs = [20, 28, 5, 42, 15, 35, 1] coeffs = [35, 15, 20, 1, 1] # confidence = conf_test_dble(rad, coeffs) confidence = test_11um(rad, coeffs) print(rad) print('\n') print(confidence) if __name__ == "__main__": test()