import datetime from datetime import timezone import glob import os import numpy as np import xarray as xr # from util.util import value_to_index, homedir from metpy.units import units # gfs_directory = '/ships22/cloud/Ancil_Data/clavrx_ancil_data/dynamic/gfs/' homedir = os.path.expanduser('~') + '/' gfs_directory = homedir+'data/gfs/' gfs_date_format = '%y%m%d' # force incoming longitude to (0, 360) to match GFS lon360 = True # GFS half degree resolution NX = 720 NY = 361 lat_coords = np.linspace(-90, 90, NY) lon_coords = np.linspace(0, 359.5, NX) plevs = np.array([10.0, 20.0, 30.0, 50.0, 70.0, 100.0, 150.0, 200.0, 250.0, 300.0, 350.0, 400.0, 450.0, 500.0, 550.0, 600.0, 650.0, 700.0, 750.0, 800.0, 850.0, 900.0, 925.0, 950.0, 975.0, 1000.0]) NZ = plevs.shape[0] class MyGenericException(Exception): def __init__(self, message): self.message = message # Return index of nda closest to value. nda must be 1d def value_to_index(nda, value): diff = np.abs(nda - value) idx = np.argmin(diff) return idx def get_timestamp(filename): toks = filename.split('.') tstr = toks[1].split('_')[0] dto = datetime.datetime.strptime(tstr, gfs_date_format + '%H').replace(tzinfo=timezone.utc) dto = dto + datetime.timedelta(hours=12) return dto.timestamp() def get_time_tuple_utc(timestamp): dt_obj = datetime.datetime.fromtimestamp(timestamp, timezone.utc) return dt_obj, dt_obj.timetuple() # def get_bounding_gfs_files(timestamp): # dt_obj, time_tup = get_time_tuple_utc(timestamp) # dt_obj = dt_obj + datetime.timedelta(hours=12) # date_str = dt_obj.strftime(gfs_date_format) # dt_obj = datetime.datetime.strptime(date_str, gfs_date_format).replace(tzinfo=timezone.utc) # # dt_obj_r = dt_obj + datetime.timedelta(days=1) # date_str_r = dt_obj_r.strftime(gfs_date_format) # # dt_obj_l = dt_obj - datetime.timedelta(days=1) # date_str_l = dt_obj_l.strftime(gfs_date_format) # # flist_l = glob.glob(gfs_directory+'gfs.'+date_str_l+'??_F012.h5') # flist = glob.glob(gfs_directory+'gfs.'+date_str+'??_F012.h5') # flist_r = glob.glob(gfs_directory+'gfs.'+date_str_r+'??_F012.h5') # filelist = flist_l + flist + flist_r # if len(filelist) == 0: # return None, None, None, None # # ftimes = [] # for pname in filelist: # TODO: make better with regular expressions (someday) # fname = os.path.split(pname)[1] # ftimes.append(get_timestamp(fname)) # # tarr = np.array(ftimes) # sidxs = tarr.argsort() # # farr = np.array(filelist) # farr = farr[sidxs] # ftimes = tarr[sidxs] # idxs = np.arange(len(filelist)) # # above = ftimes >= timestamp # if not above.any(): # return None, None, None, None # tR = ftimes[above].min() # # below = ftimes <= timestamp # if not below.any(): # return None, None, None, None # tL = ftimes[below].max() # # iL = idxs[below].max() # iR = iL + 1 # # fList = farr.tolist() # # return fList[iL], ftimes[iL], fList[iR], ftimes[iR] class GFSData: def __init__(self, filename, press_range=None, lon_range=None, lat_range=None): self.filename = filename self.dataset = None self.lon_range = self.lat_range = self.press_range = None self.x_lo = self.x_hi = self.y_lo = self.y_hi = self.z_lo = self.z_hi = None self.update(press_range=press_range, lon_range=lon_range, lat_range=lat_range) def __enter__(self): # Open the dataset and assign it to self.dataset self.dataset = xr.open_dataset(self.filename) return self def __exit__(self, exc_type, exc_value, exc_traceback): # Close the dataset before exiting self.dataset.close() def update(self, press_range=None, lon_range=None, lat_range=None): self._calc_indexes(press_range=press_range, lon_range=lon_range, lat_range=lat_range) def _calc_indexes(self, press_range=None, lon_range=None, lat_range=None): if lon_range is not None and lon_range != self.lon_range: self.lon_range = lon_range lon_lo = lon_range[0] lon_hi = lon_range[1] if lon360: # convert -180/+180 to 0,360 if lon_lo < 0: lon_lo += 360 if lon_hi < 0: lon_hi += 360 self.x_lo = value_to_index(lon_coords, lon_lo) self.x_hi = value_to_index(lon_coords, lon_hi) elif self.lon_range is None: self.x_lo, self.x_hi = 0, NX if lat_range is not None and lat_range != self.lat_range: self.lat_range = lat_range lat_lo = lat_range[0] lat_hi = lat_range[1] self.y_lo = value_to_index(lat_coords, lat_lo) self.y_hi = value_to_index(lat_coords, lat_hi) elif self.lat_range is None: self.y_lo, self.y_hi = 0, NY if press_range is not None and press_range != self.press_range: self.press_range = press_range self.z_lo = value_to_index(plevs, press_range[0]) self.z_hi = value_to_index(plevs, press_range[1]) elif self.press_range is None: self.z_lo, self.z_hi = 0, NZ def get_volume(self, fld_name, unit_str): nda = self.dataset[fld_name].values sub_nda = nda[self.y_lo:self.y_hi, self.x_lo:self.x_hi, self.z_lo:self.z_hi] xra = xr.DataArray(sub_nda, dims=['Latitude', 'Longitude', 'Pressure'], coords={"Latitude": lat_coords[self.y_lo:self.y_hi], "Longitude": lon_coords[self.x_lo:self.x_hi], "Pressure": plevs[self.z_lo:self.z_hi]}, attrs={"description": fld_name, "units": unit_str}) return xra def volume_np_to_xr(self, nda, dims): xra = xr.DataArray(nda, dims=dims, coords={"Latitude": lat_coords[self.y_lo:self.y_hi], "Longitude": lon_coords[self.x_lo:self.x_hi], "Pressure": plevs[self.z_lo:self.z_hi]}) return xra def get_horz_layer(xr_dataset, fld_name, press, lon_range=None, lat_range=None): p_idx = value_to_index(plevs, press) x_lo, x_hi = 0, NX y_lo, y_hi = 0, NY if lon_range is not None: lon_lo = lon_range[0] lon_hi = lon_range[1] lat_lo = lat_range[0] lat_hi = lat_range[1] if lon360: if lon_lo < 0: lon_lo += 360 if lon_hi < 0: lon_hi += 360 x_lo = value_to_index(lon_coords, lon_lo) x_hi = value_to_index(lon_coords, lon_hi) y_lo = value_to_index(lat_coords, lat_lo) y_hi = value_to_index(lat_coords, lat_hi) nda = xr_dataset[fld_name].values sub_nda = nda[y_lo:y_hi, x_lo:x_hi, p_idx] xra = xr.DataArray(sub_nda, dims=['latitude', 'longitude'], coords={"latitude": lat_coords[y_lo:y_hi], "longitude": lon_coords[x_lo:x_hi]}, attrs={"description": fld_name, "units": 'm/s'}) return xra def get_horz_layer_s(xr_dataset, fld_names, press, lon_range=None, lat_range=None): p_idx = value_to_index(plevs, press) x_lo = 0 x_hi = NX y_lo = 0 y_hi = NY if lon_range is not None: lon_lo = lon_range[0] lon_hi = lon_range[1] lat_lo = lat_range[0] lat_hi = lat_range[1] if lon360: if lon_lo < 0: lon_lo += 360 if lon_hi < 0: lon_hi += 360 x_lo = value_to_index(lon_coords, lon_lo) x_hi = value_to_index(lon_coords, lon_hi) y_lo = value_to_index(lat_coords, lat_lo) y_hi = value_to_index(lat_coords, lat_hi) sub_fld_s = [] for fld_name in fld_names: fld = xr_dataset[fld_name] sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx] sub_fld_s.append(sub_fld) sub_fld = xr.concat(sub_fld_s, 'channel') sub_fld = sub_fld.assign_coords(channel=fld_names, fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi]) return sub_fld def get_time_interpolated_layer(xr_dataset_s, time_s, time, fld_name, press, lon_range=None, lat_range=None, method='linear'): layer_s = [] for ds in xr_dataset_s: lyr = get_horz_layer(ds, fld_name, press, lon_range=lon_range, lat_range=lat_range) layer_s.append(lyr) lyr = xr.concat(layer_s, 'time') lyr = lyr.assign_coords(time=time_s) intrp_lyr = lyr.interp(time=time, method=method) return intrp_lyr def get_time_interpolated_layer_s(xr_dataset_s, time_s, time, fld_name_s, press, lon_range=None, lat_range=None, method='linear'): layer_s = [] for ds in xr_dataset_s: lyr = get_horz_layer_s(ds, fld_name_s, press, lon_range=lon_range, lat_range=lat_range) layer_s.append(lyr) lyr = xr.concat(layer_s, 'time') lyr = lyr.assign_coords(time=time_s) intrp_lyr = lyr.interp(time=time, method='linear') return intrp_lyr def get_vert_profile(xr_dataset, fld_name, lons, lats, method='linear'): if lon360: # convert -180/+180 to 0,360 lons = np.where(lons < 0, lons + 360, lons) fld = xr_dataset[fld_name] fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs) dim2 = xr.DataArray(lons, dims='k') dim1 = xr.DataArray(lats, dims='k') intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method=method) return intrp_fld def get_vert_profile_s(xr_dataset, fld_name_s, lons, lats, method='linear'): if lon360: # convert -180,+180 to 0,360 lons = np.where(lons < 0, lons + 360, lons) fld_s = [] for fld_name in fld_name_s: fld = xr_dataset[fld_name] fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs) fld_s.append(fld) fld = xr.concat(fld_s, 'fld_dim') dim2 = xr.DataArray(lons, dims='k') dim1 = xr.DataArray(lats, dims='k') intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method=method) return intrp_fld def get_point(xr_dataset, fld_name, lons, lats, pres_s=None, method='nearest'): if lon360: # convert -180/+180 to 0,360 lons = np.where(lons < 0, lons + 360, lons) # convert -180,180 to 0,360 lat_coords = np.linspace(-90, 90, xr_dataset.fakeDim1.size) lon_coords = np.linspace(0, 359.5, xr_dataset.fakeDim2.size) fld = xr_dataset[fld_name] if pres_s is not None: fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs) else: fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords) dim1 = xr.DataArray(lats, dims='k') dim2 = xr.DataArray(lons, dims='k') if pres_s is not None: dim0 = xr.DataArray(pres_s, dims='k') intrp_fld = fld.interp(fakeDim0=dim0, fakeDim1=dim1, fakeDim2=dim2, method=method) else: intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, method=method) return intrp_fld def get_point_s(xr_dataset, fld_name_s, lons, lats, pres_s=None, method='nearest'): if lon360: # convert -180/+180 to 0,360 lons = np.where(lons < 0, lons + 360, lons) # convert -180,180 to 0,360 lat_coords = np.linspace(-90, 90, xr_dataset.fakeDim1.size) lon_coords = np.linspace(0, 359.5, xr_dataset.fakeDim2.size) fld_s = [] for fld_name in fld_name_s: fld = xr_dataset[fld_name] if pres_s is not None: fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs) else: fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords) fld_s.append(fld) fld = xr.concat(fld_s, 'fld_dim') dim1 = xr.DataArray(lats, dims='k') dim2 = xr.DataArray(lons, dims='k') if pres_s is not None: dim0 = xr.DataArray(pres_s, dims='k') intrp_fld = fld.interp(fakeDim0=dim0, fakeDim1=dim1, fakeDim2=dim2, method=method) else: intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, method=method) return intrp_fld def get_time_interpolated_vert_profile(xr_dataset_s, time_s, fld_name, time, lons, lats, method='linear'): prof_s = [] for ds in xr_dataset_s: vp = get_vert_profile(ds, fld_name, lons, lats, method=method) prof_s.append(vp) prof = xr.concat(prof_s, 'time') prof = prof.assign_coords(time=time_s) intrp_prof = prof.interp(time=time, method=method) intrp_prof = intrp_prof.values return intrp_prof def get_time_interpolated_vert_profile_s(xr_dataset_s, time_s, fld_name_s, time, lons, lats, method='linear'): prof_s = [] for ds in xr_dataset_s: vp = get_vert_profile_s(ds, fld_name_s, lons, lats, method=method) prof_s.append(vp) prof = xr.concat(prof_s, 'time') prof = prof.assign_coords(time=time_s) intrp_prof = prof.interp(time=time, method=method) intrp_prof = intrp_prof.values return intrp_prof def get_time_interpolated_point(ds_0, ds_1, time0, time1, fld_name, time, lons, lats, method='linear'): vals_0 = get_point(ds_0, fld_name, lons, lats) vals_1 = get_point(ds_1, fld_name, lons, lats) vals = xr.concat([vals_0, vals_1], 'time') vals = vals.assign_coords(time=[time0, time1]) intrp_vals = vals.interp(time=time, method=method) intrp_vals = intrp_vals.values return intrp_vals def get_time_interpolated_point_s(xr_dataset_s, time_s, fld_name_s, time, lons, lats, method='linear'): pt_s = [] for ds in xr_dataset_s: pt = get_point_s(ds, fld_name_s, lons, lats, method=method) pt_s.append(pt) pt = xr.concat(pt_s, 'time') pt = pt.assign_coords(time=time_s) intrp_pt = pt.interp(time=time, method=method) intrp_pt = intrp_pt.values return intrp_pt def get_time_interpolated_voxel(xr_dataset_s, time_s, time, fld_name, lon, lat, press, x_width=3, y_width=3, z_width=3, method='linear'): vox_s = [] for ds in xr_dataset_s: vox = get_voxel(ds, fld_name, lon, lat, press, x_width=x_width, y_width=y_width, z_width=z_width) vox_s.append(vox) vox = xr.concat(vox_s, 'time') vox = vox.assign_coords(time=time_s) intrp_vox = vox.interp(time=time, method=method) return intrp_vox def get_voxel(xr_dataset, fld_name, lon, lat, press, x_width=3, y_width=3, z_width=3): if lon360: if lon < 0: lon += 360 fld = xr_dataset[fld_name] p_c = value_to_index(plevs, press) x_c = value_to_index(lon_coords, lon) y_c = value_to_index(lat_coords, lat) y_h = int(y_width / 2) x_h = int(x_width / 2) p_h = int(z_width / 2) y_start = y_c - y_h x_start = x_c - x_h z_start = p_c - p_h if y_start < 0 or x_start < 0 or z_start < 0: return None y_stop = y_c + y_h + 1 x_stop = x_c + x_h + 1 z_stop = p_c + p_h + 1 if y_stop > NY-1 or x_stop > NX-1 or z_stop > NZ-1: return None sub_fld = fld[y_start:y_stop, x_start:x_stop, z_start:z_stop] sub_fld = sub_fld.expand_dims('channel') sub_fld = sub_fld.assign_coords(channel=[fld_name], fakeDim2=lon_coords[x_start:x_stop], fakeDim1=lat_coords[y_start:y_stop], fakeDim0=plevs[z_start:z_stop]) return sub_fld def get_time_interpolated_voxel_s(xr_dataset_s, time_s, time, fld_name_s, lon, lat, press, x_width=3, y_width=3, z_width=3, method='linear'): vox_s = [] for ds in xr_dataset_s: vox = get_voxel_s(ds, fld_name_s, lon, lat, press, x_width=x_width, y_width=y_width, z_width=z_width) vox_s.append(vox) vox = xr.concat(vox_s, 'time') vox = vox.assign_coords(time=time_s) intrp_vox = vox.interp(time=time, method=method) return intrp_vox def get_voxel_s(xr_dataset, fld_name_s, lon, lat, press, x_width=3, y_width=3, z_width=3): if lon360: if lon < 0: lon += 360 p_c = value_to_index(plevs, press) x_c = value_to_index(lon_coords, lon) y_c = value_to_index(lat_coords, lat) y_h = int(y_width / 2) x_h = int(x_width / 2) p_h = int(z_width / 2) y_start = y_c - y_h x_start = x_c - x_h z_start = p_c - p_h if y_start < 0 or x_start < 0 or z_start < 0: return None y_stop = y_c + y_h + 1 x_stop = x_c + x_h + 1 z_stop = p_c + p_h + 1 if y_stop > NY-1 or x_stop > NX-1 or z_stop > NZ-1: return None sub_fld_s = [] for name in fld_name_s: fld = xr_dataset[name] sub_fld = fld[y_start:y_stop, x_start:x_stop, z_start:z_stop] sub_fld_s.append(sub_fld) sub_fld = xr.concat(sub_fld_s, 'channel') sub_fld = sub_fld.assign_coords(channel=fld_name_s, fakeDim2=lon_coords[x_start:x_stop], fakeDim1=lat_coords[y_start:y_stop], fakeDim0=plevs[z_start:z_stop]) return sub_fld def get_volume(xr_dataset, fld_name, unit_str, press_range=None, lon_range=None, lat_range=None): x_lo, x_hi = 0, NX y_lo, y_hi = 0, NY z_lo, z_hi = 0, NZ if lon_range is not None: lon_lo = lon_range[0] lon_hi = lon_range[1] if lon360: if lon_lo < 0: lon_lo += 360 if lon_hi < 0: lon_hi += 360 x_lo = value_to_index(lon_coords, lon_lo) x_hi = value_to_index(lon_coords, lon_hi) if lat_range is not None: lat_lo = lat_range[0] lat_hi = lat_range[1] y_lo = value_to_index(lat_coords, lat_lo) y_hi = value_to_index(lat_coords, lat_hi) if press_range is not None: z_lo = value_to_index(plevs, press_range[0]) z_hi = value_to_index(plevs, press_range[1]) nda = self.dataset[fld_name].values sub_nda = nda[y_lo:y_hi, x_lo:x_hi, z_lo:z_hi] xra = xr.DataArray(sub_nda, dims=['Latitude', 'Longitude', 'Pressure'], coords={"Latitude": lat_coords[y_lo:y_hi], "Longitude": lon_coords[x_lo:x_hi], "Pressure": plevs[z_lo:z_hi]}, attrs={"description": fld_name, "units": unit_str}) return xra def volume_np_to_xr(nda, dims, press_range=None, lon_range=None, lat_range=None): x_lo, x_hi = 0, NX y_lo, y_hi = 0, NY z_lo, z_hi = 0, NZ if lon_range is not None: lon_lo = lon_range[0] lon_hi = lon_range[1] if lon360: if lon_lo < 0: lon_lo += 360 if lon_hi < 0: lon_hi += 360 x_lo = value_to_index(lon_coords, lon_lo) x_hi = value_to_index(lon_coords, lon_hi) if lat_range is not None: lat_lo = lat_range[0] lat_hi = lat_range[1] y_lo = value_to_index(lat_coords, lat_lo) y_hi = value_to_index(lat_coords, lat_hi) if press_range is not None: z_lo = value_to_index(plevs, press_range[0]) z_hi = value_to_index(plevs, press_range[1]) xra = xr.DataArray(nda, dims=dims, coords={"Latitude": lat_coords[y_lo:y_hi], "Longitude": lon_coords[x_lo:x_hi], "Pressure": plevs[z_lo:z_hi]}) return xra