Skip to content
Snippets Groups Projects
test.py 4.33 KiB
Newer Older
""" Routines for doing nearest neighbor interpolation of results of sounder collocation."""
from pyhdf.SD import SD, SDC
import numpy as np
from iff_avhrr_nearest_neighbor import do_interpolation, get_noscale

figdir = './figs/'

def get_band_names(sd):
    raw = sd.select('BrightnessTemperatureBands').attributes()['band_names']
    return raw.split(', ')


def get_avhrr_ch_index(sd, band_number):
    return get_band_names(sd).index(str(band_number))


def get_viirs_ch_index(sd, band_number):
    return get_band_names(sd).index('SVM' + str(band_number))


def get_hirs_ch_index(sd, band_number):
    # hirs channel list https://nwpsaf.eu/deliverables/aapp/hirs_3.html
    return get_band_names(sd).index('HIRS-' + str(band_number))


def get_cris_ch_index(sd, band_number):
    # note this is actually a MODIS band number
    return get_band_names(sd).index('CrIS-' + str(band_number))


# For AVHRR GAC, scaling is pretty simple, see: http://oiswww.eumetsat.org/webops/html/eps-pg/AVHRR/AVHRR-PG-4ProdOverview.htm
# TODO: validate xtrack_scale and alongtrack_scale (using e.g. actual lat/lon values)
def save_results(fancy_result, dumb_result, bt_bands, lat, lon, line_start, line_end, sndr_band, sndr_mask):
    """ Make some plots to test the results of interpolating. """
    lines = slice(line_start, line_end)  # the subset of lines we want.
    band = imgr_idx_fn(iff, imgr_ch)
    bt_bands[band, lines, :].dump(figdir + 'imgr11.npdump')
    band = sndr_band
    sndr_raw = bt_bands[band, lines, :].copy()
    sndr_raw[sndr_mask[lines, :] == 0] = np.nan
    sndr_raw.dump(figdir + 'sndr_raw.npdump')
    fancy_result[lines, :].dump(figdir + 'sndr_fancy_interp.npdump')
    dumb_result[lines, :].dump(figdir + 'sndr_dumb_interp.npdump')
    lat[lines, :].dump(figdir + 'lat.npdump')
    lon[lines, :].dump(figdir + 'lon.npdump')


if __name__ == "__main__":
    import cProfile
    # viirs:
    #  filename = './data/IFF_npp_viirs_svm_d20130329_t065328_e065528_c20150909001250.hdf'
    #  sndr_idx_fn = get_cris_ch_index
    #  imgr_idx_fn = get_viirs_ch_index
    #  sndr_ch = 31
    #  imgr_ch = 15
    #  imgr_type = 'viirs'
    # avhrr:
    filename = './data/IFF_noaa19_avhrr_nss_d20150321_t171500_e190000_c20151001030031.hdf'
    sndr_idx_fn = get_hirs_ch_index
    imgr_idx_fn = get_avhrr_ch_index
    sndr_ch = 8
    imgr_ch = 4
    imgr_type = 'avhrr'

    iff = SD('./'+filename, SDC.READ)
    btbands = get_noscale(iff, 'BrightnessTemperatureBands')
    sndr_line_ind = get_noscale(iff, 'SounderY').astype(np.int32)
    sndr_ele_ind = get_noscale(iff, 'SounderX').astype(np.int32)
    sndr_fov = get_noscale(iff, 'SounderFOV').astype(np.int32)
    lat = get_noscale(iff, 'Latitude')
    lon = get_noscale(iff, 'Longitude')
    try:
        # this will fail for viirs
        sndr_mask = get_noscale(iff, 'HirsMask').astype(np.uint32)
    except:
        sndr_mask = np.zeros(sndr_line_ind.shape).astype(np.uint32)
        sndr_mask[sndr_line_ind >= 0] = 1
    line_start = 0
    line_end = sndr_mask.shape[0]
    #  line_start = 2500
    #  line_end = 2900
    #cProfile.run('result = do_interpolation(sndr_mask, sndr_line_ind, sndr_ele_ind, line_start, line_end)', sort='time')

    sndr_band_save = sndr_idx_fn(iff, sndr_ch)
    for fancy in [True, False]:
        result = do_interpolation(
                     sndr_mask, sndr_line_ind, sndr_ele_ind, sndr_fov, 
                     line_start, line_end,
                     imgr11=btbands[imgr_idx_fn(iff, imgr_ch)].astype(np.float32),
                     sndr11=btbands[sndr_idx_fn(iff, sndr_ch)].astype(np.float32),
                     fancy_interp=fancy,
                     imgr_type=imgr_type)
        (interp_i, interp_j, sndr_i, sndr_j) = result
        print('writing interpolated array')
        if fancy:
            fancy_interpolated = btbands[sndr_band_save].copy()
            fancy_interpolated[interp_i, interp_j] = (btbands[sndr_band_save])[sndr_i, sndr_j]
        else:
            dumb_interpolated = btbands[sndr_band_save].copy()
            dumb_interpolated[interp_i, interp_j] = (btbands[sndr_band_save])[sndr_i, sndr_j]

    print('saving results')
    save_results(np.ma.masked_less(fancy_interpolated, 0), 
                  np.ma.masked_less(dumb_interpolated, 0),
                  np.ma.masked_invalid(btbands), 
                  lat, lon,
                  line_start, line_end, sndr_band_save, sndr_mask)
    iff.end()