# aeolus_amv.py -- match Aeolus wind/cloud-layer profiles to GOES AMV products.
import datetime
from datetime import timezone
import glob
import math
import os

import numpy as np
import xarray as xr
from netCDF4 import Dataset

from aeolus.geos_nav import GEOSNavigation
from util.util import haversine_np
amv_file_duration = 60  # nominal time span covered by a single AMV file (minutes)
half_width = 20  # half-width of the search box centered on an Aeolus profile (FGF elements/lines)
num_elems = 5424  # ABI full-disk fixed-grid dimensions (2 km)
num_lines = 5424
class MyGenericException(Exception):
    def __init__(self, message):
        super().__init__(message)
        self.message = message
class AMVFiles:
def __init__(self, files_path, file_time_span, pattern, band='14'):
self.flist = glob.glob(files_path + pattern)
self.band = band
self.ftimes = []
for pname in self.flist: # TODO: make better with regular expressions (someday)
dto = self.get_datetime(pname)
dto_start = dto
dto_end = dto + datetime.timedelta(minutes=file_time_span)
self.ftimes.append((dto_start.timestamp(), dto_end.timestamp()))
    # The methods below are product-specific; subclasses are expected to override them.
    def get_datetime(self, pathname):
        pass
def get_navigation(self):
pass
def get_file_containing_time(self, timestamp):
k = -1
for i in range(len(self.ftimes)):
if (timestamp >= self.ftimes[i][0]) and (timestamp < self.ftimes[i][1]):
k = i
break
if k < 0:
return None, None, None
return self.flist[k], self.ftimes[k], k
def get_parameters(self):
pass
def get_out_parameters(self):
pass
def get_meta_dict(self):
pass
class Framework(AMVFiles):
def __init__(self, files_path, file_time_span, band='14'):
super().__init__(files_path, file_time_span, '*WINDS_AMV_EN-'+band+'*.nc', band)
    def get_navigation(self):
        return GEOSNavigation(sub_lon=-75.0)
def get_datetime(self, pathname):
fname = os.path.split(pathname)[1]
toks = fname.split('_')
dstr = toks[4]
tstr = toks[5]
dtstr = dstr + tstr
dto = datetime.datetime.strptime(dtstr, '%Y%j%H%M').replace(tzinfo=timezone.utc)
return dto
class OPS(AMVFiles):
def __init__(self, files_path, file_time_span, band='14'):
super().__init__(files_path, file_time_span, 'OR_ABI-L2-DMWF*'+'C'+band+'*.nc', band)
def get_navigation(self):
return GEOSNavigation(sub_lon=-75.0)
def get_datetime(self, pathname):
fname = os.path.split(pathname)[1]
toks = fname.split('_')
dtstr = toks[3]
dtstr = dtstr[:-3]
dto = datetime.datetime.strptime(dtstr, 's%Y%j%H%M').replace(tzinfo=timezone.utc)
return dto
class CarrStereo(AMVFiles):
def __init__(self, files_path, file_time_span, band='14'):
super().__init__(files_path, file_time_span, 'tdw_qc_GOES*'+'ch_'+band+'.nc', band)
self.elem_name = 'Element'
self.line_name = 'Line'
self.lon_name = 'Lon'
self.lat_name = 'Lat'
self.out_params = ['Lon', 'Lat', 'Element', 'Line', 'V_3D_u', 'V_3D_v', 'H_3D', 'pres', 'Fcst_Spd', 'Fcst_Dir', 'SatZen',
'InversionFlag', 'CloudPhase', 'CloudType']
self.params = ['V_3D', 'H_3D', 'pres', 'Fcst_Spd', 'Fcst_Dir', 'SatZen',
'InversionFlag', 'CloudPhase', 'CloudType']
self.meta_dict = {'H_3D': ('m', 'f4'), 'pres': ('hPa', 'f4'), 'Fcst_Spd': ('m s-1', 'f4'),
'Fcst_Dir': ('degree', 'f4'),
'SatZen': ('degree', 'f4'), 'InversionFlag': (None, 'u1'),
'CloudPhase': (None, 'u1'), 'CloudType': (None, 'u1'),
                          'V_3D_u': ('m s-1', 'f4'), 'V_3D_v': ('m s-1', 'f4'), 'Lon': ('degrees_east', 'f4'),
                          'Lat': ('degrees_north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4')}
def get_navigation(self):
return GEOSNavigation(sub_lon=-137.0)
def get_datetime(self, pathname):
fname = os.path.split(pathname)[1]
toks = fname.split('_')
dtstr = toks[3]
dto = datetime.datetime.strptime(dtstr, '%Y%j.%H%M.ch').replace(tzinfo=timezone.utc)
return dto
def get_parameters(self):
return self.params
def get_out_parameters(self):
return self.out_params
def get_meta_dict(self):
return self.meta_dict
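

# Usage sketch (hypothetical directory): the Carr stereo reader lists files matching its
# pattern and can return the file covering a given timestamp.
def _example_carr_files():
    amv_files = CarrStereo('/home/user/carr_amvs/', 60, band='14')
    ts = datetime.datetime(2019, 7, 1, 15, 30, tzinfo=timezone.utc).timestamp()
    fname, ftime, f_idx = amv_files.get_file_containing_time(ts)
    return fname, ftime, f_idx
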
# Imports the S4 NOAA Aeolus text output (see the usage sketch after this function).
# filename: full path as a string, '/home/user/filename'
# returns a dict: profile timestamp -> list of profiles, where each profile is a list of
# (lat, lon, height, height_top, height_bot, azimuth, wind_speed) level tuples
def get_aeolus_time_dict(filename, lon360=False, do_sort=True):
time_dict = {}
with open(filename) as file:
while True:
prof_hdr = file.readline()
if not prof_hdr:
break
toks = prof_hdr.split()
yr = int(float(toks[0]))
mon = int(float(toks[1]))
dy = int(float(toks[2]))
hr = int(float(toks[3]))
mn = int(float(toks[4]))
ss = int(float(toks[5]))
lon = float(toks[6])
lat = float(toks[7])
nlevs = int(toks[8])
if lon360:
if lon < 0:
lon += 360.0
else:
if lon > 180.0:
lon -= 360.0
dto = datetime.datetime(year=yr, month=mon, day=dy, hour=hr, minute=mn, second=ss)
dto = dto.replace(tzinfo=timezone.utc)
timestamp = dto.timestamp()
            prof = []
            if timestamp not in time_dict:
                time_dict[timestamp] = [prof]
            else:
                time_dict[timestamp].append(prof)
for k in range(nlevs):
line = file.readline()
toks = line.split()
                lvlidx = int(toks[0])
                hhh = float(toks[1]) * 1000.0  # level height (converted to meters)
                hht = float(toks[2]) * 1000.0  # layer top (converted to meters)
                hhb = float(toks[3]) * 1000.0  # layer bottom (converted to meters)
                err = float(toks[4])
                azm = float(toks[5])
                ws = float(toks[6])
                hlen = float(toks[7])  # renamed from 'len' to avoid shadowing the built-in (currently unused)
                tup = (lat, lon, hhh, hht, hhb, azm, ws)
prof.append(tup)
    if do_sort:
        time_dict = {key: time_dict[key] for key in sorted(time_dict.keys())}
    return time_dict
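

# Usage sketch (hypothetical path): the parser expects one header line per profile
# (year month day hour minute second lon lat nlevs) followed by nlevs level lines;
# the result maps each profile's POSIX timestamp to a list of level-tuple lists.
def _example_read_aeolus_profiles():
    time_dict = get_aeolus_time_dict('/home/user/mie1day.out.2019-07-01')  # hypothetical file
    first_ts = next(iter(time_dict))
    first_prof = time_dict[first_ts][0]
    return first_ts, len(first_prof)
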
# make each profile at a timestamp a numpy array
def time_dict_to_nd(time_dict):
keys = list(time_dict.keys())
for key in keys:
vals = time_dict[key]
if vals is not None:
for i in range(len(vals)):
nda = np.array(vals[i])
vals[i] = nda
return time_dict
# Merge t_dct_1 into t_dct_0 in place; on duplicate timestamps the entry from t_dct_0 is kept.
def concat(t_dct_0, t_dct_1):
keys_0 = list(t_dct_0.keys())
nda_0 = np.array(keys_0)
keys_1 = list(t_dct_1.keys())
nda_1 = np.array(keys_1)
comm_keys, comm0, comm1 = np.intersect1d(nda_0, nda_1, return_indices=True)
comm_keys = comm_keys.tolist()
for key in comm_keys:
t_dct_1.pop(key)
t_dct_0.update(t_dct_1)
return t_dct_0
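

# A minimal sketch of the merge behavior (keys are illustrative, not real Aeolus
# timestamps): the first dict's entry wins on a duplicate key.
def _example_concat():
    d0 = {100.0: ['prof_a'], 200.0: ['prof_b']}
    d1 = {200.0: ['prof_c'], 300.0: ['prof_d']}
    return concat(d0, d1)  # {100.0: ['prof_a'], 200.0: ['prof_b'], 300.0: ['prof_d']}
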
# Read and merge all daily S4 text files for the given channel (e.g. 'mie') found in
# files_path, in time order, returning a single time -> profiles dict.
def get_aeolus_time_dict_s(files_path, lon360=False, do_sort=True, chan='mie'):
ftimes = []
fnames = glob.glob(files_path + chan + '1day.out.*')
time_dct = {}
for pathname in fnames:
fname = os.path.split(pathname)[1]
toks = fname.split('.')
dstr = toks[2]
dto = datetime.datetime.strptime(dstr, '%Y-%m-%d').replace(tzinfo=timezone.utc)
ts = dto.timestamp()
ftimes.append(ts)
time_dct[ts] = pathname
sorted_filenames = []
ftimes.sort()
for t in ftimes:
sorted_filenames.append(time_dct.get(t))
dct_s = []
for fname in sorted_filenames:
a_dct = get_aeolus_time_dict(fname, lon360=lon360, do_sort=do_sort)
# t_dct = time_dict_to_nd(a_dct)
dct_s.append(a_dct)
t_dct = dct_s[0]
for dct in dct_s[1:]:
concat(t_dct, dct)
return t_dct
# Collapse each profile's level tuples into cloud layers: consecutive levels whose shared
# boundary heights agree to within 10 m are merged into one (lat, lon, top, bot) layer.
def time_dict_to_cld_layers(time_dict):
time_dict_layers = {}
keys = list(time_dict.keys())
for key in keys:
prof_s = time_dict[key]
layers = []
prof = prof_s[0]
if len(prof) == 1:
tup = prof[0]
layers.append((tup[0], tup[1], tup[3], tup[4]))
time_dict_layers[key] = layers
continue
top = -9999.9
last_bot = -9999.9
tup = None
for i in range(len(prof)):
tup = prof[i]
if i == 0:
top = tup[3]
bot = tup[4]
last_bot = bot
else:
if math.fabs(last_bot - tup[3]) > 10.0:
layers.append((tup[0], tup[1], top, last_bot))
top = tup[3]
last_bot = tup[4]
layers.append((tup[0], tup[1], top, tup[4]))
time_dict_layers[key] = layers
return time_dict_layers
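

# Worked sketch of the layer merge (heights in meters are illustrative): the first two
# levels share a 9000 m boundary and collapse into one layer; the height gap before the
# third level starts a new layer, giving two layers in total.
def _example_cld_layers():
    levels = [(45.0, -100.0, 9500.0, 10000.0, 9000.0, 0.0, 0.0),
              (45.0, -100.0, 8500.0, 9000.0, 8000.0, 0.0, 0.0),
              (45.0, -100.0, 4500.0, 5000.0, 4000.0, 0.0, 0.0)]
    return time_dict_to_cld_layers({0.0: [levels]})[0.0]
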
# Stack each timestamp's list of layer/level tuples into a single 2D ndarray.
def time_dict_to_nd_2(time_dict):
keys = list(time_dict.keys())
for key in keys:
vals = time_dict[key]
if vals is not None:
time_dict[key] = np.stack(vals)
return time_dict
# Convenience wrapper: parse one S4 text file and return time -> 2D cloud-layer arrays.
def get_cloud_layers_dict(filename, lon360=False):
a_d = get_aeolus_time_dict(filename, lon360=lon360)
c_d = time_dict_to_cld_layers(a_d)
cld_lyr_dct = time_dict_to_nd_2(c_d)
return cld_lyr_dct
# Placeholder for AMV-to-Aeolus best-fit matching; it currently only walks the matched
# keys and pulls each profile's location without doing any fitting.
def run_amv_aeolus_best_fit(match_dict, aeolus_dict):
keys = list(match_dict.keys())
for key in keys:
profs = aeolus_dict.get(key)
layers = profs[0]
if layers is None:
continue
lat = layers[0, 0]
lon = layers[0, 1]
return None
# Convert an earth location to FGF element/line and return a (2 * half_width)-wide box
# around it, clamped to the full-disk grid; returns (None, None) for off-disk locations.
def get_search_box(nav, lon, lat):
cc, ll = nav.earth_to_lc(lon, lat)
if cc is None:
return None, None
c_rng = [cc - half_width, cc + half_width]
l_rng = [ll - half_width, ll + half_width]
if c_rng[0] < 0:
c_rng[0] = 0
if l_rng[0] < 0:
l_rng[0] = 0
if c_rng[1] >= num_elems:
c_rng[1] = num_elems - 1
if l_rng[1] >= num_lines:
l_rng[1] = num_lines - 1
return c_rng, l_rng
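

# Minimal sketch (assumes a GEOSNavigation instance like the one returned by
# CarrStereo.get_navigation()): a profile near the sub-satellite point gets a full
# 2 * half_width box, while boxes near the disk edge are clamped to the grid bounds.
def _example_search_box():
    nav = GEOSNavigation(sub_lon=-137.0)  # same navigation the Carr stereo reader uses
    return get_search_box(nav, -137.0, 0.0)
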
# aeolus_dict: time -> profiles (or cloud-layer arrays)
# amv_files_path: directory containing AMVs, '/home/user/amvdir/'
# returns a dict: aeolus time -> 2D ndarray [out_parameters, matched AMVs] holding the AMVs
# that fall inside the search box around that profile (see the usage sketch after this function)
def match_amvs_to_aeolus(aeolus_dict, amv_files_path, amv_source='OPS', band='14', amv_files=None):
nav = amv_files.get_navigation()
amv_params = amv_files.get_parameters()
match_dict = {}
keys = list(aeolus_dict.keys())
last_f_idx = -1
for key in keys:
fname, ftime, f_idx = amv_files.get_file_containing_time(key)
if f_idx is None:
continue
profs = aeolus_dict.get(key)
if profs is None:
continue
layers = profs
if isinstance(profs, list):
layers = profs[0]
lat = layers[0, 0]
lon = layers[0, 1]
c_rng, l_rng = get_search_box(nav, lon, lat)
if c_rng is None:
continue
if f_idx != last_f_idx:
last_f_idx = f_idx
ds = Dataset(fname)
amv_lons = ds[amv_files.lon_name][:]
amv_lats = ds[amv_files.lat_name][:]
cc = ds[amv_files.elem_name][:]
ll = ds[amv_files.line_name][:]
# cc, ll = nav.earth_to_lc_s(amv_lons, amv_lats)
param_s = []
param_s.append(amv_lons)
param_s.append(amv_lats)
param_s.append(cc)
param_s.append(ll)
for param in amv_params:
if param == 'V_3D':
param_s.append(ds[param][:, 0])
param_s.append(ds[param][:, 1])
else:
param_s.append(ds[param][:])
ds.close()
in_cc = np.logical_and(cc > c_rng[0], cc < c_rng[1])
in_ll = np.logical_and(ll > l_rng[0], ll < l_rng[1])
in_box = np.logical_and(in_cc, in_ll)
num_amvs = np.sum(in_box)
if num_amvs == 0:
continue
# dist = haversine_np(lon, lat, amv_lons[in_box], amv_lats[in_box])
param_nd = np.vstack(param_s)
param_nd = param_nd[:, in_box]
match_dict[key] = param_nd
return match_dict
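

# Usage sketch (hypothetical paths): matching one day of cloud-layer profiles to
# Carr stereo AMVs.
def _example_match():
    cld_dct = get_cloud_layers_dict('/home/user/mie1day.out.2019-07-01')  # hypothetical file
    amv_files = CarrStereo('/home/user/carr_amvs/', 60, band='14')        # hypothetical directory
    return match_amvs_to_aeolus(cld_dct, '/home/user/carr_amvs/', 'CARR', '14', amv_files)
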
# filename: full path of the NetCDF file to create, '/home/user/newfilename'
# aeolus_to_amv_dct: output from match_amvs_to_aeolus
# aeolus_dct: output from get_aeolus_time_dict (or get_cloud_layers_dict when cld_lyr=True)
# amv_files: AMVFiles instance describing the specific AMV product
def create_file(filename, aeolus_to_amv_dct, aeolus_dct, amv_files, cld_lyr=False):
keys = list(aeolus_to_amv_dct.keys())
num_amvs = []
num_levs = []
times = []
namvs = 0
nlevs = 0
for key in keys:
param_nd = aeolus_to_amv_dct.get(key)
num_amvs.append(param_nd.shape[1])
namvs += param_nd.shape[1]
prof_s = aeolus_dct.get(key)
prof = prof_s[0]
num_levs.append(prof.shape[0])
nlevs += prof.shape[0]
times.append(key)
amv_per_alus = len(aeolus_to_amv_dct)
rootgrp = Dataset(filename, 'w', format='NETCDF4')
dim_amvs = rootgrp.createDimension('amvs', size=namvs)
dim_alus = rootgrp.createDimension('profs', size=nlevs)
dim_num_aeolus_prof = rootgrp.createDimension('num_aeolus_profs', size=len(aeolus_to_amv_dct))
nc4_vars = []
out_params = amv_files.get_out_parameters()
meta_dict = amv_files.get_meta_dict()
for pidx, param in enumerate(out_params):
u, t = meta_dict.get(param)
var = rootgrp.createVariable(param, t, ['amvs'])
if u is not None:
var.units = u
nc4_vars.append(var)
dist = rootgrp.createVariable('dist_to_prof', 'f4', ['amvs'])
dist.units = 'km'
num_amvs_per_prof = rootgrp.createVariable('num_amvs_per_prof', 'i4', ['num_aeolus_profs'])
num_levs_per_prof = rootgrp.createVariable('num_levs_per_prof', 'i4', ['num_aeolus_profs'])
    prof_time = rootgrp.createVariable('time', 'f8', ['num_aeolus_profs'])  # f8: f4 cannot hold epoch seconds precisely
# ---- Profile variables ---------------
    prf_lon = rootgrp.createVariable('prof_longitude', 'f4', ['num_aeolus_profs'])
    prf_lon.units = 'degrees_east'
    prf_lat = rootgrp.createVariable('prof_latitude', 'f4', ['num_aeolus_profs'])
    prf_lat.units = 'degrees_north'
    prof_time.units = 'seconds since 1970-01-01 00:00:00'
if not cld_lyr:
prf_azm = rootgrp.createVariable('prof_azm', 'f4', ['profs'])
prf_azm.units = 'degree'
prf_spd = rootgrp.createVariable('prof_spd', 'f4', ['profs'])
prf_spd.units = 'm s-1'
prf_hht = rootgrp.createVariable('prof_hht', 'f4', ['profs'])
prf_hht.units = 'meter'
prf_hhb = rootgrp.createVariable('prof_hhb', 'f4', ['profs'])
prf_hhb.units = 'meter'
# --------------------------------------
i_a = 0
i_c = 0
for idx, key in enumerate(keys):
namvs = num_amvs[idx]
nlevs = num_levs[idx]
i_b = i_a + namvs
i_d = i_c + nlevs
prof_s = aeolus_dct.get(key)
prof = prof_s
if isinstance(prof_s, list):
prof = prof_s[0]
if not cld_lyr:
prf_hht[i_c:i_d] = prof[:, 3]
prf_hhb[i_c:i_d] = prof[:, 4]
prf_azm[i_c:i_d] = prof[:, 5]
prf_spd[i_c:i_d] = prof[:, 6]
else:
prf_hht[i_c:i_d] = prof[:, 2]
prf_hhb[i_c:i_d] = prof[:, 3]
i_c += nlevs
plat = prof[0, 0]
plon = prof[0, 1]
        prf_lat[idx] = plat
        prf_lon[idx] = plon
param_nd = aeolus_to_amv_dct.get(key)
for pidx, param in enumerate(out_params):
nc4_vars[pidx][i_a:i_b] = param_nd[pidx, :]
dist[i_a:i_b] = haversine_np(plon, plat, param_nd[0, :], param_nd[1, :])
i_a += namvs
num_amvs_per_prof[:] = num_amvs
num_levs_per_prof[:] = num_levs
prof_time[:] = times
rootgrp.close()
# aeolus_files_dir: directory of S4 NOAA txt output files
# amv_files_dir: directory of G16/17 AMV product files
# outfile: pathname for the NetCDF match file (see the end-to-end sketch after this function)
def create_amv_to_aeolus_match_file(aeolus_files_dir, amv_files_dir, outfile=None, amv_source='OPS', band='14', chan='mie'):
a_d = get_aeolus_time_dict_s(aeolus_files_dir, chan=chan)
# a_d = time_dict_to_nd(a_d)
#----------------------------------
a_d = time_dict_to_cld_layers(a_d)
a_d = time_dict_to_nd_2(a_d)
    amv_files = None
    if amv_source == 'CARR':
        amv_files = CarrStereo(amv_files_dir, 60, band)
    if amv_files is None:
        # only the Carr stereo reader currently implements the full parameter interface
        raise MyGenericException('unsupported amv_source: ' + amv_source)
m_d = match_amvs_to_aeolus(a_d, amv_files_dir, amv_source, band, amv_files)
if outfile is not None:
cld_lyr = False
if chan == 'mie':
cld_lyr = True
create_file(outfile, m_d, a_d, amv_files, cld_lyr=cld_lyr)
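

# End-to-end usage sketch (hypothetical directories): build a match file from daily
# Mie cloud-layer text files and a directory of Carr stereo AMV files.
def _example_create_match_file():
    create_amv_to_aeolus_match_file('/home/user/aeolus_txt/', '/home/user/carr_amvs/',
                                    outfile='/home/user/aeolus_amv_match.nc',
                                    amv_source='CARR', band='14', chan='mie')
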
# match_file: pathname of the match file produced by create_file
# dt_str_0: start time (YYYY-MM-DD_HH:MM)
# dt_str_1: end time (YYYY-MM-DD_HH:MM)
# amv_var_names: list of AMV parameters (see match_file ncdump) to subset
# returns three xarray.DataArrays (see the usage sketch after this function):
#   profs[nprofs, max_num_levs_per_prof, num_params],
#   amvs[nprofs, max_num_amvs_per_prof, num_params],
#   prof_locs[nprofs, (lon, lat)]
def subset_by_time(match_file, dt_str_0, dt_str_1, amv_var_names):
rootgrp = Dataset(match_file, 'r', format='NETCDF4')
all_dims = rootgrp.dimensions
t_var = rootgrp['time']
n_profs = len(all_dims['num_aeolus_profs'])
n_amvs_per_prof = rootgrp['num_amvs_per_prof'][:]
n_levs_per_prof = rootgrp['num_levs_per_prof'][:]
a_nc_vars = []
for vname in amv_var_names:
a_nc_vars.append(rootgrp[vname])
nvars = len(a_nc_vars)
p_lon_v = rootgrp['prof_longitude']
p_lat_v = rootgrp['prof_latitude']
p_azm_v = rootgrp['prof_azm']
p_spd_v = rootgrp['prof_spd']
p_hhb_v = rootgrp['prof_hhb']
p_hht_v = rootgrp['prof_hht']
    dto = datetime.datetime.strptime(dt_str_0, '%Y-%m-%d_%H:%M').replace(tzinfo=timezone.utc)
    t_0 = dto.timestamp()
    dto = datetime.datetime.strptime(dt_str_1, '%Y-%m-%d_%H:%M').replace(tzinfo=timezone.utc)
    t_1 = dto.timestamp()
    if t_1 < t_0:  # swap if the bounds were given out of order
        t_0, t_1 = t_1, t_0
    times = t_var[:]
    time_idxs = np.arange(n_profs)
    valid = np.logical_and(times >= t_0, times < t_1)
    time_idxs = time_idxs[valid]
    if time_idxs.size == 0:  # no profiles fall in the requested window
        rootgrp.close()
        return None, None, None
    n_times = time_idxs.shape[0]
lons = p_lon_v[:]
lats = p_lat_v[:]
prf_idx_start = np.sum(n_levs_per_prof[0:time_idxs[0]])
amv_idx_start = np.sum(n_amvs_per_prof[0:time_idxs[0]])
mx_namvs = np.max(n_amvs_per_prof[time_idxs[0]:time_idxs[0]+n_times])
mx_nlevs = np.max(n_levs_per_prof[time_idxs[0]:time_idxs[0]+n_times])
amvs = np.zeros((n_times, mx_namvs, nvars))
profs = np.zeros((n_times, mx_nlevs, 4))
amvs.fill(np.nan)
profs.fill(np.nan)
accum_prf = prf_idx_start
accum_amv = amv_idx_start
for idx, t_i in enumerate(time_idxs):
n_amvs = n_amvs_per_prof[t_i]
n_levs = n_levs_per_prof[t_i]
a = accum_amv
b = accum_amv + n_amvs
c = accum_prf
d = accum_prf + n_levs
for k in range(nvars):
amvs[idx, 0:n_amvs, k] = a_nc_vars[k][a:b]
profs[idx, 0:n_levs, 0] = p_spd_v[c:d]
profs[idx, 0:n_levs, 1] = p_azm_v[c:d]
profs[idx, 0:n_levs, 2] = p_hhb_v[c:d]
profs[idx, 0:n_levs, 3] = p_hht_v[c:d]
accum_amv += n_amvs
accum_prf += n_levs
coords = {'num_profs': times[time_idxs], 'num_params': ['speed', 'azimuth', 'layer_bot', 'layer_top']}
prof_da = xr.DataArray(profs, coords=coords, dims=['num_profs', 'max_num_levels', 'num_params'])
coords = {'num_profs': times[time_idxs], 'num_params': amv_var_names}
amvs_da = xr.DataArray(amvs, coords=coords, dims=['num_profs', 'max_num_amvs', 'num_params'])
prof_locs_da = xr.DataArray(np.column_stack([lons[time_idxs], lats[time_idxs]]),
coords=[times[time_idxs], ['longitude', 'latitude']],
dims=['num_profs', 'space'])
    rootgrp.close()
    return prof_da, amvs_da, prof_locs_da
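

# Usage sketch (hypothetical file and window): pull the matched AMVs and profiles for a
# two-hour window as xarray DataArrays.
def _example_subset():
    return subset_by_time('/home/user/aeolus_amv_match.nc',
                          '2019-07-01_12:00', '2019-07-01_14:00',
                          ['Lon', 'Lat', 'pres', 'V_3D_u', 'V_3D_v'])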