-
Paolo Veglio authored
first working version of the hires-mvcm. It only works for ocean day. Results shown at AMS 2022 in poster session
Paolo Veglio authoredfirst working version of the hires-mvcm. It only works for ocean day. Results shown at AMS 2022 in poster session
ocean_day_tests.py 10.67 KiB
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
import utils
import conf
import ancillary_data as c_tools
# ############## GROUP 1 TESTS ############## #
# 11 micron brightness temperature threshold test
def simple_test(rad, threshold, cmin):
radshape = rad.shape
rad = rad.reshape(np.prod(radshape))
thr = np.array(threshold)
confidence = np.ones(rad.shape)
bit = np.zeros(rad.shape)
if thr[4] == 1:
print("simple test running")
# the C code has the line below that I don't quite understand the purpose of.
# It seems to be setting the bit to 0 if the BT value is greater than the midpoint
#
# if (m31 >= dobt11[1]) (void) set_bit(13, pxout.testbits);
# confidence = utils.conf_test(rad, thr)
confidence = conf.conf_test(rad, thr)
bit[rad >= thr[1]] = 1
return np.minimum(cmin, confidence.reshape(radshape)), confidence.reshape(radshape), bit.reshape(radshape)
def sst_test(rad1, rad2, vza, surf_temp, threshold, cmin):
a1 = 1.8860
a2 = 0.9380
a3 = 0.1280
a4 = 1.0940
radshape = rad1.shape
b31 = rad1.reshape(np.prod(radshape)) - 273.16
b32 = rad2.reshape(np.prod(radshape)) - 273.16
thr = np.array(threshold)
confidence = np.ones(b31.shape)
bit = np.zeros(b31.shape)
rad_diff = b31 - b32
sstc = surf_temp.reshape(np.prod(radshape)) - 273.16
mu = np.cos(vza.reshape(np.prod(radshape)) * np.pi/180.0)
modsst = 273.16 + a1 + a2*b31 + a3*rad_diff*sstc + a4*rad_diff*((1/mu)-1)
sfcdif = surf_temp.reshape(np.prod(radshape)) - modsst
if thr[4] == 1:
print('SST test running')
confidence = conf.conf_test(sfcdif, thr)
bit[sfcdif < thr[1]] = 1
return np.minimum(cmin, confidence.reshape(radshape)), confidence.reshape(radshape), bit.reshape(radshape)
def test_11_12_diff(data, threshold, cmin):
radshape = data.M15.shape
b31 = data.M15.values.reshape(np.prod(radshape))
b32 = data.M15.values.reshape(np.prod(radshape))
vza = data.sensor_zenith.values.reshape(np.prod(radshape))
thr = np.array(threshold)
confidence = np.ones(b31.shape)
bit = np.zeros(b31.shape)
rad_diff = b31 - b32
# Get secant of viewing zenith angle
dtr = np.pi/180
cosvza = np.cos(vza * dtr)
schi = np.full(cosvza.shape, 99.0)
schi[cosvza > 0.0] = 1.0/cosvza[cosvza > 0.0]
# Need to define this in cython
btd_thr = c_tools.py_cithr(1, np.array(schi, dtype=np.float32), np.array(b31, dtype=np.float32))
idx = np.nonzero((btd_thr < 0.1) | (np.abs(schi-99.0) < 0.0001))
btd_thr[idx] = thr[0]
locut = btd_thr + 0.3*btd_thr
hicut = btd_thr - 1.25
corr_thr = np.array([locut, btd_thr, hicut, np.ones(locut.shape)], dtype=np.float)
if thr[1] == 1:
print('11-12um diff test running')
bit[rad_diff < thr[1]] = 1
confidence = conf.conf_test(rad_diff, corr_thr)
return np.minimum(cmin, confidence.reshape(radshape)), confidence.reshape(radshape), bit.reshape(radshape)
def test_11_4_diff(rad1, rad2, threshold, scene_flags, cmin):
radshape = rad1.shape
b31 = rad1.reshape(np.prod(radshape))
b20 = rad2.reshape(np.prod(radshape))
thr = np.array(threshold)
sunglint = scene_flags['sunglint'].reshape(np.prod(radshape))
confidence = np.ones(b31.shape)
if thr[4] == 1:
print('11-4um diff test running')
confidence[sunglint == 0] = conf.conf_test((b31-b20)[sunglint == 0], thr)
return np.minimum(cmin, confidence.reshape(radshape)), confidence.reshape(radshape)
def vis_nir_ratio_test(rad1, rad2, threshold, scene, cmin):
if threshold['Daytime_Ocean']['vis_nir_ratio'][6] == 1:
print("NIR-Visible ratio test running")
radshape = rad1.shape
rad1 = rad1.reshape(np.prod(radshape))
rad2 = rad2.reshape(np.prod(radshape))
sunglint = scene['sunglint'].reshape(np.prod(radshape))
vrat = rad2/rad1
confidence = np.ones(rad1.shape)
tmp = threshold['Daytime_Ocean']['vis_nir_ratio']
thr_no_sunglint = np.array([tmp[0], tmp[1], tmp[2], tmp[3], tmp[4], tmp[5], 1, 1])
tmp = threshold['Sun_Glint']['snglnt']
thr_sunglint = np.array([tmp[0], tmp[1], tmp[2], tmp[3], tmp[4], tmp[5], 1])
# thr_no_sunglint = np.array(threshold['Daytime_Ocean']['vis_nir_ratio'])
# thr_sunglint = np.array(threshold['Sun_Glint']['snglnt'])
# thr_sunglint = np.append(thr_sunglint, 1)
# temp value to avoid linter bitching at me
# eventually we would have the test run in two blocks as:
# confidence[sunglint == 1] = conf.conf_test_dble(vrat[sunglint == 1], sg_threshold['snglnt'])
# confidence[sunglint == 0] = conf.conf_test_dble(vrat[sunglint == 0], threshold['vis_nir_ratio'])
# sunglint needs to be defined somewhere
# thr = np.full((rad.shape[0], 4), thr[:4]).T
# thresh = np.full((rad1.shape[0], thr_no_sunglint.shape[0]), thr_no_sunglint)
# thresh[sunglint == 1, :6] = thr_sunglint
confidence[sunglint == 0] = conf.conf_test_dble(vrat, thr_no_sunglint)[sunglint == 0]
confidence[sunglint == 1] = conf.conf_test_dble(vrat, thr_sunglint)[sunglint == 1]
# confidence = conf.conf_test_dble(vrat, thresh.T)
return np.minimum(cmin, confidence.reshape(radshape)), confidence.reshape(radshape)
def nir_refl_test(rad, threshold, sunglint_thresholds, viirs_data, cmin):
print("NIR reflectance test running")
sza = viirs_data.solar_zenith.values
refang = viirs_data.sunglint_angle.values
vza = viirs_data.sensor_zenith.values
dtr = np.pi/180
# Keep in mind that band_n uses MODIS band numbers (i.e. 2=0.86um and 7=2.1um)
# For VIIRS would be 2=M07 (0.865um) and 7=M11 (2.25um)
band_n = 2
vzcpow = 0.75 # THIS NEEDS TO BE READ FROM THE THRESHOLDS FILE
radshape = rad.shape
rad = rad.reshape(np.prod(radshape))
confidence = np.ones(rad.shape)
sza = sza.reshape(rad.shape)
vza = vza.reshape(rad.shape)
refang = refang.reshape(rad.shape)
sunglint_flag = utils.sunglint_scene(refang, sunglint_thresholds).reshape(rad.shape)
# ref2 [5]
# b2coeffs [4]
# b2mid [1]
# b2bias_adj [1]
# b2lo [1]
# vzcpow [3] (in different place)
cosvza = np.cos(vza*dtr)
coeffs = threshold['b2coeffs']
hicut0 = np.array(coeffs[0] + coeffs[1]*sza + coeffs[2]*np.power(sza, 2) + coeffs[3]*np.power(sza, 3))
hicut0 = (hicut0 * 0.01) + threshold['b2adj']
hicut0 = hicut0 * threshold['b2bias_adj']
midpt0 = hicut0 + (threshold['b2mid'] * threshold['b2bias_adj'])
locut0 = midpt0 + (threshold['b2lo'] * threshold['b2bias_adj'])
thr = np.array([locut0, midpt0, hicut0, threshold['ref2'][3]*np.ones(rad.shape)])
corr_thr = np.zeros((4, rad.shape[0]))
corr_thr[:3, sunglint_flag == 0] = thr[:3, sunglint_flag == 0] * (1./np.power(cosvza[sunglint_flag == 0], vzcpow))
corr_thr[3, sunglint_flag == 0] = thr[3, sunglint_flag == 0]
# corr_thr[:3, :] = thr[:3, :] * (1./np.power(cosvza[:], vzcpow))
# corr_thr[3, :] = thr[3, :]
for flag in range(1, 4):
if len(refang[sunglint_flag == flag]) > 0:
sunglint_thr = utils.get_sunglint_thresholds(refang, sunglint_thresholds, band_n, flag, thr)
corr_thr[:3, sunglint_flag == flag] = sunglint_thr[:3, sunglint_flag == flag] * (1./np.power(cosvza[sunglint_flag == flag], vzcpow))
corr_thr[3, sunglint_flag == flag] = sunglint_thr[3, sunglint_flag == flag]
confidence = conf.conf_test(rad, corr_thr)
return np.minimum(cmin, confidence.reshape(radshape)), confidence.reshape(radshape)
def nir_high_cloud_test():
pass
def test_11um_var(rad, threshold, var_threshold):
print("11um variability test running")
thr = np.array(threshold['11um_var'])
radshape = rad.shape
var = np.zeros((radshape[0], radshape[1], 9))
# chk_spatial2() need to figure out what this is
# np = rg_var.num_small_diffs * 1.0
test = sliding_window_view(np.pad(rad, [1, 1], mode='constant'), (3, 3)) - np.expand_dims(rad, (2, 3))
var[np.abs(test).reshape(radshape[0], radshape[1], 9) < var_threshold['dovar11']] = 1
var = var.sum(axis=2).reshape(np.prod(radshape))
rad = rad.reshape(np.prod(radshape))
confidence = np.zeros(rad.shape)
confidence[var == 9] = conf.conf_test(rad[var == 9], thr)
return confidence.reshape(radshape)
def test_11_4diff(rad1, rad2, threshold, viirs_data, sg_thresh):
print("11um - 4um difference test running")
radshape = rad1.shape
raddiff = (rad1 - rad2).reshape(np.prod(radshape))
day = np.zeros(radshape)
day[viirs_data.solar_zenith <= 85] = 1
day = day.reshape(raddiff.shape)
sunglint = np.zeros(rad1.shape)
sunglint[viirs_data.sunglint_angle <= sg_thresh] = 1
sunglint = sunglint.reshape(raddiff.shape)
thr = np.array(threshold['test11_4lo'])
confidence = np.zeros(raddiff.shape)
# confidence[(day == 1) & (sunglint == 0)] = utils.conf_test(raddiff[(day == 1) & (sunglint == 0)], thr)
confidence[(day == 1) & (sunglint == 0)] = conf.conf_test(raddiff[(day == 1) & (sunglint == 0)], thr)
return confidence.reshape(radshape)
def vir_refl_test(rad, threshold, viirs_data):
print('Visible reflectance test running')
thr = threshold['vis_refl_test']
radshape = rad.shape()
rad = rad.reshape(np.prod(radshape))
confidence = np.zeros(radshape)
vzcpow = 0.75 # THIS NEEDS TO BE READ FROM THE THRESHOLDS FILE
vza = viirs_data.sensor_zenith.values
dtr = np.pi/180
cosvza = np.cos(vza*dtr)
coeffs = utils.get_b1_thresholds()
coeffs[:, :3] = coeffs[:, :3] * threshold['b1_bias_adj']
# this quantity is the return of get_b1_thresholds() in the C code
# it's defined here to keep a consistent logic with the original source, for now
irtn = 0
if irtn != 0:
coeffs = thr
coeffs[:, :3] = coeffs[:, :3] * 1/np.power(cosvza, vzcpow)
confidence = conf.conf_test(rad, coeffs)
return confidence.reshape(radshape)
class CloudMaskTests(object):
def __init__(self, scene, radiance, coefficients):
self.scene = scene
self.coefficients = coefficients
def select_coefficients(self):
pass
def test_G1(self):
pass
def test_G2(self):
pass
def test_G3(self):
pass
def test_G4(self):
pass
def overall_confidence(self):
pass
def test():
rad = np.random.randint(50, size=[4, 8])
# coeffs = [5, 42, 20, 28, 15, 35, 1]
# coeffs = [20, 28, 5, 42, 15, 35, 1]
coeffs = [35, 15, 20, 1, 1]
# confidence = conf_test_dble(rad, coeffs)
confidence = test_11um(rad, coeffs)
print(rad)
print('\n')
print(confidence)
if __name__ == "__main__":
test()