import datetime from datetime import timezone import glob import numpy as np import xarray as xr from util.util import value_to_index #import cartopy #from cartopy import * from metpy import * #import cartopy.crs as ccrs import random #gfs_directory = '/apollo/cloud/Ancil_Data/clavrx_ancil_data/dynamic/gfs/' gfs_directory = '/Users/tomrink/data/gfs/' gfs_date_format = '%y%m%d' lon360 = True lat_coords = np.linspace(-90, 90, 361) lon_coords = np.linspace(0, 359.5, 720) class MyGenericException(Exception): def __init__(self, message): self.message = message def get_time_tuple_utc(timestamp): dt_obj = datetime.datetime.fromtimestamp(timestamp, timezone.utc) return dt_obj, dt_obj.timetuple() def get_bounding_gfs_files(timestamp): dt_obj, time_tup = get_time_tuple_utc(timestamp) date_str = dt_obj.strftime(gfs_date_format) dt_obj0 = datetime.datetime.strptime(date_str, gfs_date_format).replace(tzinfo=timezone.utc) dt_obj1 = dt_obj0 + datetime.timedelta(days=1) date_str_1 = dt_obj1.strftime(gfs_date_format) flist_0 = glob.glob(gfs_directory+'gfs.'+date_str+'??_F012.h5') flist_1 = glob.glob(gfs_directory+'gfs.'+date_str_1+'00_F012.h5') if (len(flist_0) == 0) or (len(flist_1) == 0): return None, None, None, None filelist = flist_0 + flist_1 ftimes = [] for pname in filelist: # TODO: make better with regular expressions (someday) fname = os.path.split(pname)[1] toks = fname.split('.') tstr = toks[1].split('_')[0] dto = datetime.datetime.strptime(tstr, gfs_date_format+'%H').replace(tzinfo=timezone.utc) ftimes.append(dto.timestamp()) tarr = np.array(ftimes) sidxs = tarr.argsort() farr = np.array(filelist) farr = farr[sidxs] ftimes = tarr[sidxs] above = ftimes >= timestamp if not above.any(): return None, None, None, None tR = ftimes[above].min() below = ftimes <= timestamp if not below.any(): return None, None, None, None tL = ftimes[below].max() iL = np.searchsorted(ftimes, tL, 'left') iR = iL + 1 fList = farr.tolist() return fList[iL], ftimes[iL], fList[iR], ftimes[iR] def get_horz_layer(xr_dataset, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi): plevs = xr_dataset['pressure levels'] fld = xr_dataset[fld_name] if lon360: if lon_lo < 0: lon_lo += 360 if lon_hi < 0: lon_hi += 360 x_lo = value_to_index(lon_coords, lon_lo) x_hi = value_to_index(lon_coords, lon_hi) y_lo = value_to_index(lat_coords, lat_lo) y_hi = value_to_index(lat_coords, lat_hi) p_idx = value_to_index(plevs.values, press) sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx] sub_fld = sub_fld.expand_dims('channel') sub_fld = sub_fld.assign_coords(channel=[fld_name], fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi]) return sub_fld def get_horz_layer_s(xr_dataset, fld_names, press, lon_lo=None, lon_hi=None, lat_lo=None, lat_hi=None): plevs = xr_dataset['pressure levels'] if lon360: if lon_lo < 0: lon_lo += 360 if lon_hi < 0: lon_hi += 360 x_lo = value_to_index(lon_coords, lon_lo) x_hi = value_to_index(lon_coords, lon_hi) y_lo = value_to_index(lat_coords, lat_lo) y_hi = value_to_index(lat_coords, lat_hi) p_idx = value_to_index(plevs.values, press) sub_fld_s = [] for fld_name in fld_names: fld = xr_dataset[fld_name] sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx] sub_fld_s.append(sub_fld) sub_fld = xr.concat(sub_fld_s, 'channel') sub_fld = sub_fld.assign_coords(channel=fld_names, fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi]) return sub_fld def get_time_interpolated_layer(ds_0, ds_1, time_0, time_1, time, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi): lyr_0 = get_horz_layer(ds_0, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi) lyr_1 = get_horz_layer(ds_1, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi) lyr = xr.concat([lyr_0, lyr_1], 'time') lyr = lyr.assign_coords(time=[time_0, time_1]) intrp_lyr = lyr.interp(time=time, method='linear') intrp_lyr = intrp_lyr.expand_dims('channel') intrp_lyr = intrp_lyr.assign_coords(channel=[fld_name]) return intrp_lyr def get_time_interpolated_layer_s(ds_0, ds_1, time_0, time_1, time, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi): lyr_s_0 = get_horz_layer_s(ds_0, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi) lyr_s_1 = get_horz_layer_s(ds_1, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi) lyr = xr.concat([lyr_s_0, lyr_s_1], 'time') lyr = lyr.assign_coords(time=[time_0, time_1]) intrp_lyr = lyr.interp(time=time, method='linear') return intrp_lyr def get_vert_profile(xr_dataset, fld_name, lons, lats): if lon360: # convert -180/+180 to 0,360 lons = np.where(lons < 0, lons + 360, lons) plevs = xr_dataset['pressure levels'] fld = xr_dataset[fld_name] fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs) dim2 = xr.DataArray(lons, dims='k') dim1 = xr.DataArray(lats, dims='k') intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method='linear') return intrp_fld def get_vert_profile_s(xr_dataset, fld_name_s, lons, lats): if lon360: # convert -180,+180 to 0,360 lons = np.where(lons < 0, lons + 360, lons) plevs = xr_dataset['pressure levels'] fld_s = [] for fld_name in fld_name_s: fld = xr_dataset[fld_name] fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs) fld_s.append(fld) fld = xr.concat(fld_s, 'fld_dim') dim2 = xr.DataArray(lons, dims='k') dim1 = xr.DataArray(lats, dims='k') intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method='linear') return intrp_fld def get_point(xr_dataset, fld_name, lons, lats): if lon360: # convert -180/+180 to 0,360 lons = np.where(lons < 0, lons + 360, lons) # convert -180,180 to 0,360 lat_coords = np.linspace(-90, 90, xr_dataset.fakeDim1.size) lon_coords = np.linspace(0, 359.5, xr_dataset.fakeDim2.size) fld = xr_dataset[fld_name] fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords) dim1 = xr.DataArray(lats, dims='k') dim2 = xr.DataArray(lons, dims='k') intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, method='linear') return intrp_fld def get_time_interpolated_vert_profile(ds_0, ds_1, time0, time1, fld_name, time, lons, lats): prof_0 = get_vert_profile(ds_0, fld_name, lons, lats) prof_1 = get_vert_profile(ds_1, fld_name, lons, lats) prof = xr.concat([prof_0, prof_1], 'time') prof = prof.assign_coords(time=[time0, time1]) intrp_prof = prof.interp(time=time, method='linear') intrp_prof = intrp_prof.values return intrp_prof def get_time_interpolated_vert_profile_s(ds_0, ds_1, time0, time1, fld_name_s, time, lons, lats): prof_0 = get_vert_profile_s(ds_0, fld_name_s, lons, lats) prof_1 = get_vert_profile_s(ds_1, fld_name_s, lons, lats) prof = xr.concat([prof_0, prof_1], 'time') prof = prof.assign_coords(time=[time0, time1]) intrp_prof = prof.interp(time=time, method='linear') intrp_prof = intrp_prof.values return intrp_prof def get_time_interpolated_point(ds_0, ds_1, time0, time1, fld_name, time, lons, lats): vals_0 = get_point(ds_0, fld_name, lons, lats) vals_1 = get_point(ds_1, fld_name, lons, lats) vals = xr.concat([vals_0, vals_1], 'time') vals = vals.assign_coords(time=[time0, time1]) intrp_vals = vals.interp(time=time, method='linear') intrp_vals = intrp_vals.values return intrp_vals