-
Paolo Veglio authoredPaolo Veglio authored
spectral_tests.py 28.08 KiB
import numpy as np
import xarray as xr
# from numpy.lib.stride_tricks import sliding_window_view
from typing import Dict
import functools
# import utils
import conf
import conf_xr
import scene as scn
import preprocess_thresholds as preproc
import restoral
import importlib
_RTD = 180./np.pi
_DTR = np.pi/180
# this is used for testing, eventually we want to remove it
importlib.reload(preproc)
importlib.reload(conf)
importlib.reload(restoral)
class CloudTests(object):
def __init__(self,
data: xr.Dataset,
scene_name: str,
thresholds: Dict) -> None:
self.data = data
self.scene_name = scene_name
self.thresholds = thresholds
self.scene_idx = tuple(np.nonzero(data[scene_name] == 1))
if self.scene_idx[0].shape[0] == 0:
self.pixels_in_scene = False
else:
self.pixels_in_scene = True
def run_if_test_exists_for_scene(func):
@functools.wraps(func)
def wrapper(self, *args, test_name, **kwargs):
if test_name not in self.thresholds[self.scene_name]:
# print('Not running test for this scene')
# returns cmin. This could be changed into a keyworded argument for readability
test_bit = np.zeros(args[-1].shape)
return args[-1], test_bit
return func(self, *args, test_name, **kwargs)
return wrapper
@run_if_test_exists_for_scene
def test_11um(self,
band: str,
cmin: np.ndarray,
test_name: str = '11um_Test') -> np.ndarray:
confidence = np.ones(self.data[band].shape)
qa_bit = np.zeros(self.data[band].shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
qa_bit[self.scene_idx] = 1
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
idx = np.nonzero((self.data[band].values >= threshold['thr'][1]) &
(self.data[self.scene_name] == 1))
test_bit[idx] = 1
confidence[self.scene_idx] = conf.conf_test_new(rad, threshold['thr'])
cmin = np.fmin(cmin, confidence)
return cmin, test_bit
@run_if_test_exists_for_scene
def surface_temperature_test(self,
band31: str,
band32: str,
cmin: np.ndarray,
test_name: str = 'Surface_Temperature_test') -> np.ndarray:
pass
@run_if_test_exists_for_scene
def sst_test(self,
band31: str,
band32: str,
cmin: np.ndarray,
test_name: str = 'SST_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
qa_bit = np.zeros(self.data[band31].shape)
test_bit = np.zeros(self.data[band31].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
qa_bit[self.scene_idx] = 1
m31 = self.data[band31].values - 273.16
bt_diff = self.data[band31].values - self.data[band32].values
sst = self.data.geos_sfct.values - 273.16
cosvza = np.cos(self.data.sensor_zenith.values*_DTR)
c = threshold['coeffs']
modsst = 273.16 + c[0] + c[1]*m31 + c[2]*bt_diff*sst + c[3]*bt_diff*((1.0/cosvza) - 1.0)
sfcdif = self.data.geos_sfct.values - modsst
idx = np.nonzero((sfcdif < threshold['thr'][1]) &
(self.data[self.scene_name] == 1))
test_bit[idx] = 1
print(f'Testing "{self.scene_name}"\n')
confidence[self.scene_idx] = conf.conf_test_new(sfcdif[self.scene_idx], threshold['thr'])
cmin = np.fmin(cmin, confidence)
return cmin, np.abs(1-test_bit)*qa_bit
@run_if_test_exists_for_scene
def bt_diff_86_11um(self,
band: str,
cmin: np.ndarray,
test_name: str = '8.6-11um_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
qa_bit = np.zeros(self.data[band].shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
qa_bit[self.scene_idx] = 1
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
idx = np.nonzero((self.data[band].values < threshold['thr'][1]) &
(self.data[self.scene_name] == 1))
test_bit[idx] = 1
confidence[self.scene_idx] = conf.conf_test_new(rad, threshold['thr'])
cmin = np.fmin(cmin, confidence)
return cmin, np.abs(1-test_bit)*qa_bit
@run_if_test_exists_for_scene
def test_11_12um_diff(self,
band: str,
cmin: np.ndarray,
test_name: str = '11-12um_Cirrus_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
qa_bit = np.zeros(self.data[band].shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
qa_bit[self.scene_idx] = 1
thr = preproc.thresholds_11_12um(self.data, threshold, self.scene_name, self.scene_idx)
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
idx = np.nonzero((rad <= thr[1, :]) &
(self.data[self.scene_name].values[self.scene_idx] == 1))
tmp_bit = test_bit[self.scene_idx]
tmp_bit[idx] = 1
test_bit[self.scene_idx] = tmp_bit
confidence[self.scene_idx] = conf.conf_test_new(rad, thr)
cmin = np.fmin(cmin, confidence)
test_bit = test_bit.reshape(self.data[band].shape)
return cmin, np.abs(1-test_bit)*qa_bit
@run_if_test_exists_for_scene
def oceanic_stratus_11_4um_test(self,
band: str,
cmin: np.ndarray,
test_name: str = '11-4um_Oceanic_Stratus_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
qa_bit = np.zeros(self.data[band].shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
qa_bit[self.scene_idx] = 1
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
thr = threshold['thr']
if self.scene_name in ['Land_Day_Desert', 'Land_Day_Desert_Coast', 'Polar_Day_Desert',
'Polar_Day_Desert_Coast']:
confidence[self.scene_idx] = conf.conf_test_dble(rad, thr)
# these scenes have not been implemented yet
elif self.scene_name in ['Land_Night', 'Polar_Night_Land', 'Polar_Day_Snow', 'Polar_Night_Snow',
'Day_Snow', 'Night_Snow', 'Antarctic_Day']:
pass
else:
scene_flag = scn.find_scene(self.data, self.thresholds['Sun_Glint']['bounds'][3])
idx = np.nonzero((self.data[band].values >= threshold['thr'][1]) &
(self.data[self.scene_name] == 1) &
(scene_flag['sunglint'] == 0))
test_bit[idx] = 1
idx = np.nonzero((self.data[self.scene_name] == 1) &
(scene_flag['sunglint'] == 0))
confidence[idx] = conf.conf_test_new(self.data[band].values[idx], thr)
cmin = np.fmin(cmin, confidence)
return cmin, np.abs(1-test_bit)*qa_bit
@run_if_test_exists_for_scene
def nir_reflectance_test(self,
band: str,
cmin: np.ndarray,
test_name: str = 'NIR_Reflectance_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
qa_bit = np.zeros(self.data[band].shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
qa_bit[self.scene_idx] = 1
thr = preproc.thresholds_NIR(self.data, self.thresholds, self.scene_name,
test_name, self.scene_idx)
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
idx = np.nonzero((rad <= thr[1, :]) &
(self.data[self.scene_name].values[self.scene_idx] == 1))
tmp_bit = test_bit[self.scene_idx]
tmp_bit[idx] = 1
test_bit[self.scene_idx] = tmp_bit
confidence[self.scene_idx] = conf.conf_test_new(rad, thr)
cmin = np.fmin(cmin, confidence)
return cmin, np.abs(1-test_bit)*qa_bit
@run_if_test_exists_for_scene
def vis_nir_ratio_test(self,
band: str,
cmin: np.ndarray,
test_name: str = 'Vis/NIR_Reflectance_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
qa_bit = np.zeros(self.data[band].shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
# I'm not using the self.scene_idx here because it messes up with the indexing of the
# confidence array and I don't want to think of a solution at the moment. I will need
# to figure out the logic to make it work once I'm at the stage where I want to optimize
# the code
qa_bit[self.scene_idx] = 1
rad = self.data[band].values # [self.scene_idx]
solar_zenith = self.data.solar_zenith.values # [self.scene_idx]
sunglint = scn.find_scene(self.data, self.thresholds['Sun_Glint']['bounds'][3])['sunglint']
idx = np.nonzero((solar_zenith <= 85) & (sunglint == 1))
thr_no_sunglint = np.array([threshold['thr'][i] for i in range(8)])
thr_sunglint = np.array([self.thresholds['Sun_Glint']['snglnt'][i] for i in range(8)])
print(thr_no_sunglint)
print(thr_sunglint)
# tmp = self.thresholds['Sun_Glint']['snglnt']
# thr_sunglint = np.array([tmp[0], tmp[1], tmp[2], tmp[3], tmp[4], tmp[5], 1])
confidence[sunglint == 0] = conf.conf_test_dble(rad[sunglint == 0], thr_no_sunglint)
confidence[idx] = conf.conf_test_dble(rad[idx], thr_sunglint)
idx = np.nonzero(((rad < thr_no_sunglint[1]) | (rad > thr_no_sunglint[4])) &
(self.data[self.scene_name].values == 1) & (sunglint == 0))
test_bit[tuple(idx)] = 1
idx = np.nonzero(((rad < thr_sunglint[1]) | (rad > thr_sunglint[4])) &
((self.data[self.scene_name].values == 1) &
(solar_zenith <= 85) & (sunglint == 1)))
test_bit[tuple(idx)] = 1
cmin = np.fmin(cmin, confidence)
return cmin, np.abs(1-test_bit)*qa_bit
@run_if_test_exists_for_scene
def test_16_21um_Reflectance(self,
band: str,
cmin: np.ndarray,
test_name: str = '1.6_2.1um_NIR_Reflectance_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
qa_bit = np.zeros(self.data[band].shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
qa_bit[self.scene_idx] = 1
thr = preproc.thresholds_NIR(self.data, self.thresholds, self.scene_name,
test_name, self.scene_idx)
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
idx = np.nonzero((rad <= thr[1, :]) &
(self.data[self.scene_name].values[self.scene_idx] == 1))
tmp_bit = test_bit[self.scene_idx]
tmp_bit[idx] = 1
test_bit[self.scene_idx] = tmp_bit
confidence[self.scene_idx] = conf.conf_test_new(rad, thr)
cmin = np.fmin(cmin, confidence)
return cmin, np.abs(1-test_bit)*qa_bit
@run_if_test_exists_for_scene
def visible_reflectance_test(self,
band: str,
cmin: np.ndarray,
test_name: str = 'Visible_Reflectance_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
print(f'Testing "{self.scene_name}"\n')
thr, rad = preproc.vis_refl_thresholds(self.data, self.thresholds, self.scene_name,
self.scene_idx)
confidence[self.scene_idx] = conf.conf_test_new(rad, thr)
cmin = np.fmin(cmin, confidence)
return cmin, test_bit
@run_if_test_exists_for_scene
def gemi_test(self,
band: str,
cmin: np.ndarray,
test_name: str = 'GEMI_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
thr = preproc.gemi_thresholds(self.data, threshold, self.scene_name,
self.scene_idx)
rad = self.data[band].values[self.scene_idx]
confidence[self.scene_idx] = conf.conf_test_new(rad, thr)
cmin = np.fmin(cmin, confidence)
return cmin, test_bit
@run_if_test_exists_for_scene
def test_1_38um_high_clouds(self,
band: str,
cmin: np.ndarray,
test_name: str = '1.38um_High_Cloud_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
qa_bit = np.zeros(self.data[band].shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
qa_bit[self.scene_idx] = 1
if self.scene_name in ['Ocean_Day', 'Polar_Day_Ocean']:
thr = preproc.thresholds_1_38um_test(self.data, self.thresholds, self.scene_name,
self.scene_idx)
else:
return cmin, test_bit
thr = threshold['thr']
print(f'Testing "{self.scene_name}"\n')
rad = self.data[band].values[self.scene_idx]
idx = np.nonzero((rad <= thr[1, :]) &
(self.data[self.scene_name].values[self.scene_idx] == 1))
tmp_bit = test_bit[self.scene_idx]
tmp_bit[idx] = 1
test_bit[self.scene_idx] = tmp_bit
confidence[self.scene_idx] = conf.conf_test_new(rad, thr)
cmin = np.fmin(cmin, confidence)
return cmin, np.abs(1-test_bit)*qa_bit
@run_if_test_exists_for_scene
def thin_cirrus_4_12um_BTD_test(self,
band: str,
cmin: np.ndarray,
test_name: str = '4-12um_BTD_Thin_Cirrus_Test') -> np.ndarray:
confidence = np.ones(self.data.M01.shape)
test_bit = np.zeros(self.data[band].shape)
threshold = self.thresholds[self.scene_name][test_name]
if (threshold['perform'] is True and self.pixels_in_scene is True):
thr = preproc.polar_night_thresholds(self.data, self.thresholds, self.scene_name,
test_name, self.scene_idx)
rad = self.data[band].values[self.scene_idx]
confidence[self.scene_idx] = conf.conf_test_new(rad, thr)
cmin = np.fmin(cmin, confidence)
return cmin, test_bit
def single_threshold_test(self, test_name, band, cmin):
if band == 'bad_data':
return cmin
print(f'Running test "{test_name}" for "{self.scene_name}"')
# preproc_thresholds()
if 'thr' in self.thresholds[self.scene_name][test_name]:
thr = np.array(self.thresholds[self.scene_name][test_name]['thr'])
else:
thr = np.array(self.thresholds[self.scene_name][test_name])
thr_xr = xr.Dataset()
if test_name == '11-12um_Cirrus_Test':
thr_xr['threshold'] = pt.preproc(self.data, self.thresholds[self.scene_name], self.scene_name)
thr = np.ones((5,)) # This is only temporary to force the logic of the code
# I need to find a better solution at some point
elif test_name == 'SST_Test':
thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'),
np.ones((self.data[band].shape[0], self.data[band].shape[1], 5))*thr)
elif test_name == '7.3-11um_BTD_Mid_Level_Cloud_Test':
thr_xr['threshold'] = pt.get_pn_thresholds(self.data, self.thresholds, self.scene_name,
'7.3-11um_BTD_Mid_Level_Cloud_Test')
thr = np.ones((5,))
elif test_name == 'Surface_Temperature_Test':
thr_xr['threshold'] = pt.preproc_surf_temp(self.data, self.thresholds[self.scene_name])
thr = np.ones((5,))
elif (test_name == '11-4um_Oceanic_Stratus_Test' and
self.scene_name in ['Land_Day_Desert', 'Land_Day_Desert_Coast', 'Polar_Day_Desert',
'Polar_Day_Desert_Coast']):
thr = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)])
elif test_name == 'NIR_Reflectance_Test':
corr_thr = pt.preproc_nir(self.data, self.thresholds, self.scene_name)
thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'), corr_thr)
elif test_name == 'Visible_Reflectance_Test':
thr_xr['threshold'], self.data['M128'] = pt.vis_refl_thresholds(self.data,
self.thresholds,
self.scene_name)
elif test_name == '1.6_2.1um_NIR_Reflectance_Test':
corr_thr = pt.nir_refl(self.data, self.thresholds, self.scene_name)
thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'), corr_thr)
thr = np.ones((5,))
elif test_name == '4-12um_BTD_Thin_Cirrus_Test':
thr_xr['threshold'] = pt.get_pn_thresholds(self.data, self.thresholds, self.scene_name,
'4-12um_BTD_Thin_Cirrus_Test')
thr = np.ones((5,))
elif test_name == 'GEMI_Test':
thr_xr['threshold'] = pt.GEMI_test(self.data, self.thresholds, self.scene_name)
thr = np.ones((5,))
elif (test_name == '1.38um_High_Cloud_Test' and self.scene_name in ['Ocean_Day', 'Polar_Ocean_Day']):
thr_xr['threshold'] = pt.test_1_38um_preproc(self.data, self.thresholds, self.scene_name)
thr = np.ones((5,))
else:
thr_xr['threshold'] = (('number_of_lines', 'number_of_pixels', 'z'),
np.ones((self.data[band].shape[0], self.data[band].shape[1], 5))*thr)
data = xr.Dataset(self.data, coords=thr_xr)
if test_name == 'SST_Test':
data['sfcdif'] = (('number_of_lines', 'number_of_pixels'),
pt.preproc_sst(data, self.thresholds[self.scene_name][test_name]).values)
band = 'sfcdif'
if test_name == '11um_Variability_Test':
var = pt.var_11um(self.data, self.thresholds)
data['11um_var'] = data.M15
data['11um_var'].values[var != 9] = np.nan
if thr[4] == 1:
print('test running...')
confidence = conf_xr.conf_test(data, band)
cmin = np.fmin(cmin, confidence)
return cmin
def double_threshold_test(self, test_name, band, cmin):
data = self.data
if test_name == '11-4um_BT_Difference_Test':
thr = pt.bt11_4um_preproc(self.data, self.thresholds, self.scene_name)
print('test running...')
confidence = conf.conf_test_dble(data['M15-M13'].values, thr)
confidence = confidence.reshape(data.M01.shape)
if test_name == 'Vis/NIR_Ratio_Test':
print('test running...')
thr_no_sunglint = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)])
thr_sunglint = np.array([self.thresholds['Sun_Glint']['snglnt'][i] for i in range(8)])
vrat = data.M07.values/data.M05.values
_dtr = np.pi/180.0
sza = data.sensor_zenith.values
raz = data.relative_azimuth.values
vza = data.sensor_zenith.values
cos_refang = np.sin(vza*_dtr) * np.sin(sza*_dtr) * np.cos(raz*_dtr) + \
np.cos(vza*_dtr) * np.cos(sza*_dtr)
refang = np.arccos(cos_refang) * 180./np.pi
idx = np.nonzero((data.solar_zenith <= 85) & (refang <= data.sunglint_angle))
confidence = conf.conf_test_dble(vrat, thr_no_sunglint)
confidence = confidence.reshape(data.M01.shape)
confidence[idx] = conf.conf_test_dble(vrat[idx], thr_sunglint)
confidence = confidence.reshape(data.M01.shape)
if (test_name == '11-4um_Oceanic_Stratus_Test' and
self.scene_name in ['Land_Day_Desert', 'Land_Day_Desert_Coast', 'Polar_Day_Desert',
'Polar_Day_Desert_Coast']):
thr = np.array([self.thresholds[self.scene_name][test_name][i] for i in range(8)])
print('test running...')
confidence = conf.conf_test_dble(data['M15-M16'].values, thr)
confidence = confidence.reshape(data.M01.shape)
cmin = np.fmin(cmin, confidence)
return cmin
class ComputeTests(CloudTests):
def __init__(self,
data: xr.Dataset,
scene_name: str,
thresholds: Dict) -> None:
super().__init__(data, scene_name, thresholds)
def run_tests(self):
cmin_G1 = np.ones(self.data.M01.shape)
cmin_G2 = np.ones(self.data.M01.shape)
cmin_G3 = np.ones(self.data.M01.shape)
cmin_G4 = np.ones(self.data.M01.shape)
cmin_G5 = np.ones(self.data.M01.shape)
# Group 1
cmin_G1 = self.test_11um('M15', cmin_G1, test_name='11um_Test')
cmin_G1 = self.sst_test('M15', 'M16', cmin_G1, test_name='SST_Test')
# Group 2
cmin_G2 = self.bt_diff_86_11um('M14-M15', cmin_G2, test_name='8.6-11um_Test')
cmin_G2 = self.test_11_12um_diff('M15-M16', cmin_G2, test_name='11-12um_Cirrus_Test')
cmin_G2 = self.oceanic_stratus_11_4um_test('M15-M13', cmin_G2,
test_name='11-4um_Oceanic_Stratus_Test')
# Group 3
cmin_G3 = self.nir_reflectance_test('M07', cmin_G3, test_name='NIR_Reflectance_Test')
cmin_G3 = self.vis_nir_ratio_test('M07-M05ratio', cmin_G3, test_name='Vis/NIR_Ratio_Test')
cmin_G3 = self.nir_reflectance_test('M10', cmin_G3, test_name='1.6_2.1um_NIR_Reflectance_Test')
cmin_G3 = self.visible_reflectance_test('M128', cmin_G3, test_name='Visible_Reflectance_Test')
cmin_G3 = self.gemi_test('GEMI', cmin_G3, test_name='GEMI_Test')
# Group 4
cmin_G4 = self.test_1_38um_high_clouds('M09', cmin_G4, test_name='1.38um_High_Cloud_Test')
# Group 5
cmin_G5 = self.thin_cirrus_4_12um_BTD_test('M13-M16', cmin_G5,
test_name='4-12um_BTD_Thin_Cirrus_Test')
cmin = cmin_G1 * cmin_G2 * cmin_G3 * cmin_G4 * cmin_G5
return cmin
def clear_sky_restoral(self,
cmin: np.ndarray) -> np.ndarray:
total_bit = np.full(self.data.M01.shape, 3)
sunglint_angle = self.thresholds['Sun_Glint']['bounds'][3]
scene_flags = scn.find_scene(self.data, sunglint_angle)
idx = np.nonzero((scene_flags['water'] == 1) & (scene_flags['ice'] == 0) &
(scene_flags['uniform'] == 1) & (cmin <= 0.99) & (cmin >= 0.05))
cmin[idx] = restoral.spatial(self.data, self.thresholds['Sun_Glint'], scene_flags, cmin)[idx]
idx = np.nonzero((scene_flags['water'] == 1) & (scene_flags['sunglint'] == 1) &
(scene_flags['uniform'] == 1) & (cmin <= 0.95))
cmin[idx] = restoral.sunglint(self.data, self.thresholds['Sun_Glint'], total_bit, cmin)[idx]
"""
idx = np.nonzero((scene_flags['day'] == 1) & (scene_flags['land'] == 1) &
(scene_flags['snow'] == 0) & (scene_flags['ice'] == 0) &
(cmin <= 0.95))
cmin[idx] = restoral.land(self.data, self.thresholds, scene_flags, cmin)[idx]
idx = np.nonzero((scene_flags['day'] == 1) & (scene_flags['land'] == 1) &
(scene_flags['coast'] == 1) & (scene_flags['snow'] == 0) &
(scene_flags['ice'] == 0))
cmin[idx] = restoral.coast(self.data, self.thresholds, scene_flags, cmin)[idx]
"""
return cmin
def preproc_thresholds(thresholds, data):
thr = np.array(thresholds)
thr_xr = xr.Dataset()
thr_xr['tresholds'] = (('number_of_lines', 'number_of_pixels', 'z'),
np.ones((data['M01'].shape[0], data['M01'].shape[1], 5))*thr)
nl_sfct1 = thresholds['Land_Night']['Surface_Temperature_Test'][0]
# nl_sfct2 = thresholds['Land_Night']['Surface_Temperature_Test'][1]
# nlsfct_pfm = thresholds['Land_Night']['Surface_Temperature_Test'][2]
nl_df1 = thresholds['Land_Night']['Surface_Temperature_Test_difference'][0:2]
nl_df2 = thresholds['Land_Night']['Surface_Temperature_Test_difference'][2:]
# df1 = data.M15 - data.M16
# df2 = data.M15 - data.M13
thr_xr = thr_xr.where(data.desert != 1, nl_sfct1)
thr_xr = thr_xr.where((data['M15-M16'] > nl_df1[0]) |
((data['M15-M16'] < nl_df1[0]) &
((data['M15-M13'] <= nl_df2[0]) | (data['M15-M13'] >= nl_df2[1]))),
nl_sfct1[0])
data = xr.Dataset(data, coords=thr_xr)
return data
def single_threshold_test(test, rad, threshold):
radshape = rad.shape
rad = rad.reshape(np.prod(radshape))
thr = np.array(threshold[test])
confidence = np.zeros(rad.shape)
if thr[4] == 1:
print(f"{test} test running")
# the C code has the line below that I don't quite understand the purpose of.
# It seems to be setting the bit to 0 if the BT value is greater than the midpoint
#
# if (m31 >= dobt11[1]) (void) set_bit(13, pxout.testbits);
# confidence = utils.conf_test(rad, thr)
confidence = conf.conf_test(rad, thr)
return confidence.reshape(radshape)
def test():
rad = np.random.randint(50, size=[4, 8])
# coeffs = [5, 42, 20, 28, 15, 35, 1]
# coeffs = [20, 28, 5, 42, 15, 35, 1]
coeffs = [35, 15, 20, 1, 1]
# confidence = conf_test_dble(rad, coeffs)
confidence = test_11um(rad, coeffs)
print(rad)
print('\n')
print(confidence)
if __name__ == "__main__":
test()