caliop_clavrx_amv.py 16.35 KiB
import os
import h5py
import numpy as np
import xarray as xr
from netCDF4 import Dataset
from aeolus.datasource import get_datasource, CLAVRx_CALIPSO, get_parameters_caliop_clavrx
from util.util import haversine_np
from amv.intercompare import *
from util.util import pressure_to_altitude
from util.gfs_reader import *
amv_file_duration = 60 # minutes
half_width = 30 # search box centered on CALIOP profile (FGF coordinates)
num_elems = 5424
num_lines = 5424
amv_fmwk_exmpl_file = '/ships19/cloud/scratch/4TH_AMV_INTERCOMPARISON/FMWK2_AMV/GOES16_ABI_2KM_FD_2019293_0020_34_WINDS_AMV_EN-14CT.nc'
amv_ops_exmpl_file = '/ships19/cloud/scratch/AMV_FAST_BIAS/OPS_AMVS/OR_ABI-L2-DMWF-M6C14_G16_s20201050000166_e20201050009474_c20201050023226.nc'
#caliop_clvrx_exmpl_file = '/data/Personal/rink/clavrx_calipso/g16_s20201050200_06kmCLay.matchup.calipso.h5'
#caliop_clvrx_exmpl_file = '/data/Personal/rink/4TH_AMV_INTERCOMPARISON/clavrx_calipso/g16_s20192930150_06kmCLay.matchup.calipso.h5'
caliop_clvrx_exmpl_file = '/data/Personal/rink/AMS_2022/caliop_clavrx/g16_s20192931130_06kmCLay.matchup.calipso.h5'
def get_search_box(cc, ll):
c_rng = [cc - half_width, cc + half_width]
l_rng = [ll - half_width, ll + half_width]
if c_rng[0] < 0:
c_rng[0] = 0
if l_rng[0] < 0:
l_rng[0] = 0
if c_rng[1] >= num_elems:
c_rng[1] = num_elems - 1
if l_rng[1] >= num_lines:
l_rng[1] = num_lines - 1
return c_rng, l_rng
def match_calipso_clavrx_to_amvs(nom_time, calipso_clavrx_nc4, amv_files, calipso_clavrx_params, gfs_ds):
nav = amv_files.get_navigation()
amv_params = amv_files.get_parameters()
all_params = [amv_files.lon_name, amv_files.lat_name, amv_files.elem_name, amv_files.line_name] + amv_params
match_dict = {}
coords = {'num_params': all_params}
dims = ['num_params', 'num_amvs']
amv_fname, ftime, f_idx = amv_files.get_file(nom_time)
if f_idx is None:
return None
print(amv_fname)
gfs_fname, _, _ = gfs_ds.get_file(nom_time, window=180.0)
if gfs_fname is None:
return None
print(gfs_fname)
gfs_xr = xr.open_dataset(gfs_fname)
gfs_press = gfs_xr['pressure levels']
gfs_press = gfs_press.values
match_dict[nom_time] = []
amvs_nd = get_amv_nd(amv_fname, delimiter=',')
amvs_nd = filter_amvs(amvs_nd)
amv_lons = amvs_nd[:, amv_lon_idx]
amv_lats = amvs_nd[:, amv_lat_idx]
amv_cc, amv_ll = nav.earth_to_lc_s(amv_lons, amv_lats)
param_s = []
param_s.append(amv_lons)
param_s.append(amv_lats)
param_s.append(amv_cc)
param_s.append(amv_ll)
param_s.append(amvs_nd[:, amv_pres_idx])
param_s.append(amvs_nd[:, amv_spd_idx])
param_s.append(amvs_nd[:, amv_dir_idx])
coords_a = {'num_calipso_clavrx_params': calipso_clavrx_params}
dims_a = ['num_calipso_clavrx_params']
calipso_clavrx_data = []
for pname in calipso_clavrx_params:
calipso_clavrx_data.append(calipso_clavrx_nc4[pname][:])
lat_s = calipso_clavrx_nc4['closest_calipso_latitude'][:]
lat_s = lat_s[np.logical_and(lat_s > -90, lat_s < 90)]
lon_s = calipso_clavrx_nc4['closest_calipso_longitude'][:]
lon_s = lon_s[np.logical_and(lon_s > -180, lon_s < 180)]
if calipso_clavrx_nc4.variables.get('x') is None:
xs, ys = nav.earth_to_lc_s(lon_s, lat_s)
else:
xs = calipso_clavrx_nc4['x'][:]
ys = calipso_clavrx_nc4['y'][:]
clvr_xy_s = zip(xs, ys)
for idx, clvr_xy in enumerate(clvr_xy_s):
lon = lon_s[idx]
lat = lat_s[idx]
cc, ll = clvr_xy
c_rng, l_rng = get_search_box(cc, ll)
in_cc = np.logical_and(amv_cc > c_rng[0], amv_cc < c_rng[1])
in_ll = np.logical_and(amv_ll > l_rng[0], amv_ll < l_rng[1])
in_box = np.logical_and(in_cc, in_ll)
num_amvs = np.sum(in_box)
if num_amvs == 0:
continue
dist = haversine_np(lon, lat, amv_lons[in_box], amv_lats[in_box])
s_idxs = np.argsort(dist)
param_nd = np.vstack(param_s)
param_nd = param_nd[:, in_box]
param_nd = param_nd[:, s_idxs]
dist_to_amvs = dist[s_idxs]
amvs_da = xr.DataArray(param_nd, coords=coords, dims=dims)
data_nd = np.vstack(calipso_clavrx_data)
data_da = xr.DataArray(data_nd[:, idx], coords=coords_a, dims=dims_a)
lons = param_nd[0, :]
lats = param_nd[1, :]
temp_prof = get_vert_profile_s(gfs_xr, ['temperature'], lons, lats, method='linear')
temp_prof = temp_prof.values
temp_prof_s = temp_prof[0, :, :]
alt_s = []
for k in range(num_amvs):
alt = pressure_to_altitude(param_nd[4, k], None, gfs_press, temp_prof_s[k, :])
alt_s.append(alt)
alt_s = np.array(alt_s)
match_dict[nom_time].append((cc, ll, lon, lat, f_idx, data_da, amvs_da, dist_to_amvs, alt_s))
return match_dict
def create_file(match_dct, output_path, target_filepath, caliop_clavrx_params, amv_params, amv_filenames):
num_aprofs = 0
max_num_amvs = 0
alons = []
alats = []
atimes = []
elems = []
lines = []
fidxs = []
amvs_cnt = []
filename = os.path.split(target_filepath)[1]
w_o_ext, ext = os.path.splitext(filename)
filename = w_o_ext+'.amvs'+'.nc'
output_filepath = output_path+filename
amv_file_s = np.array(amv_filenames, dtype='object')
# scan to get max num levels, amvs
keys = list(match_dct.keys())
for key in keys:
tup_s = match_dct.get(key)
for tup in tup_s:
num_aprofs += 1
atimes.append(key)
elems.append(tup[0])
lines.append(tup[1])
alons.append(tup[2])
alats.append(tup[3])
fidxs.append(tup[4])
amvs = tup[6]
num_amvs = amvs.shape[1]
amvs_cnt.append(num_amvs)
if num_amvs > max_num_amvs:
max_num_amvs = num_amvs
alons = np.array(alons)
alats = np.array(alats)
atimes = np.array(atimes)
elems = np.array(elems)
lines = np.array(lines)
fidxs = np.array(fidxs)
amvs_cnt = np.array(amvs_cnt)
# Sample file to retrieve and copy variable attributes
rg_exmpl = Dataset(amv_ops_exmpl_file)
caliop_clavrx_exmpl = Dataset(caliop_clvrx_exmpl_file, 'r')
# the top level group for the output file
rootgrp = Dataset(output_filepath, 'w', format='NETCDF4')
dim_amvs = rootgrp.createDimension('max_num_amvs', size=max_num_amvs)
dim_num_aeolus_prof = rootgrp.createDimension('num_caliop_profs', size=num_aprofs)
dim_num_files = rootgrp.createDimension('num_amv_files', size=amv_file_s.shape[0])
prf_time = rootgrp.createVariable('time', 'f4', ['num_caliop_profs'])
amv_file_names = rootgrp.createVariable('amv_file_names', str, ['num_amv_files'])
# ---- CALIOP variables ---------------
prf_lon = rootgrp.createVariable('caliop_longitude', 'f4', ['num_caliop_profs'])
prf_lon.units = 'degrees east'
prf_lat = rootgrp.createVariable('caliop_latitude', 'f4', ['num_caliop_profs'])
prf_lat.units = 'degrees north'
prf_time.units = 'seconds since 1970-01-1 00:00:00'
prf_elem = rootgrp.createVariable('FD_elem', 'f4', ['num_caliop_profs'])
prf_line = rootgrp.createVariable('FD_line', 'f4', ['num_caliop_profs'])
amvs_cnt_var = rootgrp.createVariable('num_amvs_per_match', 'i4', ['num_caliop_profs'])
# ----- CALIOP+CLAVRx variables ------
nc4_vars_cc = []
var_s = caliop_clavrx_exmpl.variables
for pidx, param in enumerate(caliop_clavrx_params):
v = var_s[param]
attr_s = v.ncattrs()
if '_FillValue' in attr_s:
var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs'], fill_value=v.getncattr('_FillValue'))
else:
var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs'])
# copy attributes from example to new output variable of the same name
for attr in attr_s:
if attr != '_FillValue':
var.setncattr(attr, v.getncattr(attr))
nc4_vars_cc.append(var)
# ----- AMV variables ----------------
amv_lon = rootgrp.createVariable('amv_longitude', 'f4', ['num_caliop_profs', 'max_num_amvs'])
amv_lon.units = 'degrees east'
amv_lat = rootgrp.createVariable('amv_latitude', 'f4', ['num_caliop_profs', 'max_num_amvs'])
amv_lat.units = 'degrees north'
amv_elem = rootgrp.createVariable('amv_elem', 'f4', ['num_caliop_profs', 'max_num_amvs'])
amv_line = rootgrp.createVariable('amv_line', 'f4', ['num_caliop_profs', 'max_num_amvs'])
amv_fidx = rootgrp.createVariable('amv_file_index', 'i4', ['num_caliop_profs'])
dst_to_amvs = rootgrp.createVariable('distance_to_amvs', 'f4', ['num_caliop_profs', 'max_num_amvs'])
amv_geom_hgt = rootgrp.createVariable('amv_geom_hgt', 'f4', ['num_caliop_profs', 'max_num_amvs'])
amv_geom_hgt.units = 'meter'
amv_geom_hgt.long_name = 'AMV pressure converted to height from GFS temperature vs pressure'
nc4_vars = []
var_s = rg_exmpl.variables
for pidx, param in enumerate(amv_params):
v = var_s[param]
attr_s = v.ncattrs()
if '_FillValue' in attr_s:
var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs', 'max_num_amvs'], fill_value=v.getncattr('_FillValue'))
else:
var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs', 'max_num_amvs'])
# copy attributes from example to new output variable of the same name
for attr in attr_s:
if attr != '_FillValue':
var.setncattr(attr, v.getncattr(attr))
nc4_vars.append(var)
# Write data to file ---------------------
prf_lon[:] = alons
prf_lat[:] = alats
prf_elem[:] = elems
prf_line[:] = lines
prf_time[:] = atimes
amvs_cnt_var[:] = amvs_cnt
amv_fidx[:] = fidxs
amv_file_names[:] = amv_file_s
idx = 0
for key in keys:
tup_s = match_dct.get(key)
for tup in tup_s:
data_nd = tup[5]
amvs_nd = tup[6]
dist = tup[7]
alt = tup[8]
for pidx, param in enumerate(caliop_clavrx_params):
nda = data_nd[pidx]
nc4_vars_cc[pidx][idx] = nda
nda = amvs_nd[0,]
cnt = nda.shape[0]
amv_lon[idx, 0:cnt] = nda
nda = amvs_nd[1,]
cnt = nda.shape[0]
amv_lat[idx, 0:cnt] = nda
nda = amvs_nd[2,]
cnt = nda.shape[0]
amv_elem[idx, 0:cnt] = nda
nda = amvs_nd[3,]
cnt = nda.shape[0]
amv_line[idx, 0:cnt] = nda
dst_to_amvs[idx, 0:cnt] = dist
amv_geom_hgt[idx, 0:cnt] = alt
for pidx, param in enumerate(amv_params):
nda = amvs_nd[pidx+4,]
cnt = nda.shape[0]
nc4_vars[pidx][idx, 0:cnt] = nda
idx += 1
rg_exmpl.close()
rootgrp.close()
def run_caliop_clavrx_amv_match(output_path, path_to_caliop_clavrx, path_to_amvs, path_to_gfs, amv_source='OPS', band='14'):
caliop_clavrx_params = get_parameters_caliop_clavrx(filename=caliop_clvrx_exmpl_file)
caliop_clavrx_ds = CLAVRx_CALIPSO(path_to_caliop_clavrx)
amv_ds = get_datasource(path_to_amvs, amv_source, file_time_span=20, band=band)
amv_params = amv_ds.get_parameters()
amv_filenames = amv_ds.flist
gfs_ds = get_datasource(path_to_gfs, 'GFS')
for f in caliop_clavrx_ds.flist:
nom_time = caliop_clavrx_ds.get_datetime(f).timestamp()
calipso_clavrx_nc4 = Dataset(f)
print('Start processing: ', f)
match_dict = match_calipso_clavrx_to_amvs(nom_time, calipso_clavrx_nc4, amv_ds, caliop_clavrx_params, gfs_ds)
calipso_clavrx_nc4.close()
if match_dict is None or not match_dict[nom_time]:
print('No matches for: ', f)
continue
create_file(match_dict, output_path, f, caliop_clavrx_params, amv_params, amv_filenames)
print('Done processing: ', f)
def analyze(filename):
h5f = h5py.File(filename, 'r')
cc_ctt = h5f['closest_calipso_top_temperature'][:]
cc_ctt = np.where(cc_ctt == -9999.0, np.nan, cc_ctt)
cc_ctt += 273.0
cc_ctp = h5f['closest_calipso_top_pressure'][:]
cc_ctp = np.where(cc_ctp == -9999.0, np.nan, cc_ctp)
lats = h5f['caliop_latitude'][:]
lons = h5f['caliop_longitude'][:]
acha_ctp = h5f['cld_press_acha'][:]
acha_ctp = np.where(acha_ctp == -999.0, np.nan, acha_ctp)
amv_press = h5f['pressure'][:, 0:3]
amv_press = np.where(amv_press == -999.0, np.nan, amv_press)
amv_press_avg = np.nanmean(amv_press, axis=1)
return cc_ctp, acha_ctp, amv_press_avg, lats
new_header = ' TARG LAT LON BOX SRCH SPD DIR PW LLCM SPDG DIRG TBALB MAXC TRKM PERR HAMD QINF QIWF QIC HGT'
num_fmts = ['%6i', '%7.2f', '%8.2f', '%4i', '%4i', '%7.2f', '%7.2f', '%8.2f', '%4i', '%7.2f', '%7.2f', '%8.2f', '%7.2f', '%4i', '%7.2f', '%4i', '%4i', '%6i', '%6i', '%9.2f']
def get_temp_prof_s(gfs_ds, nom_time, lons, lats):
gfs_fname, _, _ = gfs_ds.get_file(nom_time, window=180.0)
if gfs_fname is None:
return None
gfs_xr = xr.open_dataset(gfs_fname)
gfs_press = gfs_xr['pressure levels']
gfs_press = gfs_press.values
temp_prof = get_vert_profile_s(gfs_xr, ['temperature'], lons, lats, method='linear')
temp_prof = temp_prof.values
temp_prof_s = temp_prof[0, :, :]
return temp_prof_s, gfs_press
def get_temp_prof_s_intrp(nom_time, lons, lats):
gfs_fname_left, nom_time_left, gfs_fname_rght, nom_time_rght = get_bounding_gfs_files(nom_time)
gfs_xr_left = xr.open_dataset(gfs_fname_left)
gfs_xr_rght = xr.open_dataset(gfs_fname_rght)
gfs_press = gfs_xr_left['pressure levels'].values
temp_prof = get_time_interpolated_vert_profile([gfs_xr_left, gfs_xr_rght], [nom_time_left, nom_time_rght], 'temperature', nom_time, lons, lats)
pt_s = get_time_interpolated_point_s([gfs_xr_left, gfs_xr_rght], [nom_time_left, nom_time_rght], ['surface pressure', 'surface temperature', 'surface height'], nom_time, lons, lats)
return temp_prof, gfs_press, pt_s[0,], pt_s[1,], pt_s[2,]
def compute_and_add_geo_hgt(path_to_amvs, path_to_gfs, amv_source, band='14', out_file=None):
gfs_ds = get_datasource(path_to_gfs, 'GFS')
amv_ds = get_datasource(path_to_amvs, amv_source, file_time_span=20, band=band)
amv_filenames = amv_ds.flist
alt_s = []
prs_s = []
for fname in amv_filenames:
print('Start: ', fname)
nom_time = amv_ds.get_datetime(fname).timestamp()
amvs_nd = get_amv_nd(fname, delimiter=',')
amvs_nd = filter_amvs(amvs_nd)
num_amvs = amvs_nd.shape[0]
amv_lons = amvs_nd[:, amv_lon_idx]
amv_lats = amvs_nd[:, amv_lat_idx]
temp_prof_s, gfs_press = get_temp_prof_s(gfs_ds, nom_time, amv_lons, amv_lats)
# temp_prof_s, gfs_press, sfc_press, sfc_temp, sfc_hgt, = get_temp_prof_s_intrp(nom_time, amv_lons, amv_lats)
alt_f = []
for k in range(num_amvs):
amv_press = amvs_nd[k, amv_pres_idx]
alt = pressure_to_altitude(amv_press, None, gfs_press, temp_prof_s[k, :])
# alt = pressure_to_altitude(amv_press, None, gfs_press, temp_prof_s[k, :], sfc_pres=sfc_press[k], sfc_temp=sfc_temp[k], sfc_elev=sfc_hgt[k])
alt_s.append(alt)
alt_f.append(alt)
prs_s.append(amv_press)
alt_f = np.array(alt_f)
alt_f = np.reshape(alt_f, [num_amvs, 1])
amvs_nd = np.hstack([amvs_nd, alt_f])
all_params = ['TARG', 'AMV Lat', 'AMV Lon', ' target box size', 'search box size', 'AMV speed', 'AMV direction', 'AMV Pressure',
'LLCM', 'guess speed', 'guess direction', 'TBALB', 'MAXC', 'TRKM', 'PERR', 'HAMD', 'QI no forecast',
'QI with forecast', 'Common QI', 'Height (meters)']
coords = {'num_params': all_params}
dims = ['num_amvs', 'num_params']
amvs_da = xr.DataArray(amvs_nd, coords=coords, dims=dims)
new_amv_fname = os.path.split(fname)[1]
new_amv_fname = os.path.splitext(new_amv_fname)[0] + '.nc4'
amvs_da.to_netcdf(new_amv_fname, format='NETCDF4')
new_amv_fname = os.path.split(fname)[1]
np.savetxt(new_amv_fname, amvs_nd, fmt=num_fmts, header=new_header, delimiter=',')
print('Done...')
alt_s = np.array(alt_s)
prs_s = np.array(prs_s)
if out_file is not None:
np.save(out_file, (prs_s, alt_s))
else:
return prs_s, alt_s