import os import h5py import numpy as np import xarray as xr from netCDF4 import Dataset from aeolus.datasource import get_datasource, CLAVRx_CALIPSO, get_parameters_caliop_clavrx from util.util import haversine_np from amv.intercompare import * from util.util import pressure_to_altitude from util.gfs_reader import * amv_file_duration = 60 # minutes half_width = 30 # search box centered on CALIOP profile (FGF coordinates) num_elems = 5424 num_lines = 5424 amv_fmwk_exmpl_file = '/ships19/cloud/scratch/4TH_AMV_INTERCOMPARISON/FMWK2_AMV/GOES16_ABI_2KM_FD_2019293_0020_34_WINDS_AMV_EN-14CT.nc' amv_ops_exmpl_file = '/ships19/cloud/scratch/AMV_FAST_BIAS/OPS_AMVS/OR_ABI-L2-DMWF-M6C14_G16_s20201050000166_e20201050009474_c20201050023226.nc' #caliop_clvrx_exmpl_file = '/data/Personal/rink/clavrx_calipso/g16_s20201050200_06kmCLay.matchup.calipso.h5' #caliop_clvrx_exmpl_file = '/data/Personal/rink/4TH_AMV_INTERCOMPARISON/clavrx_calipso/g16_s20192930150_06kmCLay.matchup.calipso.h5' caliop_clvrx_exmpl_file = '/data/Personal/rink/AMS_2022/caliop_clavrx/g16_s20192931130_06kmCLay.matchup.calipso.h5' def get_search_box(cc, ll): c_rng = [cc - half_width, cc + half_width] l_rng = [ll - half_width, ll + half_width] if c_rng[0] < 0: c_rng[0] = 0 if l_rng[0] < 0: l_rng[0] = 0 if c_rng[1] >= num_elems: c_rng[1] = num_elems - 1 if l_rng[1] >= num_lines: l_rng[1] = num_lines - 1 return c_rng, l_rng def match_calipso_clavrx_to_amvs(nom_time, calipso_clavrx_nc4, amv_files, calipso_clavrx_params, gfs_ds): nav = amv_files.get_navigation() amv_params = amv_files.get_parameters() all_params = [amv_files.lon_name, amv_files.lat_name, amv_files.elem_name, amv_files.line_name] + amv_params match_dict = {} coords = {'num_params': all_params} dims = ['num_params', 'num_amvs'] amv_fname, ftime, f_idx = amv_files.get_file(nom_time) if f_idx is None: return None print(amv_fname) gfs_fname, _, _ = gfs_ds.get_file(nom_time, window=180.0) if gfs_fname is None: return None print(gfs_fname) gfs_xr = xr.open_dataset(gfs_fname) gfs_press = gfs_xr['pressure levels'] gfs_press = gfs_press.values match_dict[nom_time] = [] amvs_nd = get_amv_nd(amv_fname, delimiter=',') amvs_nd = filter_amvs(amvs_nd) amv_lons = amvs_nd[:, amv_lon_idx] amv_lats = amvs_nd[:, amv_lat_idx] amv_cc, amv_ll = nav.earth_to_lc_s(amv_lons, amv_lats) param_s = [] param_s.append(amv_lons) param_s.append(amv_lats) param_s.append(amv_cc) param_s.append(amv_ll) param_s.append(amvs_nd[:, amv_pres_idx]) param_s.append(amvs_nd[:, amv_spd_idx]) param_s.append(amvs_nd[:, amv_dir_idx]) coords_a = {'num_calipso_clavrx_params': calipso_clavrx_params} dims_a = ['num_calipso_clavrx_params'] calipso_clavrx_data = [] for pname in calipso_clavrx_params: calipso_clavrx_data.append(calipso_clavrx_nc4[pname][:]) lat_s = calipso_clavrx_nc4['latitude'][:] lon_s = calipso_clavrx_nc4['longitude'][:] if calipso_clavrx_nc4.variables.get('x') is None: xs, ys = nav.earth_to_lc_s(lon_s, lat_s) else: xs = calipso_clavrx_nc4['x'][:] ys = calipso_clavrx_nc4['y'][:] clvr_xy_s = zip(xs, ys) for idx, clvr_xy in enumerate(clvr_xy_s): lon = lon_s[idx] lat = lat_s[idx] cc, ll = clvr_xy c_rng, l_rng = get_search_box(cc, ll) in_cc = np.logical_and(amv_cc > c_rng[0], amv_cc < c_rng[1]) in_ll = np.logical_and(amv_ll > l_rng[0], amv_ll < l_rng[1]) in_box = np.logical_and(in_cc, in_ll) num_amvs = np.sum(in_box) if num_amvs == 0: continue dist = haversine_np(lon, lat, amv_lons[in_box], amv_lats[in_box]) s_idxs = np.argsort(dist) param_nd = np.vstack(param_s) param_nd = param_nd[:, in_box] param_nd = param_nd[:, s_idxs] dist_to_amvs = dist[s_idxs] amvs_da = xr.DataArray(param_nd, coords=coords, dims=dims) data_nd = np.vstack(calipso_clavrx_data) data_da = xr.DataArray(data_nd[:, idx], coords=coords_a, dims=dims_a) lons = param_nd[0, :] lats = param_nd[1, :] temp_prof = get_vert_profile_s(gfs_xr, ['temperature'], lons, lats, method='nearest') temp_prof = temp_prof.values temp_prof_s = temp_prof[0, :, :] alt_s = [] for k in range(num_amvs): alt = pressure_to_altitude(param_nd[4, k], None, gfs_press, temp_prof_s[k, :]) alt_s.append(alt.magnitude) alt_s = np.array(alt_s) match_dict[nom_time].append((cc, ll, lon, lat, f_idx, data_da, amvs_da, dist_to_amvs, alt_s)) return match_dict def create_file(match_dct, output_path, target_filepath, caliop_clavrx_params, amv_params, amv_filenames): num_aprofs = 0 max_num_amvs = 0 alons = [] alats = [] atimes = [] elems = [] lines = [] fidxs = [] amvs_cnt = [] filename = os.path.split(target_filepath)[1] w_o_ext, ext = os.path.splitext(filename) filename = w_o_ext+'.amvs'+'.nc' output_filepath = output_path+filename amv_file_s = np.array(amv_filenames, dtype='object') # scan to get max num levels, amvs keys = list(match_dct.keys()) for key in keys: tup_s = match_dct.get(key) for tup in tup_s: num_aprofs += 1 atimes.append(key) elems.append(tup[0]) lines.append(tup[1]) alons.append(tup[2]) alats.append(tup[3]) fidxs.append(tup[4]) amvs = tup[6] num_amvs = amvs.shape[1] amvs_cnt.append(num_amvs) if num_amvs > max_num_amvs: max_num_amvs = num_amvs alons = np.array(alons) alats = np.array(alats) atimes = np.array(atimes) elems = np.array(elems) lines = np.array(lines) fidxs = np.array(fidxs) amvs_cnt = np.array(amvs_cnt) # Sample file to retrieve and copy variable attributes rg_exmpl = Dataset(amv_ops_exmpl_file) caliop_clavrx_exmpl = Dataset(caliop_clvrx_exmpl_file, 'r') # the top level group for the output file rootgrp = Dataset(output_filepath, 'w', format='NETCDF4') dim_amvs = rootgrp.createDimension('max_num_amvs', size=max_num_amvs) dim_num_aeolus_prof = rootgrp.createDimension('num_caliop_profs', size=num_aprofs) dim_num_files = rootgrp.createDimension('num_amv_files', size=amv_file_s.shape[0]) prf_time = rootgrp.createVariable('time', 'f4', ['num_caliop_profs']) amv_file_names = rootgrp.createVariable('amv_file_names', str, ['num_amv_files']) # ---- CALIOP variables --------------- prf_lon = rootgrp.createVariable('caliop_longitude', 'f4', ['num_caliop_profs']) prf_lon.units = 'degrees east' prf_lat = rootgrp.createVariable('caliop_latitude', 'f4', ['num_caliop_profs']) prf_lat.units = 'degrees north' prf_time.units = 'seconds since 1970-01-1 00:00:00' prf_elem = rootgrp.createVariable('FD_elem', 'f4', ['num_caliop_profs']) prf_line = rootgrp.createVariable('FD_line', 'f4', ['num_caliop_profs']) amvs_cnt_var = rootgrp.createVariable('num_amvs_per_match', 'i4', ['num_caliop_profs']) # ----- CALIOP+CLAVRx variables ------ nc4_vars_cc = [] var_s = caliop_clavrx_exmpl.variables for pidx, param in enumerate(caliop_clavrx_params): v = var_s[param] attr_s = v.ncattrs() if '_FillValue' in attr_s: var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs'], fill_value=v.getncattr('_FillValue')) else: var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs']) # copy attributes from example to new output variable of the same name for attr in attr_s: if attr != '_FillValue': var.setncattr(attr, v.getncattr(attr)) nc4_vars_cc.append(var) # ----- AMV variables ---------------- amv_lon = rootgrp.createVariable('amv_longitude', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_lon.units = 'degrees east' amv_lat = rootgrp.createVariable('amv_latitude', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_lat.units = 'degrees north' amv_elem = rootgrp.createVariable('amv_elem', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_line = rootgrp.createVariable('amv_line', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_fidx = rootgrp.createVariable('amv_file_index', 'i4', ['num_caliop_profs']) dst_to_amvs = rootgrp.createVariable('distance_to_amvs', 'f4', ['num_caliop_profs', 'max_num_amvs']) amv_geom_hgt = rootgrp.createVariable('amv_geom_hgt', 'f4', ['num_caliop_profs', 'max_num_amvs']) nc4_vars = [] var_s = rg_exmpl.variables for pidx, param in enumerate(amv_params): v = var_s[param] attr_s = v.ncattrs() if '_FillValue' in attr_s: var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs', 'max_num_amvs'], fill_value=v.getncattr('_FillValue')) else: var = rootgrp.createVariable(param, v.dtype, ['num_caliop_profs', 'max_num_amvs']) # copy attributes from example to new output variable of the same name for attr in attr_s: if attr != '_FillValue': var.setncattr(attr, v.getncattr(attr)) nc4_vars.append(var) # Write data to file --------------------- prf_lon[:] = alons prf_lat[:] = alats prf_elem[:] = elems prf_line[:] = lines prf_time[:] = atimes amvs_cnt_var[:] = amvs_cnt amv_fidx[:] = fidxs amv_file_names[:] = amv_file_s idx = 0 for key in keys: tup_s = match_dct.get(key) for tup in tup_s: data_nd = tup[5] amvs_nd = tup[6] dist = tup[7] alt = tup[8] for pidx, param in enumerate(caliop_clavrx_params): nda = data_nd[pidx] nc4_vars_cc[pidx][idx] = nda nda = amvs_nd[0,] cnt = nda.shape[0] amv_lon[idx, 0:cnt] = nda nda = amvs_nd[1,] cnt = nda.shape[0] amv_lat[idx, 0:cnt] = nda nda = amvs_nd[2,] cnt = nda.shape[0] amv_elem[idx, 0:cnt] = nda nda = amvs_nd[3,] cnt = nda.shape[0] amv_line[idx, 0:cnt] = nda dst_to_amvs[idx, 0:cnt] = dist amv_geom_hgt[idx, 0:cnt] = alt for pidx, param in enumerate(amv_params): nda = amvs_nd[pidx+4,] cnt = nda.shape[0] nc4_vars[pidx][idx, 0:cnt] = nda idx += 1 rg_exmpl.close() rootgrp.close() def run_caliop_clavrx_amv_match(output_path, path_to_caliop_clavrx, path_to_amvs, path_to_gfs, amv_source='OPS', band='14'): caliop_clavrx_params = get_parameters_caliop_clavrx(filename=caliop_clvrx_exmpl_file) caliop_clavrx_ds = CLAVRx_CALIPSO(path_to_caliop_clavrx) amv_ds = get_datasource(path_to_amvs, amv_source, file_time_span=20, band=band) amv_params = amv_ds.get_parameters() amv_filenames = amv_ds.flist gfs_ds = get_datasource(path_to_gfs, 'GFS') for f in caliop_clavrx_ds.flist: nom_time = caliop_clavrx_ds.get_datetime(f).timestamp() calipso_clavrx_nc4 = Dataset(f) print('Start processing: ', f) match_dict = match_calipso_clavrx_to_amvs(nom_time, calipso_clavrx_nc4, amv_ds, caliop_clavrx_params, gfs_ds) calipso_clavrx_nc4.close() if match_dict is None or not match_dict[nom_time]: print('No matches for: ', f) continue create_file(match_dict, output_path, f, caliop_clavrx_params, amv_params, amv_filenames) print('Done processing: ', f) def analyze(filename): h5f = h5py.File(filename, 'r') cc_ctt = h5f['closest_calipso_top_temperature'][:] cc_ctt = np.where(cc_ctt == -9999.0, np.nan, cc_ctt) cc_ctt += 273.0 cc_ctp = h5f['closest_calipso_top_pressure'][:] cc_ctp = np.where(cc_ctp == -9999.0, np.nan, cc_ctp) lats = h5f['caliop_latitude'][:] lons = h5f['caliop_longitude'][:] acha_ctp = h5f['cld_press_acha'][:] acha_ctp = np.where(acha_ctp == -999.0, np.nan, acha_ctp) amv_press = h5f['pressure'][:, 0:3] amv_press = np.where(amv_press == -999.0, np.nan, amv_press) amv_press_avg = np.nanmean(amv_press, axis=1) return cc_ctp, acha_ctp, amv_press_avg, lats new_header = ' TARG LAT LON BOX SRCH SPD DIR PW LLCM SPDG DIRG TBALB MAXC TRKM PERR HAMD QINF QIWF QIC HGT' num_fmts = ['%6i', '%7.2f', '%8.2f', '%4i', '%4i', '%7.2f', '%7.2f', '%8.2f', '%4i', '%7.2f', '%7.2f', '%8.2f', '%7.2f', '%4i', '%7.2f', '%4i', '%4i', '%6i', '%6i', '%9.2f'] def get_temp_prof_s(gfs_ds, nom_time, amv_lons, amv_lats): gfs_fname, _, _ = gfs_ds.get_file(nom_time, window=180.0) if gfs_fname is None: return None gfs_xr = xr.open_dataset(gfs_fname) gfs_press = gfs_xr['pressure levels'] gfs_press = gfs_press.values temp_prof = get_vert_profile_s(gfs_xr, ['temperature'], amv_lons, amv_lats, method='linear') temp_prof = temp_prof.values temp_prof_s = temp_prof[0, :, :] return temp_prof_s, gfs_press def get_temp_prof_s_intrp(nom_time, amv_lons, amv_lats): gfs_fname_left, nom_time_left, gfs_fname_rght, nom_time_rght = get_bounding_gfs_files(nom_time) gfs_xr_left = xr.open_dataset(gfs_fname_left) gfs_xr_rght = xr.open_dataset(gfs_fname_rght) gfs_press = gfs_xr_left['pressure levels'].values temp_prof = get_time_interpolated_vert_profile([gfs_xr_left, gfs_xr_rght], [nom_time_left, nom_time_rght], 'temperature', nom_time, amv_lons, amv_lats) pt_s = get_time_interpolated_point_s([gfs_xr_left, gfs_xr_rght], [nom_time_left, nom_time_rght], ['surface pressure', 'surface temperature', 'surface height'], nom_time, amv_lons, amv_lats) return temp_prof, gfs_press, pt_s[0,], pt_s[1,], pt_s[2,] def compute_and_add_geo_hgt(path_to_amvs, path_to_gfs, amv_source, band='14', out_file=None): gfs_ds = get_datasource(path_to_gfs, 'GFS') amv_ds = get_datasource(path_to_amvs, amv_source, file_time_span=20, band=band) amv_filenames = amv_ds.flist alt_s = [] prs_s = [] for fname in amv_filenames: print('Start: ', fname) nom_time = amv_ds.get_datetime(fname).timestamp() amvs_nd = get_amv_nd(fname, delimiter=',') amvs_nd = filter_amvs(amvs_nd) num_amvs = amvs_nd.shape[0] amv_lons = amvs_nd[:, amv_lon_idx] amv_lats = amvs_nd[:, amv_lat_idx] temp_prof_s, gfs_press = get_temp_prof_s(gfs_ds, nom_time, amv_lons, amv_lats) # temp_prof_s, gfs_press, sfc_press, sfc_temp, sfc_hgt, = get_temp_prof_s_intrp(nom_time, amv_lons, amv_lats) alt_f = [] for k in range(num_amvs): amv_press = amvs_nd[k, amv_pres_idx] alt = pressure_to_altitude(amv_press, None, gfs_press, temp_prof_s[k, :]) # alt = pressure_to_altitude(amv_press, None, gfs_press, temp_prof_s[k, :], sfc_pres=sfc_press[k], sfc_temp=sfc_temp[k], sfc_elev=sfc_hgt[k]) alt_s.append(alt) alt_f.append(alt) prs_s.append(amv_press) alt_f = np.array(alt_f) alt_f = np.reshape(alt_f, [num_amvs, 1]) amvs_nd = np.hstack([amvs_nd, alt_f]) all_params = ['TARG', 'AMV Lat', 'AMV Lon', ' target box size', 'search box size', 'AMV speed', 'AMV direction', 'AMV Pressure', 'LLCM', 'guess speed', 'guess direction', 'TBALB', 'MAXC', 'TRKM', 'PERR', 'HAMD', 'QI no forecast', 'QI with forecast', 'Common QI', 'Height (meters)'] coords = {'num_params': all_params} dims = ['num_amvs', 'num_params'] amvs_da = xr.DataArray(amvs_nd, coords=coords, dims=dims) new_amv_fname = os.path.split(fname)[1] new_amv_fname = os.path.splitext(new_amv_fname)[0] + '.nc4' amvs_da.to_netcdf(new_amv_fname, format='NETCDF4') new_amv_fname = os.path.split(fname)[1] np.savetxt(new_amv_fname, amvs_nd, fmt=num_fmts, header=new_header, delimiter=',') print('Done...') alt_s = np.array(alt_s) prs_s = np.array(prs_s) if out_file is not None: np.save(out_file, (prs_s, alt_s)) else: return prs_s, alt_s