import datetime, os from datetime import timezone import glob import numpy as np import xarray as xr from netCDF4 import Dataset, Dimension, Variable from aeolus.geos_nav import GEOSNavigation from util.util import haversine_np import math amv_file_duration = 60 # minutes half_width = 20 # search box centered on AEOLUS profile (FGF coordinates) num_elems = 5424 num_lines = 5424 class MyGenericException(Exception): def __init__(self, message): self.message = message class AMVFiles: def __init__(self, files_path, file_time_span, pattern, band='14'): self.flist = glob.glob(files_path + pattern) self.band = band self.ftimes = [] for pname in self.flist: # TODO: make better with regular expressions (someday) dto = self.get_datetime(pname) dto_start = dto dto_end = dto + datetime.timedelta(minutes=file_time_span) self.ftimes.append((dto_start.timestamp(), dto_end.timestamp())) def get_datetime(self, pathname): pass def get_navigation(self): pass def get_file_containing_time(self, timestamp): k = -1 for i in range(len(self.ftimes)): if (timestamp >= self.ftimes[i][0]) and (timestamp < self.ftimes[i][1]): k = i break if k < 0: return None, None, None return self.flist[k], self.ftimes[k], k def get_parameters(self): pass def get_out_parameters(self): pass def get_meta_dict(self): pass class Framework(AMVFiles): def __init__(self, files_path, file_time_span, band='14'): super().__init__(files_path, file_time_span, '*WINDS_AMV_EN-'+band+'*.nc', band) def get_navigation(self): GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dstr = toks[4] tstr = toks[5] dtstr = dstr + tstr dto = datetime.datetime.strptime(dtstr, '%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class OPS(AMVFiles): def __init__(self, files_path, file_time_span, band='14'): super().__init__(files_path, file_time_span, 'OR_ABI-L2-DMWF*'+'C'+band+'*.nc', band) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dtstr = toks[3] dtstr = dtstr[:-3] dto = datetime.datetime.strptime(dtstr, 's%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class CarrStereo(AMVFiles): def __init__(self, files_path, file_time_span, band='14'): super().__init__(files_path, file_time_span, 'tdw_qc_GOES*'+'ch_'+band+'.nc', band) self.elem_name = 'Element' self.line_name = 'Line' self.lon_name = 'Lon' self.lat_name = 'Lat' self.out_params = ['Lon', 'Lat', 'Element', 'Line', 'V_3D_u', 'V_3D_v', 'H_3D', 'pres', 'Fcst_Spd', 'Fcst_Dir', 'SatZen', 'InversionFlag', 'CloudPhase', 'CloudType'] self.params = ['V_3D', 'H_3D', 'pres', 'Fcst_Spd', 'Fcst_Dir', 'SatZen', 'InversionFlag', 'CloudPhase', 'CloudType'] self.meta_dict = {'H_3D': ('m', 'f4'), 'pres': ('hPa', 'f4'), 'Fcst_Spd': ('m s-1', 'f4'), 'Fcst_Dir': ('degree', 'f4'), 'SatZen': ('degree', 'f4'), 'InversionFlag': (None, 'u1'), 'CloudPhase': (None, 'u1'), 'CloudType': (None, 'u1'), 'V_3D_u': ('m s-1', 'f4'), 'V_3D_v': ('m s-1', 'f4'), 'Lon': ('degrees east', 'f4'), 'Lat': ('degrees north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4')} def get_navigation(self): return GEOSNavigation(sub_lon=-137.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dtstr = toks[3] dto = datetime.datetime.strptime(dtstr, '%Y%j.%H%M.ch').replace(tzinfo=timezone.utc) return dto def get_parameters(self): return self.params def get_out_parameters(self): return self.out_params def get_meta_dict(self): return self.meta_dict # imports the S4 NOAA output # filename: full path as a string, '/home/user/filename' # returns a dict: time -> list of profiles (a profile is a list of levels) def get_aeolus_time_dict(filename, lon360=False, do_sort=True): time_dict = {} with open(filename) as file: while True: prof_hdr = file.readline() if not prof_hdr: break toks = prof_hdr.split() yr = int(float(toks[0])) mon = int(float(toks[1])) dy = int(float(toks[2])) hr = int(float(toks[3])) mn = int(float(toks[4])) ss = int(float(toks[5])) lon = float(toks[6]) lat = float(toks[7]) nlevs = int(toks[8]) if lon360: if lon < 0: lon += 360.0 else: if lon > 180.0: lon -= 360.0 dto = datetime.datetime(year=yr, month=mon, day=dy, hour=hr, minute=mn, second=ss) dto = dto.replace(tzinfo=timezone.utc) timestamp = dto.timestamp() prof = [] if time_dict.get(timestamp, -1) == -1: prof_s = [] prof_s.append(prof) time_dict[timestamp] = prof_s else: prof_s = time_dict.get(timestamp) prof_s.append(prof) for k in range(nlevs): line = file.readline() toks = line.split() lvlidx = int(toks[0]) hhh = float(toks[1]) * 1000.0 hht = float(toks[2]) * 1000.0 hhb = float(toks[3]) * 1000.0 err = float(toks[4]) azm = float(toks[5]) ws = float(toks[6]) len = float(toks[7]) tup = (lat, lon, hhh, hht, hhb, azm, ws) prof.append(tup) file.close() if do_sort: keys = np.array(list(time_dict.keys())) keys.sort() keys = keys.tolist() sorted_time_dict = {} for key in keys: sorted_time_dict[key] = time_dict.get(key) time_dict = sorted_time_dict return time_dict # make each profile at a timestamp a numpy array def time_dict_to_nd(time_dict): keys = list(time_dict.keys()) for key in keys: vals = time_dict[key] if vals is not None: nda = np.array(vals[0]) # only take one profile per second time_dict[key] = nda # for i in range(len(vals)): # nda = np.array(vals[i]) # vals[i] = nda return time_dict def concat(t_dct_0, t_dct_1): keys_0 = list(t_dct_0.keys()) nda_0 = np.array(keys_0) keys_1 = list(t_dct_1.keys()) nda_1 = np.array(keys_1) comm_keys, comm0, comm1 = np.intersect1d(nda_0, nda_1, return_indices=True) comm_keys = comm_keys.tolist() for key in comm_keys: t_dct_1.pop(key) t_dct_0.update(t_dct_1) return t_dct_0 def get_aeolus_time_dict_s(files_path, lon360=False, do_sort=True, chan='mie'): ftimes = [] fnames = glob.glob(files_path + chan + '1day.out.*') time_dct = {} for pathname in fnames: fname = os.path.split(pathname)[1] toks = fname.split('.') dstr = toks[2] dto = datetime.datetime.strptime(dstr, '%Y-%m-%d').replace(tzinfo=timezone.utc) ts = dto.timestamp() ftimes.append(ts) time_dct[ts] = pathname sorted_filenames = [] ftimes.sort() for t in ftimes: sorted_filenames.append(time_dct.get(t)) dct_s = [] for fname in sorted_filenames: a_dct = get_aeolus_time_dict(fname, lon360=lon360, do_sort=do_sort) # t_dct = time_dict_to_nd(a_dct) dct_s.append(a_dct) t_dct = dct_s[0] for dct in dct_s[1:]: concat(t_dct, dct) return t_dct def time_dict_to_cld_layers(time_dict): time_dict_layers = {} keys = list(time_dict.keys()) for key in keys: prof_s = time_dict[key] layers = [] prof = prof_s[0] if len(prof) == 1: tup = prof[0] layers.append((tup[0], tup[1], tup[3], tup[4])) time_dict_layers[key] = layers continue top = -9999.9 last_bot = -9999.9 tup = None for i in range(len(prof)): tup = prof[i] if i == 0: top = tup[3] bot = tup[4] last_bot = bot else: if math.fabs(last_bot - tup[3]) > 10.0: layers.append((tup[0], tup[1], top, last_bot)) top = tup[3] last_bot = tup[4] layers.append((tup[0], tup[1], top, tup[4])) time_dict_layers[key] = layers return time_dict_layers def time_dict_to_nd_2(time_dict): keys = list(time_dict.keys()) for key in keys: vals = time_dict[key] if vals is not None: time_dict[key] = np.stack(vals) return time_dict def get_cloud_layers_dict(filename, lon360=False): a_d = get_aeolus_time_dict(filename, lon360=lon360) c_d = time_dict_to_cld_layers(a_d) cld_lyr_dct = time_dict_to_nd_2(c_d) return cld_lyr_dct def run_amv_aeolus_best_fit(match_dict, aeolus_dict): keys = list(match_dict.keys()) for key in keys: profs = aeolus_dict.get(key) layers = profs[0] if layers is None: continue lat = layers[0, 0] lon = layers[0, 1] return None def get_search_box(nav, lon, lat): cc, ll = nav.earth_to_lc(lon, lat) if cc is None: return None, None c_rng = [cc - half_width, cc + half_width] l_rng = [ll - half_width, ll + half_width] if c_rng[0] < 0: c_rng[0] = 0 if l_rng[0] < 0: l_rng[0] = 0 if c_rng[1] >= num_elems: c_rng[1] = num_elems - 1 if l_rng[1] >= num_lines: l_rng[1] = num_lines - 1 return c_rng, l_rng # aeolus_dict: time -> profiles # amv_files_path: directory containing AMVs, '/home/user/amvdir/' # return dict: aeolus time -> tuple (amv_lon, amv_lat, amv_pres, amv_spd, amv_dir) def match_amvs_to_aeolus(aeolus_dict, amv_files_path, amv_source='OPS', band='14', amv_files=None): nav = amv_files.get_navigation() amv_params = amv_files.get_parameters() match_dict = {} keys = list(aeolus_dict.keys()) last_f_idx = -1 for key in keys: fname, ftime, f_idx = amv_files.get_file_containing_time(key) if f_idx is None: continue profs = aeolus_dict.get(key) if profs is None: continue layers = profs # if isinstance(profs, list): # layers = profs[0] lat = layers[0, 0] lon = layers[0, 1] c_rng, l_rng = get_search_box(nav, lon, lat) if c_rng is None: continue if f_idx != last_f_idx: last_f_idx = f_idx ds = Dataset(fname) amv_lons = ds[amv_files.lon_name][:] amv_lats = ds[amv_files.lat_name][:] cc = ds[amv_files.elem_name][:] ll = ds[amv_files.line_name][:] # cc, ll = nav.earth_to_lc_s(amv_lons, amv_lats) param_s = [] param_s.append(amv_lons) param_s.append(amv_lats) param_s.append(cc) param_s.append(ll) for param in amv_params: if param == 'V_3D': param_s.append(ds[param][:, 0]) param_s.append(ds[param][:, 1]) else: param_s.append(ds[param][:]) ds.close() in_cc = np.logical_and(cc > c_rng[0], cc < c_rng[1]) in_ll = np.logical_and(ll > l_rng[0], ll < l_rng[1]) in_box = np.logical_and(in_cc, in_ll) num_amvs = np.sum(in_box) if num_amvs == 0: continue # dist = haversine_np(lon, lat, amv_lons[in_box], amv_lats[in_box]) param_nd = np.vstack(param_s) param_nd = param_nd[:, in_box] match_dict[key] = param_nd return match_dict # full path as string filename to create, '/home/user/newfilename' # aeolus_to_amv_dct: output from match_amvs_to_aeolus # aeolus_dct: output from get_aeolus_time_dict # amv_files: container representing specific AMV product info def create_file(filename, aeolus_to_amv_dct, aeolus_dct, amv_files, cld_lyr=False): keys = list(aeolus_to_amv_dct.keys()) num_amvs = [] num_levs = [] times = [] namvs = 0 nlevs = 0 for key in keys: param_nd = aeolus_to_amv_dct.get(key) num_amvs.append(param_nd.shape[1]) namvs += param_nd.shape[1] prof_s = aeolus_dct.get(key) prof = prof_s # if isinstance(prof, list): # prof = prof_s[0] num_levs.append(prof.shape[0]) nlevs += prof.shape[0] times.append(key) amv_per_alus = len(aeolus_to_amv_dct) rootgrp = Dataset(filename, 'w', format='NETCDF4') dim_amvs = rootgrp.createDimension('amvs', size=namvs) dim_alus = rootgrp.createDimension('profs', size=nlevs) dim_num_aeolus_prof = rootgrp.createDimension('num_aeolus_profs', size=len(aeolus_to_amv_dct)) nc4_vars = [] out_params = amv_files.get_out_parameters() meta_dict = amv_files.get_meta_dict() for pidx, param in enumerate(out_params): u, t = meta_dict.get(param) var = rootgrp.createVariable(param, t, ['amvs']) if u is not None: var.units = u nc4_vars.append(var) dist = rootgrp.createVariable('dist_to_prof', 'f4', ['amvs']) dist.units = 'km' num_amvs_per_prof = rootgrp.createVariable('num_amvs_per_prof', 'i4', ['num_aeolus_profs']) num_levs_per_prof = rootgrp.createVariable('num_levs_per_prof', 'i4', ['num_aeolus_profs']) prof_time = rootgrp.createVariable('time', 'f4', ['num_aeolus_profs']) # ---- Profile variables --------------- prf_lon = rootgrp.createVariable('prof_longitude', 'f4', ['num_aeolus_profs']) prf_lon.units = 'degrees east' prf_lat = rootgrp.createVariable('prof_latitude', 'f4', ['num_aeolus_profs']) prf_lat.units = 'degrees north' prof_time.units = 'seconds since 1970-01-1 00:00:00' if not cld_lyr: prf_azm = rootgrp.createVariable('prof_azm', 'f4', ['profs']) prf_azm.units = 'degree' prf_spd = rootgrp.createVariable('prof_spd', 'f4', ['profs']) prf_spd.units = 'm s-1' prf_hht = rootgrp.createVariable('prof_hht', 'f4', ['profs']) prf_hht.units = 'meter' prf_hhb = rootgrp.createVariable('prof_hhb', 'f4', ['profs']) prf_hhb.units = 'meter' # -------------------------------------- i_a = 0 i_c = 0 for idx, key in enumerate(keys): namvs = num_amvs[idx] nlevs = num_levs[idx] i_b = i_a + namvs i_d = i_c + nlevs prof_s = aeolus_dct.get(key) prof = prof_s # if isinstance(prof_s, list): # prof = prof_s[0] if not cld_lyr: prf_hht[i_c:i_d] = prof[:, 3] prf_hhb[i_c:i_d] = prof[:, 4] prf_azm[i_c:i_d] = prof[:, 5] prf_spd[i_c:i_d] = prof[:, 6] else: prf_hht[i_c:i_d] = prof[:, 2] prf_hhb[i_c:i_d] = prof[:, 3] i_c += nlevs plat = prof[0, 0] plon = prof[0, 1] prf_lat[idx::] = plat prf_lon[idx::] = plon param_nd = aeolus_to_amv_dct.get(key) for pidx, param in enumerate(out_params): nc4_vars[pidx][i_a:i_b] = param_nd[pidx, :] dist[i_a:i_b] = haversine_np(plon, plat, param_nd[0, :], param_nd[1, :]) i_a += namvs num_amvs_per_prof[:] = num_amvs num_levs_per_prof[:] = num_levs prof_time[:] = times rootgrp.close() # aeolus_files_dir: S4 NOAA txt output files # amv_files_dir: G16/17 AMV product files # outfile: pathname for the Netcdf match file def create_amv_to_aeolus_match_file(aeolus_files_dir, amv_files_dir, outfile=None, amv_source='OPS', band='14', chan='mie'): a_d = get_aeolus_time_dict_s(aeolus_files_dir, chan=chan) # a_d = time_dict_to_nd(a_d) #---------------------------------- a_d = time_dict_to_cld_layers(a_d) a_d = time_dict_to_nd_2(a_d) amv_files = None if amv_source == 'CARR': amv_files = CarrStereo(amv_files_dir, 60, band) m_d = match_amvs_to_aeolus(a_d, amv_files_dir, amv_source, band, amv_files) if outfile is not None: cld_lyr = False if chan == 'mie': cld_lyr = True create_file(outfile, m_d, a_d, amv_files, cld_lyr=cld_lyr) # match_file: pathname for the product file # dt_str_0: start time (YYYY-MM-DD_HH:MM) # dt_str_1: end time (YYYY-MM-DD_HH:MM) # amv_var_names: list of amv parameters (see match_file ncdump) to subset # returns: Xarray.DataArray # amvs[nprofs, max_num_amvs_per_prof, num_of_params], profs[nprofs, max_num_levs_per_prof, num_of_params], # prof_locs[nprofs, (lon, lat) def subset_by_time(match_file, dt_str_0, dt_str_1, amv_var_names): rootgrp = Dataset(match_file, 'r', format='NETCDF4') all_dims = rootgrp.dimensions t_var = rootgrp['time'] n_profs = len(all_dims['num_aeolus_profs']) n_amvs_per_prof = rootgrp['num_amvs_per_prof'][:] n_levs_per_prof = rootgrp['num_levs_per_prof'][:] a_nc_vars = [] for vname in amv_var_names: a_nc_vars.append(rootgrp[vname]) nvars = len(a_nc_vars) p_lon_v = rootgrp['prof_longitude'] p_lat_v = rootgrp['prof_latitude'] p_azm_v = rootgrp['prof_azm'] p_spd_v = rootgrp['prof_spd'] p_hhb_v = rootgrp['prof_hhb'] p_hht_v = rootgrp['prof_hht'] dto = datetime.datetime.strptime(dt_str_0, '%Y-%m-%d_%H:%M').replace(tzinfo=timezone.utc) dto.replace(tzinfo=timezone.utc) t_0 = dto.timestamp() dto = datetime.datetime.strptime(dt_str_1, '%Y-%m-%d_%H:%M').replace(tzinfo=timezone.utc) dto.replace(tzinfo=timezone.utc) t_1 = dto.timestamp() if t_1 < t_0: t = t_0 t_1 = t_0 t_0 = t times = t_var[:] time_idxs = np.arange(n_profs) valid = np.logical_and(times >= t_0, times < t_1) time_idxs = time_idxs[valid] n_times = time_idxs.shape[0] lons = p_lon_v[:] lats = p_lat_v[:] prf_idx_start = np.sum(n_levs_per_prof[0:time_idxs[0]]) amv_idx_start = np.sum(n_amvs_per_prof[0:time_idxs[0]]) mx_namvs = np.max(n_amvs_per_prof[time_idxs[0]:time_idxs[0]+n_times]) mx_nlevs = np.max(n_levs_per_prof[time_idxs[0]:time_idxs[0]+n_times]) amvs = np.zeros((n_times, mx_namvs, nvars)) profs = np.zeros((n_times, mx_nlevs, 4)) amvs.fill(np.nan) profs.fill(np.nan) accum_prf = prf_idx_start accum_amv = amv_idx_start for idx, t_i in enumerate(time_idxs): n_amvs = n_amvs_per_prof[t_i] n_levs = n_levs_per_prof[t_i] a = accum_amv b = accum_amv + n_amvs c = accum_prf d = accum_prf + n_levs for k in range(nvars): amvs[idx, 0:n_amvs, k] = a_nc_vars[k][a:b] profs[idx, 0:n_levs, 0] = p_spd_v[c:d] profs[idx, 0:n_levs, 1] = p_azm_v[c:d] profs[idx, 0:n_levs, 2] = p_hhb_v[c:d] profs[idx, 0:n_levs, 3] = p_hht_v[c:d] accum_amv += n_amvs accum_prf += n_levs coords = {'num_profs': times[time_idxs], 'num_params': ['speed', 'azimuth', 'layer_bot', 'layer_top']} prof_da = xr.DataArray(profs, coords=coords, dims=['num_profs', 'max_num_levels', 'num_params']) coords = {'num_profs': times[time_idxs], 'num_params': amv_var_names} amvs_da = xr.DataArray(amvs, coords=coords, dims=['num_profs', 'max_num_amvs', 'num_params']) prof_locs_da = xr.DataArray(np.column_stack([lons[time_idxs], lats[time_idxs]]), coords=[times[time_idxs], ['longitude', 'latitude']], dims=['num_profs', 'space']) return prof_da, amvs_da, prof_locs_da