gfs_reader.py 7.65 KiB
import datetime
from datetime import timezone
import glob
import numpy as np
import xarray as xr
from util.util import value_to_index
#import cartopy
#from cartopy import *
from metpy import *
#import cartopy.crs as ccrs
import random
#gfs_directory = '/apollo/cloud/Ancil_Data/clavrx_ancil_data/dynamic/gfs/'
gfs_directory = '/Users/tomrink/data/gfs/'
gfs_date_format = '%y%m%d'
lon360 = True
lat_coords = np.linspace(-90, 90, 361)
lon_coords = np.linspace(0, 359.5, 720)
class MyGenericException(Exception):
def __init__(self, message):
self.message = message
def get_time_tuple_utc(timestamp):
dt_obj = datetime.datetime.fromtimestamp(timestamp, timezone.utc)
return dt_obj, dt_obj.timetuple()
def get_bounding_gfs_files(timestamp):
dt_obj, time_tup = get_time_tuple_utc(timestamp)
date_str = dt_obj.strftime(gfs_date_format)
dt_obj0 = datetime.datetime.strptime(date_str, gfs_date_format).replace(tzinfo=timezone.utc)
dt_obj1 = dt_obj0 + datetime.timedelta(days=1)
date_str_1 = dt_obj1.strftime(gfs_date_format)
flist_0 = glob.glob(gfs_directory+'gfs.'+date_str+'??_F012.h5')
flist_1 = glob.glob(gfs_directory+'gfs.'+date_str_1+'00_F012.h5')
if (len(flist_0) == 0) or (len(flist_1) == 0):
return None, None, None, None
filelist = flist_0 + flist_1
ftimes = []
for pname in filelist: # TODO: make better with regular expressions (someday)
fname = os.path.split(pname)[1]
toks = fname.split('.')
tstr = toks[1].split('_')[0]
dto = datetime.datetime.strptime(tstr, gfs_date_format+'%H').replace(tzinfo=timezone.utc)
ftimes.append(dto.timestamp())
tarr = np.array(ftimes)
sidxs = tarr.argsort()
farr = np.array(filelist)
farr = farr[sidxs]
ftimes = tarr[sidxs]
above = ftimes >= timestamp
if not above.any():
return None, None, None, None
tR = ftimes[above].min()
below = ftimes <= timestamp
if not below.any():
return None, None, None, None
tL = ftimes[below].max()
iL = np.searchsorted(ftimes, tL, 'left')
iR = iL + 1
fList = farr.tolist()
return fList[iL], ftimes[iL], fList[iR], ftimes[iR]
def get_horz_layer(xr_dataset, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi):
plevs = xr_dataset['pressure levels']
fld = xr_dataset[fld_name]
if lon360:
if lon_lo < 0:
lon_lo += 360
if lon_hi < 0:
lon_hi += 360
x_lo = value_to_index(lon_coords, lon_lo)
x_hi = value_to_index(lon_coords, lon_hi)
y_lo = value_to_index(lat_coords, lat_lo)
y_hi = value_to_index(lat_coords, lat_hi)
p_idx = value_to_index(plevs.values, press)
sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx]
sub_fld = sub_fld.expand_dims('channel')
sub_fld = sub_fld.assign_coords(channel=[fld_name], fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi])
return sub_fld
def get_horz_layer_s(xr_dataset, fld_names, press, lon_lo=None, lon_hi=None, lat_lo=None, lat_hi=None):
plevs = xr_dataset['pressure levels']
if lon360:
if lon_lo < 0:
lon_lo += 360
if lon_hi < 0:
lon_hi += 360
x_lo = value_to_index(lon_coords, lon_lo)
x_hi = value_to_index(lon_coords, lon_hi)
y_lo = value_to_index(lat_coords, lat_lo)
y_hi = value_to_index(lat_coords, lat_hi)
p_idx = value_to_index(plevs.values, press)
sub_fld_s = []
for fld_name in fld_names:
fld = xr_dataset[fld_name]
sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx]
sub_fld_s.append(sub_fld)
sub_fld = xr.concat(sub_fld_s, 'channel')
sub_fld = sub_fld.assign_coords(channel=fld_names, fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi])
return sub_fld
def get_time_interpolated_layer(ds_0, ds_1, time_0, time_1, time, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi):
lyr_0 = get_horz_layer(ds_0, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi)
lyr_1 = get_horz_layer(ds_1, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi)
lyr = xr.concat([lyr_0, lyr_1], 'time')
lyr = lyr.assign_coords(time=[time_0, time_1])
intrp_lyr = lyr.interp(time=time, method='linear')
intrp_lyr = intrp_lyr.expand_dims('channel')
intrp_lyr = intrp_lyr.assign_coords(channel=[fld_name])
return intrp_lyr
def get_time_interpolated_layer_s(ds_0, ds_1, time_0, time_1, time, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi):
lyr_s_0 = get_horz_layer_s(ds_0, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi)
lyr_s_1 = get_horz_layer_s(ds_1, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi)
lyr = xr.concat([lyr_s_0, lyr_s_1], 'time')
lyr = lyr.assign_coords(time=[time_0, time_1])
intrp_lyr = lyr.interp(time=time, method='linear')
return intrp_lyr
def get_vert_profile(xr_dataset, fld_name, lons, lats):
if lon360: # convert -180/+180 to 0,360
lons = np.where(lons < 0, lons + 360, lons)
plevs = xr_dataset['pressure levels']
fld = xr_dataset[fld_name]
fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs)
dim2 = xr.DataArray(lons, dims='k')
dim1 = xr.DataArray(lats, dims='k')
intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method='linear')
return intrp_fld
def get_vert_profile_s(xr_dataset, fld_name_s, lons, lats):
if lon360: # convert -180,+180 to 0,360
lons = np.where(lons < 0, lons + 360, lons)
plevs = xr_dataset['pressure levels']
fld_s = []
for fld_name in fld_name_s:
fld = xr_dataset[fld_name]
fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs)
fld_s.append(fld)
fld = xr.concat(fld_s, 'fld_dim')
dim2 = xr.DataArray(lons, dims='k')
dim1 = xr.DataArray(lats, dims='k')
intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method='linear')
return intrp_fld
def get_point(xr_dataset, fld_name, lons, lats):
if lon360: # convert -180/+180 to 0,360
lons = np.where(lons < 0, lons + 360, lons) # convert -180,180 to 0,360
lat_coords = np.linspace(-90, 90, xr_dataset.fakeDim1.size)
lon_coords = np.linspace(0, 359.5, xr_dataset.fakeDim2.size)
fld = xr_dataset[fld_name]
fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords)
dim1 = xr.DataArray(lats, dims='k')
dim2 = xr.DataArray(lons, dims='k')
intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, method='linear')
return intrp_fld
def get_time_interpolated_vert_profile(ds_0, ds_1, time0, time1, fld_name, time, lons, lats):
prof_0 = get_vert_profile(ds_0, fld_name, lons, lats)
prof_1 = get_vert_profile(ds_1, fld_name, lons, lats)
prof = xr.concat([prof_0, prof_1], 'time')
prof = prof.assign_coords(time=[time0, time1])
intrp_prof = prof.interp(time=time, method='linear')
intrp_prof = intrp_prof.values
return intrp_prof
def get_time_interpolated_vert_profile_s(ds_0, ds_1, time0, time1, fld_name_s, time, lons, lats):
prof_0 = get_vert_profile_s(ds_0, fld_name_s, lons, lats)
prof_1 = get_vert_profile_s(ds_1, fld_name_s, lons, lats)
prof = xr.concat([prof_0, prof_1], 'time')
prof = prof.assign_coords(time=[time0, time1])
intrp_prof = prof.interp(time=time, method='linear')
intrp_prof = intrp_prof.values
return intrp_prof
def get_time_interpolated_point(ds_0, ds_1, time0, time1, fld_name, time, lons, lats):
vals_0 = get_point(ds_0, fld_name, lons, lats)
vals_1 = get_point(ds_1, fld_name, lons, lats)
vals = xr.concat([vals_0, vals_1], 'time')
vals = vals.assign_coords(time=[time0, time1])
intrp_vals = vals.interp(time=time, method='linear')
intrp_vals = intrp_vals.values
return intrp_vals