import os import h5py import numpy as np import xarray as xr from netCDF4 import Dataset from aeolus.datasource import get_datasource, CLAVRx_CALIPSO, get_parameters_caliop_clavrx from util.util import haversine_np from amv.intercompare import * from util.util import pressure_to_altitude from util.gfs_reader import * amv_file_duration = 60 # minutes half_width = 30 # search box centered on CALIOP profile (FGF coordinates) num_elems = 5424 num_lines = 5424 amv_fmwk_exmpl_file = '/ships19/cloud/scratch/4TH_AMV_INTERCOMPARISON/FMWK2_AMV/GOES16_ABI_2KM_FD_2019293_0020_34_WINDS_AMV_EN-14CT.nc' amv_ops_exmpl_file = '/ships19/cloud/scratch/AMV_FAST_BIAS/OPS_AMVS/OR_ABI-L2-DMWF-M6C14_G16_s20201050000166_e20201050009474_c20201050023226.nc' #caliop_clvrx_exmpl_file = '/data/Personal/rink/clavrx_calipso/g16_s20201050200_06kmCLay.matchup.calipso.h5' #caliop_clvrx_exmpl_file = '/data/Personal/rink/4TH_AMV_INTERCOMPARISON/clavrx_calipso/g16_s20192930150_06kmCLay.matchup.calipso.h5' caliop_clvrx_exmpl_file = '/data/Personal/rink/AMS_2022/caliop_clavrx/g16_s20192931130_06kmCLay.matchup.calipso.h5' def get_search_box(cc, ll): c_rng = [cc - half_width, cc + half_width] l_rng = [ll - half_width, ll + half_width] if c_rng[0] < 0: c_rng[0] = 0 if l_rng[0] < 0: l_rng[0] = 0 if c_rng[1] >= num_elems: c_rng[1] = num_elems - 1 if l_rng[1] >= num_lines: l_rng[1] = num_lines - 1 return c_rng, l_rng def match_calipso_clavrx_to_amvs(nom_time, calipso_clavrx_nc4, amv_files, calipso_clavrx_params, gfs_ds): nav = amv_files.get_navigation() amv_params = amv_files.get_parameters() all_params = [amv_files.lon_name, amv_files.lat_name, amv_files.elem_name, amv_files.line_name] + amv_params match_dict = {} coords = {'num_params': all_params} dims = ['num_params', 'num_amvs'] amv_fname, ftime, f_idx = amv_files.get_file(nom_time) if f_idx is None: return None print(amv_fname) gfs_fname, _, _ = gfs_ds.get_file(nom_time, window=180.0) if gfs_fname is None: return None print(gfs_fname) gfs_xr = xr.open_dataset(gfs_fname) gfs_press = gfs_xr['pressure levels'] gfs_press = gfs_press.values match_dict[nom_time] = [] amvs_nd = get_amv_nd(amv_fname, delimiter=',') amvs_nd = filter_amvs(amvs_nd) amv_lons = amvs_nd[:, amv_lon_idx] amv_lats = amvs_nd[:, amv_lat_idx] amv_cc, amv_ll = nav.earth_to_lc_s(amv_lons, amv_lats) param_s = [] param_s.append(amv_lons) param_s.append(amv_lats) param_s.append(amv_cc) param_s.append(amv_ll) param_s.append(amvs_nd[:, amv_pres_idx]) param_s.append(amvs_nd[:, amv_spd_idx]) param_s.append(amvs_nd[:, amv_dir_idx]) coords_a = {'num_calipso_clavrx_params': calipso_clavrx_params} dims_a = ['num_calipso_clavrx_params'] calipso_clavrx_data = [] for pname in calipso_clavrx_params: calipso_clavrx_data.append(calipso_clavrx_nc4[pname][:]) lat_s = calipso_clavrx_nc4['latitude'][:] lon_s = calipso_clavrx_nc4['longitude'][:] if calipso_clavrx_nc4.variables.get('x') is None: xs, ys = nav.earth_to_lc_s(lon_s, lat_s) else: xs = calipso_clavrx_nc4['x'][:] ys = calipso_clavrx_nc4['y'][:] clvr_xy_s = zip(xs, ys) for idx, clvr_xy in enumerate(clvr_xy_s): lon = lon_s[idx] lat = lat_s[idx] cc, ll = clvr_xy c_rng, l_rng = get_search_box(cc, ll) in_cc = np.logical_and(amv_cc > c_rng[0], amv_cc < c_rng[1]) in_ll = np.logical_and(amv_ll > l_rng[0], amv_ll < l_rng[1]) in_box = np.logical_and(in_cc, in_ll) num_amvs = np.sum(in_box) if num_amvs == 0: continue dist = haversine_np(lon, lat, amv_lons[in_box], amv_lats[in_box]) s_idxs = np.argsort(dist) param_nd = np.vstack(param_s) param_nd = param_nd[:, in_box] param_nd = param_nd[:, s_idxs] dist_to_amvs = dist[s_idxs] amvs_da = xr.DataArray(param_nd, coords=coords, dims=dims) data_nd = np.vstack(calipso_clavrx_data) data_da = xr.DataArray(data_nd[:, idx], coords=coords_a, dims=dims_a) lons = param_nd[0, :] lats = param_nd[1, :] temp_prof = get_vert_profile_s(gfs_xr, ['temperature'], lons, lats, method='nearest') temp_prof = temp_prof.values temp_prof_s = temp_prof[0, :, :] alt_s = [] for k in range(num_amvs): alt = pressure_to_altitude(param_nd[4, k], None, gfs_press, temp_prof_s[k, :]) alt_s.append(alt.magnitude) alt_s = np.array(alt_s) match_dict[nom_time].append((cc, ll, lon, lat, f_idx, data_da, amvs_da, dist_to_amvs, alt_s)) return match_dict def create_file(match_dct, output_path, target_filepath, caliop_clavrx_params, amv_params, amv_filenames): num_aprofs = 0 max_num_amvs = 0 alons = [] alats = [] atimes = [] elems = [] lines = [] fidxs = [] amvs_cnt = [] filename = os.path.split(target_filepath)[1] w_o_ext, ext = os.path.splitext(filename) filename = w_o_ext+'.amvs'+'.nc' output_filepath = output_path+filename amv_file_s = np.array(amv_filenames, dtype='object') # scan to get max num levels, amvs keys = list(match_dct.keys()) for key in keys: tup_s = match_dct.get(key) for tup in tup_s: num_aprofs += 1 atimes.append(key) elems.append(tup[0]) lines.append(tup[1]) alons.append(tup[2]) alats.append(tup[3]) fidxs.append(tup[4]) amvs = tup[6] num_amvs = amvs.shape[1] amvs_cnt.append(num_amvs) if num_amvs > max_num_amvs: max_num_amvs = num_amvs alons = np.array(alons) alats = np.array(alats) atimes = np.array(atimes) elems = np.array(elems) lines = np.array(lines) fidxs = np.array(fidxs) amvs_cnt = np.array(amvs_cnt) # Sample file to retrieve and copy variable attributes rg_exmpl = Dataset(amv_ops_exmpl_file) caliop_clavrx_exmpl = Dataset(caliop_clvrx_exmpl_file, 'r') # the top level group for the output file rootgrp = Dataset(output_filepath, 'w', format='NETCDF4') dim_amvs = rootgrp.createDimension('max_num_amvs', size=max_num_amvs) dim_num_aeolus_prof = rootgrp.createDimension('num_caliop_profs', size=num_aprofs) dim_num_files = rootgrp.createDimension('num_amv_files', size=amv_file_s.shape[0]) prf_time = rootgrp.createVariable('time', 'f4', ['num_caliop_profs']) amv_file_names = rootgrp.createVariable('amv_file_names', str, ['num_amv_files']) # ---- CALIOP variables --------------- prf_lon = rootgrp.createVariable('caliop_longitude', 'f4', ['num_caliop_profs']) prf_lon.units = 'degrees east' prf_lat = rootgrp.createVariable('caliop_latitude', 'f4', ['num_caliop_profs']) prf_lat.units = 'degrees north' prf_time.units = 'seconds since 1970-01-1 00:00:00' prf_elem = rootgrp.createVariable('FD_elem', 'f4', ['num_caliop_profs']) prf_line = rootgrp.createVariable('FD_line', 'f4', ['num_caliop_profs']) amvs_cnt_var = rootgrp.createVariable('num_amvs_per_match', 'i4', ['num_caliop_profs']) # ----- CALIOP+CLAVRx variables ------ nc4_vars_cc = [] var_s = caliop_clavrx_exmpl.variables for pidx, param in enumerate(caliop_clavrx_params): v = var_s[param] attr_s = v.ncattrs() if '_FillValue' in attr_s: var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs'], fill_value=v.getncattr('_FillValue')) else: var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs']) # copy attributes from example to new output variable of the same name for attr in attr_s: if attr != '_FillValue': var.setncattr(attr, v.getncattr(attr)) nc4_vars_cc.append(var) # ----- AMV variables ---------------- amv_lon = rootgrp.createVariable('amv_longitude', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_lon.units = 'degrees east' amv_lat = rootgrp.createVariable('amv_latitude', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_lat.units = 'degrees north' amv_elem = rootgrp.createVariable('amv_elem', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_line = rootgrp.createVariable('amv_line', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_fidx = rootgrp.createVariable('amv_file_index', 'i4', ['num_caliop_profs']) dst_to_amvs = rootgrp.createVariable('distance_to_amvs', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_geom_hgt = rootgrp.createVariable('amv_geom_hgt', 'f4', ['num_caliop_profs', 'max_num_amvs']) nc4_vars = [] var_s = rg_exmpl.variables for pidx, param in enumerate(amv_params): v = var_s[param] attr_s = v.ncattrs() if '_FillValue' in attr_s: var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs', 'max_num_amvs'], fill_value=v.getncattr('_FillValue')) else: var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs', 'max_num_amvs']) # copy attributes from example to new output variable of the same name for attr in attr_s: if attr != '_FillValue': var.setncattr(attr, v.getncattr(attr)) nc4_vars.append(var) # Write data to file --------------------- prf_lon[:] = alons prf_lat[:] = alats prf_elem[:] = elems prf_line[:] = lines prf_time[:] = atimes amvs_cnt_var[:] = amvs_cnt amv_fidx[:] = fidxs amv_file_names[:] = amv_file_s idx = 0 for key in keys: tup_s = match_dct.get(key) for tup in tup_s: data_nd = tup[5] amvs_nd = tup[6] dist = tup[7] alt = tup[8] for pidx, param in enumerate(caliop_clavrx_params): nda = data_nd[pidx] nc4_vars_cc[pidx][idx] = nda nda = amvs_nd[0,] cnt = nda.shape[0] amv_lon[idx, 0:cnt] = nda nda = amvs_nd[1,] cnt = nda.shape[0] amv_lat[idx, 0:cnt] = nda nda = amvs_nd[2,] cnt = nda.shape[0] amv_elem[idx, 0:cnt] = nda nda = amvs_nd[3,] cnt = nda.shape[0] amv_line[idx, 0:cnt] = nda dst_to_amvs[idx, 0:cnt] = dist amv_geom_hgt[idx, 0:cnt] = alt for pidx, param in enumerate(amv_params): nda = amvs_nd[pidx+4,] cnt = nda.shape[0] nc4_vars[pidx][idx, 0:cnt] = nda idx += 1 rg_exmpl.close() rootgrp.close() def run_caliop_clavrx_amv_match(output_path, path_to_caliop_clavrx, path_to_amvs, path_to_gfs, amv_source='OPS', band='14'): caliop_clavrx_params = get_parameters_caliop_clavrx(filename=caliop_clvrx_exmpl_file) caliop_clavrx_ds = CLAVRx_CALIPSO(path_to_caliop_clavrx) amv_ds = get_datasource(path_to_amvs, amv_source, file_time_span=20, band=band) amv_params = amv_ds.get_parameters() amv_filenames = amv_ds.flist gfs_ds = get_datasource(path_to_gfs, 'GFS') for f in caliop_clavrx_ds.flist: nom_time = caliop_clavrx_ds.get_datetime(f).timestamp() calipso_clavrx_nc4 = Dataset(f) print('Start processing: ', f) match_dict = match_calipso_clavrx_to_amvs(nom_time, calipso_clavrx_nc4, amv_ds, caliop_clavrx_params, gfs_ds) calipso_clavrx_nc4.close() if match_dict is None or not match_dict[nom_time]: print('No matches for: ', f) continue create_file(match_dict, output_path, f, caliop_clavrx_params, amv_params, amv_filenames) print('Done processing: ', f) def analyze(filename): h5f = h5py.File(filename, 'r') cc_ctt = h5f['closest_calipso_top_temperature'][:] cc_ctt = np.where(cc_ctt == -9999.0, np.nan, cc_ctt) cc_ctt += 273.0 cc_ctp = h5f['closest_calipso_top_pressure'][:] cc_ctp = np.where(cc_ctp == -9999.0, np.nan, cc_ctp) lats = h5f['caliop_latitude'][:] lons = h5f['caliop_longitude'][:] acha_ctp = h5f['cld_press_acha'][:] acha_ctp = np.where(acha_ctp == -999.0, np.nan, acha_ctp) amv_press = h5f['pressure'][:, 0:3] amv_press = np.where(amv_press == -999.0, np.nan, amv_press) amv_press_avg = np.nanmean(amv_press, axis=1) return cc_ctp, acha_ctp, amv_press_avg, lats new_header = ' TARG LAT LON BOX SRCH SPD DIR PW LLCM SPDG DIRG TBALB MAXC TRKM PERR HAMD QINF QIWF QIC HGT' num_fmts = ['%6i', '%7.2f', '%8.2f', '%4i', '%4i', '%7.2f', '%7.2f', '%8.2f', '%4i', '%7.2f', '%7.2f', '%8.2f', '%7.2f', '%4i', '%7.2f', '%4i', '%4i', '%6i', '%6i', '%9.2f'] def get_temp_prof_s(gfs_ds, nom_time, amv_lons, amv_lats): gfs_fname, _, _ = gfs_ds.get_file(nom_time, window=180.0) if gfs_fname is None: return None gfs_xr = xr.open_dataset(gfs_fname) gfs_press = gfs_xr['pressure levels'] gfs_press = gfs_press.values temp_prof = get_vert_profile_s(gfs_xr, ['temperature'], amv_lons, amv_lats, method='linear') temp_prof = temp_prof.values temp_prof_s = temp_prof[0, :, :] return temp_prof_s, gfs_press def get_temp_prof_s_intrp(nom_time, amv_lons, amv_lats): gfs_fname_left, nom_time_left, gfs_fname_rght, nom_time_rght = get_bounding_gfs_files(nom_time) gfs_xr_left = xr.open_dataset(gfs_fname_left) gfs_xr_rght = xr.open_dataset(gfs_fname_rght) gfs_press = gfs_xr_left['pressure levels'].values temp_prof = get_time_interpolated_vert_profile([gfs_xr_left, gfs_xr_rght], [nom_time_left, nom_time_rght], 'temperature', nom_time, amv_lons, amv_lats) pt_s = get_time_interpolated_point_s([gfs_xr_left, gfs_xr_rght], [nom_time_left, nom_time_rght], ['surface pressure', 'surface temperature', 'surface height'], nom_time, amv_lons, amv_lats) return temp_prof, gfs_press, pt_s[0,], pt_s[1,], pt_s[2,] def compute_and_add_geo_hgt(path_to_amvs, path_to_gfs, amv_source, band='14', out_file=None): gfs_ds = get_datasource(path_to_gfs, 'GFS') amv_ds = get_datasource(path_to_amvs, amv_source, file_time_span=20, band=band) amv_filenames = amv_ds.flist alt_s = [] prs_s = [] for fname in amv_filenames: print('Start: ', fname) nom_time = amv_ds.get_datetime(fname).timestamp() amvs_nd = get_amv_nd(fname, delimiter=',') amvs_nd = filter_amvs(amvs_nd) num_amvs = amvs_nd.shape[0] amv_lons = amvs_nd[:, amv_lon_idx] amv_lats = amvs_nd[:, amv_lat_idx] temp_prof_s, gfs_press = get_temp_prof_s(gfs_ds, nom_time, amv_lons, amv_lats) # temp_prof_s, gfs_press, sfc_press, sfc_temp, sfc_hgt, = get_temp_prof_s_intrp(nom_time, amv_lons, amv_lats) alt_f = [] for k in range(num_amvs): amv_press = amvs_nd[k, amv_pres_idx] alt = pressure_to_altitude(amv_press, None, gfs_press, temp_prof_s[k, :]) # alt = pressure_to_altitude(amv_press, None, gfs_press, temp_prof_s[k, :], sfc_pres=sfc_press[k], sfc_temp=sfc_temp[k], sfc_elev=sfc_hgt[k]) alt_s.append(alt) alt_f.append(alt) prs_s.append(amv_press) alt_f = np.array(alt_f) alt_f = np.reshape(alt_f, [num_amvs, 1]) amvs_nd = np.hstack([amvs_nd, alt_f]) amv_params = amv_ds.get_parameters() all_params = [amv_ds.lon_name, amv_ds.lat_name, amv_ds.elem_name, amv_ds.line_name] + amv_params coords = {'num_params': all_params} dims = ['num_amvs', 'num_params'] amvs_da = xr.DataArray(amvs_nd, coords=coords, dims=dims) new_amv_fname = os.path.split(fname)[1] new_amv_fname = os.path.splitext(new_amv_fname)[0] + 'nc4' amvs_da.to_netcdf(new_amv_fname, format='NETCDF4') new_amv_fname = os.path.split(fname)[1] np.savetxt(new_amv_fname, amvs_nd, fmt=num_fmts, header=new_header, delimiter=',') print('Done...') alt_s = np.array(alt_s) prs_s = np.array(prs_s) if out_file is not None: np.save(out_file, (prs_s, alt_s)) else: return prs_s, alt_s