Skip to content
Snippets Groups Projects
Select Git revision
  • 6b9a47f3c25cdaff7b3992a17068c8b1d074dcba
  • master default protected
  • patch-1
3 results

gfs_reader.py

Blame
  • Forked from Tom Rink / python
    3659 commits behind the upstream repository.
    user avatar
    tomrink authored
    6b9a47f3
    History
    gfs_reader.py 7.15 KiB
    import datetime
    from datetime import timezone
    import glob
    import numpy as np
    import xarray as xr
    from util.util import value_to_index
    #import cartopy
    #from cartopy import *
    from metpy import *
    #import cartopy.crs as ccrs
    import random
    
    gfs_directory = '/apollo/cloud/Ancil_Data/clavrx_ancil_data/dynamic/gfs/'
    gfs_date_format = '%y%m%d'
    
    lon360 = True
    lat_coords = np.linspace(-90, 90, 361)
    lon_coords = np.linspace(0, 359.5, 720)
    
    
    class MyGenericException(Exception):
        def __init__(self, message):
            self.message = message
    
    
    def get_time_tuple_utc(timestamp):
        dt_obj = datetime.datetime.fromtimestamp(timestamp, timezone.utc)
        return dt_obj, dt_obj.timetuple()
    
    
    def get_bounding_gfs_files(timestamp):
        dt_obj, time_tup = get_time_tuple_utc(timestamp)
        date_str = dt_obj.strftime(gfs_date_format)
        dt_obj0 = datetime.datetime.strptime(date_str, gfs_date_format).replace(tzinfo=timezone.utc)
        dt_obj1 = dt_obj0 + datetime.timedelta(days=1)
        date_str_1 = dt_obj1.strftime(gfs_date_format)
    
        flist_0 = glob.glob(gfs_directory+'gfs.'+date_str+'??_F012.hdf')
        flist_1 = glob.glob(gfs_directory+'gfs.'+date_str_1+'00_F012.hdf')
        if (len(flist_0) == 0) or (len(flist_1) == 0):
            return None, None, None, None
        filelist = flist_0 + flist_1
    
        ftimes = []
        for pname in filelist:  # TODO: make better with regular expressions (someday)
            fname = os.path.split(pname)[1]
            toks = fname.split('.')
            tstr = toks[1].split('_')[0]
            dto = datetime.datetime.strptime(tstr, gfs_date_format+'%H').replace(tzinfo=timezone.utc)
            ftimes.append(dto.timestamp())
    
        tarr = np.array(ftimes)
        sidxs = tarr.argsort()
    
        farr = np.array(filelist)
        farr = farr[sidxs]
        ftimes = tarr[sidxs]
    
        above = ftimes >= timestamp
        if not above.any():
            return None, None, None, None
        tR = ftimes[above].min()
    
        below = ftimes <= timestamp
        if not below.any():
            return None, None, None, None
        tL = ftimes[below].max()
    
        iL = np.searchsorted(ftimes, tL, 'left')
        iR = iL + 1
    
        fList = farr.tolist()
    
        return fList[iL], ftimes[iL], fList[iR], ftimes[iR]
    
    
    def get_horz_layer(xr_dataset, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi):
        plevs = xr_dataset['pressure levels']
        fld = xr_dataset[fld_name]
    
        if lon360:
            if lon_lo < 0:
                lon_lo += 360
            if lon_hi < 0:
                lon_hi += 360
    
        x_lo = value_to_index(lon_coords, lon_lo)
        x_hi = value_to_index(lon_coords, lon_hi)
        y_lo = value_to_index(lat_coords, lat_lo)
        y_hi = value_to_index(lat_coords, lat_hi)
        p_idx = value_to_index(plevs.values, press)
    
        sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx]
        sub_fld = sub_fld.expand_dims('channel')
    
        sub_fld = sub_fld.assign_coords(channel=[fld_name], fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi])
    
        return sub_fld
    
    
    def get_horz_layer_s(xr_dataset, fld_names, press, lon_lo=None, lon_hi=None, lat_lo=None, lat_hi=None):
        plevs = xr_dataset['pressure levels']
    
        if lon360:
            if lon_lo < 0:
                lon_lo += 360
            if lon_hi < 0:
                lon_hi += 360
    
        x_lo = value_to_index(lon_coords, lon_lo)
        x_hi = value_to_index(lon_coords, lon_hi)
        y_lo = value_to_index(lat_coords, lat_lo)
        y_hi = value_to_index(lat_coords, lat_hi)
        p_idx = value_to_index(plevs.values, press)
    
        sub_fld_s = []
        for fld_name in fld_names:
            fld = xr_dataset[fld_name]
            sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx]
            sub_fld_s.append(sub_fld)
    
        sub_fld = xr.concat(sub_fld_s, 'channel')
        sub_fld = sub_fld.assign_coords(channel=fld_names, fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi])
    
        return sub_fld
    
    
    def get_time_interpolated_layer(ds_0, ds_1, time_0, time_1, time, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi):
        lyr_0 = get_horz_layer(ds_0, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi)
        lyr_1 = get_horz_layer(ds_1, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi)
    
        lyr = xr.concat([lyr_0, lyr_1], 'time')
        lyr = lyr.assign_coords(time=[time_0, time_1])
    
        intrp_lyr = lyr.interp(time=time, method='linear')
        intrp_lyr = intrp_lyr.expand_dims('channel')
        intrp_lyr = intrp_lyr.assign_coords(channel=[fld_name])
    
        return intrp_lyr
    
    
    def get_vert_profile(xr_dataset, fld_name, lons, lats):
        if lon360:  # convert -180/+180 to 0,360
            lons = np.where(lons < 0, lons + 360, lons)
    
        plevs = xr_dataset['pressure levels']
    
        fld = xr_dataset[fld_name]
        fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs)
    
        dim2 = xr.DataArray(lons, dims='k')
        dim1 = xr.DataArray(lats, dims='k')
    
        intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method='linear')
    
        return intrp_fld
    
    
    def get_vert_profile_s(xr_dataset, fld_name_s, lons, lats):
        if lon360:  # convert -180,+180 to 0,360
            lons = np.where(lons < 0, lons + 360, lons)
    
        plevs = xr_dataset['pressure levels']
    
        fld_s = []
        for fld_name in fld_name_s:
            fld = xr_dataset[fld_name]
            fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs)
            fld_s.append(fld)
    
        fld = xr.concat(fld_s, 'fld_dim')
    
        dim2 = xr.DataArray(lons, dims='k')
        dim1 = xr.DataArray(lats, dims='k')
    
        intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method='linear')
    
        return intrp_fld
    
    
    def get_point(xr_dataset, fld_name, lons, lats):
        if lon360:  # convert -180/+180 to 0,360
            lons = np.where(lons < 0, lons + 360, lons) # convert -180,180 to 0,360
    
        lat_coords = np.linspace(-90, 90, xr_dataset.fakeDim1.size)
        lon_coords = np.linspace(0, 359.5, xr_dataset.fakeDim2.size)
    
        fld = xr_dataset[fld_name]
        fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords)
    
        dim1 = xr.DataArray(lats, dims='k')
        dim2 = xr.DataArray(lons, dims='k')
    
        intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, method='linear')
    
        return intrp_fld
    
    
    def get_time_interpolated_vert_profile(ds_0, ds_1, time0, time1, fld_name, time, lons, lats):
    
        prof_0 = get_vert_profile(ds_0, fld_name, lons, lats)
        prof_1 = get_vert_profile(ds_1, fld_name, lons, lats)
    
        prof = xr.concat([prof_0, prof_1], 'time')
        prof = prof.assign_coords(time=[time0, time1])
    
        intrp_prof = prof.interp(time=time, method='linear')
    
        intrp_prof = intrp_prof.values
    
        return intrp_prof
    
    
    def get_time_interpolated_vert_profile_s(ds_0, ds_1, time0, time1, fld_name_s, time, lons, lats):
    
        prof_0 = get_vert_profile_s(ds_0, fld_name_s, lons, lats)
        prof_1 = get_vert_profile_s(ds_1, fld_name_s, lons, lats)
    
        prof = xr.concat([prof_0, prof_1], 'time')
        prof = prof.assign_coords(time=[time0, time1])
    
        intrp_prof = prof.interp(time=time, method='linear')
    
        intrp_prof = intrp_prof.values
    
        return intrp_prof
    
    
    def get_time_interpolated_point(ds_0, ds_1, time0, time1, fld_name, time, lons, lats):
        vals_0 = get_point(ds_0, fld_name, lons, lats)
        vals_1 = get_point(ds_1, fld_name, lons, lats)
    
        vals = xr.concat([vals_0, vals_1], 'time')
        vals = vals.assign_coords(time=[time0, time1])
    
        intrp_vals = vals.interp(time=time, method='linear')
    
        intrp_vals = intrp_vals.values
    
        return intrp_vals