Newer
Older
import numpy as np
import xarray as xr
from numpy.lib.stride_tricks import sliding_window_view
from typing import Dict
import conf
import conf_xr
import preprocess_thresholds as pt
import importlib
_DTR = np.pi/180
# this is used for testing, eventually we want to remove it
importlib.reload(pt)
# ############## GROUP 1 TESTS ############## #
def test_11um_old(rad, threshold):
radshape = rad.shape
rad = rad.reshape(np.prod(radshape))
thr = np.array(threshold['bt11'])
confidence = np.zeros(rad.shape)
print("11um test running")
# the C code has the line below that I don't quite understand the purpose of.
# It seems to be setting the bit to 0 if the BT value is greater than the midpoint
#
# if (m31 >= dobt11[1]) (void) set_bit(13, pxout.testbits);
# confidence = utils.conf_test(rad, thr)
confidence = conf.conf_test(rad, thr)
return confidence.reshape(radshape)
def test_11um_var(rad, threshold, var_threshold):
print("11um variability test running")
thr = np.array(threshold['11um_var'])
radshape = rad.shape
var = np.zeros((radshape[0], radshape[1], 9))
# chk_spatial2() need to figure out what this is
# np = rg_var.num_small_diffs * 1.0
test = sliding_window_view(np.pad(rad, [1, 1], mode='constant'), (3, 3)) - np.expand_dims(rad, (2, 3))
var[np.abs(test).reshape(radshape[0], radshape[1], 9) < var_threshold['dovar11']] = 1
var = var.sum(axis=2).reshape(np.prod(radshape))
rad = rad.reshape(np.prod(radshape))
confidence = np.zeros(rad.shape)
confidence[var == 9] = conf.conf_test(rad[var == 9], thr)
return confidence.reshape(radshape)
def test_11_4diff(rad1, rad2, threshold, viirs_data, sg_thresh):
print("11um - 4um difference test running")
radshape = rad1.shape
raddiff = (rad1 - rad2).reshape(np.prod(radshape))
day = np.zeros(radshape)
day[viirs_data.solar_zenith <= 85] = 1
day = day.reshape(raddiff.shape)
sunglint = np.zeros(rad1.shape)
sunglint[viirs_data.sunglint_angle <= sg_thresh] = 1
sunglint = sunglint.reshape(raddiff.shape)
thr = np.array(threshold['test11_4lo'])
confidence = np.zeros(raddiff.shape)
# confidence[(day == 1) & (sunglint == 0)] = utils.conf_test(raddiff[(day == 1) & (sunglint == 0)], thr)
confidence[(day == 1) & (sunglint == 0)] = conf.conf_test(raddiff[(day == 1) & (sunglint == 0)], thr)
return confidence.reshape(radshape)
def vir_refl_test(rad, threshold, viirs_data):
print('Visible reflectance test running')
thr = threshold['vis_refl_test']
radshape = rad.shape()
rad = rad.reshape(np.prod(radshape))
confidence = np.zeros(radshape)
vzcpow = 0.75 # THIS NEEDS TO BE READ FROM THE THRESHOLDS FILE
vza = viirs_data.sensor_zenith.values
dtr = np.pi/180
cosvza = np.cos(vza*dtr)
coeffs = utils.get_b1_thresholds()
coeffs[:, :3] = coeffs[:, :3] * threshold['b1_bias_adj']
# this quantity is the return of get_b1_thresholds() in the C code
# it's defined here to keep a consistent logic with the original source, for now
irtn = 0
if irtn != 0:
coeffs = thr
coeffs[:, :3] = coeffs[:, :3] * 1/np.power(cosvza, vzcpow)
confidence = conf.conf_test(rad, coeffs)
return confidence.reshape(radshape)
def nir_refl_test(rad, threshold, sunglint_thresholds, viirs_data):
print("NIR reflectance test running")
sza = viirs_data.solar_zenith.values
refang = viirs_data.sunglint_angle.values
vza = viirs_data.sensor_zenith.values
dtr = np.pi/180
# Keep in mind that band_n uses MODIS band numbers (i.e. 2=0.86um and 7=2.1um)
# For VIIRS would be 2=M07 (0.865um) and 7=M11 (2.25um)
vzcpow = 0.75 # THIS NEEDS TO BE READ FROM THE THRESHOLDS FILE
radshape = rad.shape
rad = rad.reshape(np.prod(radshape))
confidence = np.zeros(rad.shape)
sza = sza.reshape(rad.shape)
vza = vza.reshape(rad.shape)
refang = refang.reshape(rad.shape)
sunglint_flag = utils.sunglint_scene(refang, sunglint_thresholds).reshape(rad.shape)
# ref2 [5]
# b2coeffs [4]
# b2mid [1]
# b2bias_adj [1]
# b2lo [1]
# vzcpow [3] (in different place)
cosvza = np.cos(vza*dtr)
coeffs = threshold['b2coeffs']
hicut0 = np.array(coeffs[0] + coeffs[1]*sza + coeffs[2]*np.power(sza, 2) + coeffs[3]*np.power(sza, 3))
hicut0 = (hicut0 * 0.01) + threshold['b2adj']
hicut0 = hicut0 * threshold['b2bias_adj']
midpt0 = hicut0 + (threshold['b2mid'] * threshold['b2bias_adj'])
locut0 = midpt0 + (threshold['b2lo'] * threshold['b2bias_adj'])
thr = np.array([locut0, midpt0, hicut0, threshold['ref2'][3]*np.ones(rad.shape)])
# corr_thr = np.zeros((4, 4))
corr_thr = np.zeros((4, rad.shape[0]))
corr_thr[:3, sunglint_flag == 0] = thr[:3, sunglint_flag == 0] * (1./np.power(cosvza[sunglint_flag == 0], vzcpow))
corr_thr[3, sunglint_flag == 0] = thr[3, sunglint_flag == 0]
for flag in range(1, 4):
if len(refang[sunglint_flag == flag]) > 0:
sunglint_thr = utils.get_sunglint_thresholds(refang, sunglint_thresholds, band_n, flag, thr)
corr_thr[:3, sunglint_flag == flag] = sunglint_thr[:3, sunglint_flag == flag] * (1./np.power(cosvza[sunglint_flag == flag], vzcpow))
corr_thr[3, sunglint_flag == flag] = sunglint_thr[3, sunglint_flag == flag]
confidence = conf.conf_test(rad, corr_thr)
return confidence.reshape(radshape)
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def vis_nir_ratio_test(rad1, rad2, threshold, sg_threshold):
print("NIR-Visible ratio test running")
if threshold['vis_nir_ratio'][6] == 1:
radshape = rad1.shape
rad1 = rad1.reshape(np.prod(radshape))
rad2 = rad2.reshape(np.prod(radshape))
vrat = rad2/rad1
thresh = np.zeros((7,))
# temp value to avoid linter bitching at me
# eventually we would have the test run in two blocks as:
# confidence[sunglint == 1] = conf.conf_test_dble(vrat[sunglint == 1], sg_threshold['snglnt'])
# confidence[sunglint == 0] = conf.conf_test_dble(vrat[sunglint == 0], threshold['vis_nir_ratio'])
# sunglint needs to be defined somewhere
sunglint = 0
if sunglint:
thresh = threshold['snglnt']
else:
thresh = threshold['vis_nir_ratio']
confidence = conf.conf_test_dble(vrat, thresh)
return confidence.reshape(radshape)
# old class, doesn't use xarray much
class CloudTests_old:
def __init__(self, scene_ids, scene_name, thresholds):
self.scene = scene_ids
self.scene_name = scene_name
self.idx = np.where(scene_ids[scene_name] == 1)
self.threshold = thresholds[scene_name]
def single_threshold_test(self, test_name, rad, cmin):
radshape = rad.shape
rad = rad.reshape(np.prod(radshape))
thr = np.array(self.threshold[test_name])
confidence = np.zeros(radshape)
if thr[4] == 1:
print('test running')
confidence[self.idx] = conf.conf_test(rad[self.idx], thr)
cmin[self.idx] = np.minimum(cmin[self.idx], confidence[self.idx])
return cmin
def double_threshold_test(self):
pass
# new class to try to use xarray more extensively
class CloudTests(object):
def __init__(self,
data: xr.Dataset,
scene_name: str,
thresholds: Dict) -> None:
self.data = data
self.scene_name = scene_name
self.thresholds = thresholds
self.scene_idx = tuple(np.nonzero(data[scene_name] == 1))
def run_if_test_exists_for_scene(func):
@functools.wraps(func)
def wrapper(self, *args, test_name, **kwargs):
if test_name not in self.thresholds[self.scene_name]:
print('Not running test for this scene')
# returns cmin. This could be changed into a keyworded argument for readability
return args[-1]
return func(self, *args, test_name, **kwargs)
return wrapper
@run_if_test_exists_for_scene
def test_11um(self,
band: str,
cmin: np.ndarray,
test_name: str = '11um_Test') -> np.ndarray:
confidence = np.ones(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if threshold['perform'] is True:
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
confidence[self.scene_idx] = conf.conf_test_new(rad, threshold['thr'])
cmin = np.fmin(cmin, confidence)
return cmin
@run_if_test_exists_for_scene
def sst_test(self,
band31: str,
band32: str,
cmin: np.ndarray,
test_name: str = 'SST_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
threshold = self.thresholds[self.scene_name][test_name]
if threshold['perform'] is True:
m31 = self.data[band31].values - 273.16
bt_diff = self.data[band31].values - self.data[band32].values
sst = self.data.geos_sfct.values - 273.16
cosvza = np.cos(self.data.sensor_zenith.values*_DTR)
c = threshold['coeffs']
modsst = 273.16 + c[0] + c[1]*m31 + c[2]*bt_diff*sst + c[3]*bt_diff*((1/cosvza) - 1)
sfcdif = self.data.geos_sfct.values - modsst
print(f'Testing "{self.scene_name}"\n')
confidence[self.scene_idx] = conf.conf_test_new(sfcdif[self.scene_idx], threshold['thr'])
cmin = np.fmin(cmin, confidence)
return cmin
@run_if_test_exists_for_scene
def bt_diff_86_11um(self,
band: str,
cmin: np.ndarray,
test_name: str = '8.6-11um_Test') -> np.ndarray:

Paolo Veglio
committed
confidence = np.ones(self.data.M01.shape)
threshold = self.thresholds[self.scene_name][test_name]

Paolo Veglio
committed
if threshold['perform'] is True:
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
confidence[self.scene_idx] = conf.conf_test_new(rad, threshold['thr'])

Paolo Veglio
committed
cmin = np.fmin(cmin, confidence)
return cmin
def run_tests_temp(self):
cmin_G1 = np.ones(self.data.M01.shape)
cmin_G2 = np.ones(self.data.M01.shape)
# Group 1
cmin_G1 = self.test_11um('M15', cmin_G1, test_name='11um_Test')
cmin_G1 = self.sst_test('M15', 'M16', cmin_G1, test_name='SST_Test')
# Group 2
cmin_G2 = self.bt_diff_86_11um('M14-M15', cmin_G2, '8.6-11um_Test')
cmin = cmin_G1 * cmin_G2
return cmin
def single_threshold_test(self, test_name, band, cmin):
if band == 'bad_data':
return cmin
print(f'Running test "{test_name}" for "{self.scene_name}"')
# preproc_thresholds()
if 'thr' in self.thresholds[self.scene_name][test_name]:
thr = np.array(self.thresholds[self.scene_name][test_name]['thr'])
else:
thr = np.array(self.thresholds[self.scene_name][test_name])
thr_xr = xr.Dataset()
if test_name == '11-12um_Cirrus_Test':

Paolo Veglio
committed
thr_xr['threshold'] = pt.preproc(self.data, self.thresholds[self.scene_name], self.scene_name)
thr = np.ones((5,)) # This is only temporary to force the logic of the code
# I need to find a better solution at some point
elif test_name == 'SST_Test':
thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'),
np.ones((self.data[band].shape[0], self.data[band].shape[1], 5))*thr)

Paolo Veglio
committed
elif test_name == '7.3-11um_BTD_Mid_Level_Cloud_Test':
thr_xr['threshold'] = pt.get_pn_thresholds(self.data, self.thresholds, self.scene_name,
'7.3-11um_BTD_Mid_Level_Cloud_Test')
thr = np.ones((5,))
elif test_name == 'Surface_Temperature_Test':
thr_xr['threshold'] = pt.preproc_surf_temp(self.data, self.thresholds[self.scene_name])
thr = np.ones((5,))

Paolo Veglio
committed
elif (test_name == '11-4um_Oceanic_Stratus_Test' and
self.scene_name in ['Land_Day_Desert', 'Land_Day_Desert_Coast', 'Polar_Day_Desert',
'Polar_Day_Desert_Coast']):
thr = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)])
elif test_name == 'NIR_Reflectance_Test':
corr_thr = pt.preproc_nir(self.data, self.thresholds, self.scene_name)
thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'), corr_thr)

Paolo Veglio
committed
elif test_name == 'Visible_Reflectance_Test':
thr_xr['threshold'], self.data['M128'] = pt.vis_refl_thresholds(self.data,
self.thresholds,
self.scene_name)

Paolo Veglio
committed
elif test_name == '1.6_2.1um_NIR_Reflectance_Test':
corr_thr = pt.nir_refl(self.data, self.thresholds, self.scene_name)
thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'), corr_thr)
thr = np.ones((5,))

Paolo Veglio
committed
elif test_name == '4-12um_BTD_Thin_Cirrus_Test':
thr_xr['threshold'] = pt.get_pn_thresholds(self.data, self.thresholds, self.scene_name,
'4-12um_BTD_Thin_Cirrus_Test')
thr = np.ones((5,))

Paolo Veglio
committed
elif test_name == 'GEMI_Test':
thr_xr['threshold'] = pt.GEMI_test(self.data, self.thresholds, self.scene_name)
thr = np.ones((5,))
elif (test_name == '1.38um_High_Cloud_Test' and self.scene_name in ['Ocean_Day', 'Polar_Ocean_Day']):
thr_xr['threshold'] = pt.test_1_38um_preproc(self.data, self.thresholds, self.scene_name)
thr = np.ones((5,))
else:
thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'),
np.ones((self.data[band].shape[0], self.data[band].shape[1], 5))*thr)

Paolo Veglio
committed
data = xr.Dataset(self.data, coords=thr_xr)
if test_name == 'SST_Test':
data['sfcdif'] = (('number_of_lines', 'number_of_pixels'),
pt.preproc_sst(data, self.thresholds[self.scene_name][test_name]).values)
band = 'sfcdif'

Paolo Veglio
committed
if test_name == '11um_Variability_Test':
var = pt.var_11um(self.data, self.thresholds)
data['11um_var'] = data.M15
data['11um_var'].values[var != 9] = np.nan

Paolo Veglio
committed
if thr[4] == 1:
print('test running...')
confidence = conf_xr.conf_test(data, band)

Paolo Veglio
committed
cmin = np.fmin(cmin, confidence)
return cmin
def double_threshold_test(self, test_name, band, cmin):
data = self.data
if test_name == '11-4um_BT_Difference_Test':
thr = pt.bt11_4um_preproc(self.data, self.thresholds, self.scene_name)

Paolo Veglio
committed
print('test running...')

Paolo Veglio
committed
confidence = conf.conf_test_dble(data['M15-M13'].values, thr)

Paolo Veglio
committed
confidence = confidence.reshape(data.M01.shape)

Paolo Veglio
committed
if test_name == 'Vis/NIR_Ratio_Test':

Paolo Veglio
committed
print('test running...')

Paolo Veglio
committed
thr_no_sunglint = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)])
thr_sunglint = np.array([self.thresholds['Sun_Glint']['snglnt'][i] for i in range(8)])
vrat = data.M07.values/data.M05.values

Paolo Veglio
committed
_dtr = np.pi/180.0
sza = data.sensor_zenith.values
raz = data.relative_azimuth.values
vza = data.sensor_zenith.values
cos_refang = np.sin(vza*_dtr) * np.sin(sza*_dtr) * np.cos(raz*_dtr) + \
np.cos(vza*_dtr) * np.cos(sza*_dtr)
refang = np.arccos(cos_refang) * 180./np.pi
idx = np.nonzero((data.solar_zenith <= 85) & (refang <= data.sunglint_angle))
confidence = conf.conf_test_dble(vrat, thr_no_sunglint)
confidence = confidence.reshape(data.M01.shape)
confidence[idx] = conf.conf_test_dble(vrat[idx], thr_sunglint)
confidence = confidence.reshape(data.M01.shape)

Paolo Veglio
committed
if (test_name == '11-4um_Oceanic_Stratus_Test' and
self.scene_name in ['Land_Day_Desert', 'Land_Day_Desert_Coast', 'Polar_Day_Desert',
'Polar_Day_Desert_Coast']):
thr = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)])
print('test running...')

Paolo Veglio
committed
confidence = conf.conf_test_dble(data['M15-M16'].values, thr)
confidence = confidence.reshape(data.M01.shape)
cmin = np.fmin(cmin, confidence)
return cmin
class ComputeTests(CloudTests):
def __init__(self,
data: xr.Dataset,
scene_name: str,
thresholds: Dict) -> None:
super().__init__(data, scene_name, thresholds)
def run_tests(self):
cmin_G1 = np.ones(self.data.M01.shape)
cmin_G2 = np.ones(self.data.M01.shape)
# Group 1
cmin_G1 = self.test_11um('M15', cmin_G1, test_name='11um_Test')
cmin_G1 = self.sst_test('M15', 'M16', cmin_G1, test_name='SST_Test')
# Group 2
cmin_G2 = self.bt_diff_86_11um('M14-M15', cmin_G2, test_name='8.6-11um_Test')
cmin = cmin_G1 * cmin_G2
return cmin
def preproc_thresholds(thresholds, data):
thr = np.array(thresholds)
thr_xr = xr.Dataset()
thr_xr['tresholds'] = (('number_of_lines', 'number_of_pixels', 'z'),
np.ones((data['M01'].shape[0], data['M01'].shape[1], 5))*thr)
nl_sfct1 = thresholds['Land_Night']['Surface_Temperature_Test'][0]
# nl_sfct2 = thresholds['Land_Night']['Surface_Temperature_Test'][1]
# nlsfct_pfm = thresholds['Land_Night']['Surface_Temperature_Test'][2]
nl_df1 = thresholds['Land_Night']['Surface_Temperature_Test_difference'][0:2]
nl_df2 = thresholds['Land_Night']['Surface_Temperature_Test_difference'][2:]
# df1 = data.M15 - data.M16
# df2 = data.M15 - data.M13
thr_xr = thr_xr.where(data.desert != 1, nl_sfct1)
thr_xr = thr_xr.where((data['M15-M16'] > nl_df1[0]) |
((data['M15-M16'] < nl_df1[0]) &
((data['M15-M13'] <= nl_df2[0]) | (data['M15-M13'] >= nl_df2[1]))),
nl_sfct1[0])
data = xr.Dataset(data, coords=thr_xr)
return data
def single_threshold_test(test, rad, threshold):
radshape = rad.shape
rad = rad.reshape(np.prod(radshape))
thr = np.array(threshold[test])
confidence = np.zeros(rad.shape)
if thr[4] == 1:
print(f"{test} test running")
# the C code has the line below that I don't quite understand the purpose of.
# It seems to be setting the bit to 0 if the BT value is greater than the midpoint
#
# if (m31 >= dobt11[1]) (void) set_bit(13, pxout.testbits);
# confidence = utils.conf_test(rad, thr)
confidence = conf.conf_test(rad, thr)
return confidence.reshape(radshape)
def test():
rad = np.random.randint(50, size=[4, 8])
# coeffs = [5, 42, 20, 28, 15, 35, 1]
# coeffs = [20, 28, 5, 42, 15, 35, 1]
coeffs = [35, 15, 20, 1, 1]
# confidence = conf_test_dble(rad, coeffs)