Skip to content
Snippets Groups Projects
gfs_reader.py 7.65 KiB
import datetime
from datetime import timezone
import glob
import numpy as np
import xarray as xr
from util.util import value_to_index
#import cartopy
#from cartopy import *
from metpy import *
#import cartopy.crs as ccrs
import random

#gfs_directory = '/apollo/cloud/Ancil_Data/clavrx_ancil_data/dynamic/gfs/'
gfs_directory = '/Users/tomrink/data/gfs/'
gfs_date_format = '%y%m%d'

lon360 = True
lat_coords = np.linspace(-90, 90, 361)
lon_coords = np.linspace(0, 359.5, 720)


class MyGenericException(Exception):
    def __init__(self, message):
        self.message = message


def get_time_tuple_utc(timestamp):
    dt_obj = datetime.datetime.fromtimestamp(timestamp, timezone.utc)
    return dt_obj, dt_obj.timetuple()


def get_bounding_gfs_files(timestamp):
    dt_obj, time_tup = get_time_tuple_utc(timestamp)
    date_str = dt_obj.strftime(gfs_date_format)
    dt_obj0 = datetime.datetime.strptime(date_str, gfs_date_format).replace(tzinfo=timezone.utc)
    dt_obj1 = dt_obj0 + datetime.timedelta(days=1)
    date_str_1 = dt_obj1.strftime(gfs_date_format)

    flist_0 = glob.glob(gfs_directory+'gfs.'+date_str+'??_F012.h5')
    flist_1 = glob.glob(gfs_directory+'gfs.'+date_str_1+'00_F012.h5')
    if (len(flist_0) == 0) or (len(flist_1) == 0):
        return None, None, None, None
    filelist = flist_0 + flist_1

    ftimes = []
    for pname in filelist:  # TODO: make better with regular expressions (someday)
        fname = os.path.split(pname)[1]
        toks = fname.split('.')
        tstr = toks[1].split('_')[0]
        dto = datetime.datetime.strptime(tstr, gfs_date_format+'%H').replace(tzinfo=timezone.utc)
        ftimes.append(dto.timestamp())

    tarr = np.array(ftimes)
    sidxs = tarr.argsort()

    farr = np.array(filelist)
    farr = farr[sidxs]
    ftimes = tarr[sidxs]

    above = ftimes >= timestamp
    if not above.any():
        return None, None, None, None
    tR = ftimes[above].min()

    below = ftimes <= timestamp
    if not below.any():
        return None, None, None, None
    tL = ftimes[below].max()

    iL = np.searchsorted(ftimes, tL, 'left')
    iR = iL + 1

    fList = farr.tolist()

    return fList[iL], ftimes[iL], fList[iR], ftimes[iR]


def get_horz_layer(xr_dataset, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi):
    plevs = xr_dataset['pressure levels']
    fld = xr_dataset[fld_name]

    if lon360:
        if lon_lo < 0:
            lon_lo += 360
        if lon_hi < 0:
            lon_hi += 360

    x_lo = value_to_index(lon_coords, lon_lo)
    x_hi = value_to_index(lon_coords, lon_hi)
    y_lo = value_to_index(lat_coords, lat_lo)
    y_hi = value_to_index(lat_coords, lat_hi)
    p_idx = value_to_index(plevs.values, press)

    sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx]
    sub_fld = sub_fld.expand_dims('channel')

    sub_fld = sub_fld.assign_coords(channel=[fld_name], fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi])

    return sub_fld


def get_horz_layer_s(xr_dataset, fld_names, press, lon_lo=None, lon_hi=None, lat_lo=None, lat_hi=None):
    plevs = xr_dataset['pressure levels']

    if lon360:
        if lon_lo < 0:
            lon_lo += 360
        if lon_hi < 0:
            lon_hi += 360

    x_lo = value_to_index(lon_coords, lon_lo)
    x_hi = value_to_index(lon_coords, lon_hi)
    y_lo = value_to_index(lat_coords, lat_lo)
    y_hi = value_to_index(lat_coords, lat_hi)
    p_idx = value_to_index(plevs.values, press)

    sub_fld_s = []
    for fld_name in fld_names:
        fld = xr_dataset[fld_name]
        sub_fld = fld[y_lo:y_hi, x_lo:x_hi, p_idx]
        sub_fld_s.append(sub_fld)

    sub_fld = xr.concat(sub_fld_s, 'channel')
    sub_fld = sub_fld.assign_coords(channel=fld_names, fakeDim2=lon_coords[x_lo:x_hi], fakeDim1=lat_coords[y_lo:y_hi])

    return sub_fld


def get_time_interpolated_layer(ds_0, ds_1, time_0, time_1, time, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi):
    lyr_0 = get_horz_layer(ds_0, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi)
    lyr_1 = get_horz_layer(ds_1, fld_name, press, lon_lo, lon_hi, lat_lo, lat_hi)

    lyr = xr.concat([lyr_0, lyr_1], 'time')
    lyr = lyr.assign_coords(time=[time_0, time_1])

    intrp_lyr = lyr.interp(time=time, method='linear')
    intrp_lyr = intrp_lyr.expand_dims('channel')
    intrp_lyr = intrp_lyr.assign_coords(channel=[fld_name])

    return intrp_lyr


def get_time_interpolated_layer_s(ds_0, ds_1, time_0, time_1, time, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi):
    lyr_s_0 = get_horz_layer_s(ds_0, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi)
    lyr_s_1 = get_horz_layer_s(ds_1, fld_name_s, press, lon_lo, lon_hi, lat_lo, lat_hi)

    lyr = xr.concat([lyr_s_0, lyr_s_1], 'time')
    lyr = lyr.assign_coords(time=[time_0, time_1])

    intrp_lyr = lyr.interp(time=time, method='linear')

    return intrp_lyr


def get_vert_profile(xr_dataset, fld_name, lons, lats):
    if lon360:  # convert -180/+180 to 0,360
        lons = np.where(lons < 0, lons + 360, lons)

    plevs = xr_dataset['pressure levels']

    fld = xr_dataset[fld_name]
    fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs)

    dim2 = xr.DataArray(lons, dims='k')
    dim1 = xr.DataArray(lats, dims='k')

    intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method='linear')

    return intrp_fld


def get_vert_profile_s(xr_dataset, fld_name_s, lons, lats):
    if lon360:  # convert -180,+180 to 0,360
        lons = np.where(lons < 0, lons + 360, lons)

    plevs = xr_dataset['pressure levels']

    fld_s = []
    for fld_name in fld_name_s:
        fld = xr_dataset[fld_name]
        fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords, fakeDim0=plevs)
        fld_s.append(fld)

    fld = xr.concat(fld_s, 'fld_dim')

    dim2 = xr.DataArray(lons, dims='k')
    dim1 = xr.DataArray(lats, dims='k')

    intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, fakeDim0=plevs, method='linear')

    return intrp_fld


def get_point(xr_dataset, fld_name, lons, lats):
    if lon360:  # convert -180/+180 to 0,360
        lons = np.where(lons < 0, lons + 360, lons) # convert -180,180 to 0,360

    lat_coords = np.linspace(-90, 90, xr_dataset.fakeDim1.size)
    lon_coords = np.linspace(0, 359.5, xr_dataset.fakeDim2.size)

    fld = xr_dataset[fld_name]
    fld = fld.assign_coords(fakeDim2=lon_coords, fakeDim1=lat_coords)

    dim1 = xr.DataArray(lats, dims='k')
    dim2 = xr.DataArray(lons, dims='k')

    intrp_fld = fld.interp(fakeDim1=dim1, fakeDim2=dim2, method='linear')

    return intrp_fld


def get_time_interpolated_vert_profile(ds_0, ds_1, time0, time1, fld_name, time, lons, lats):

    prof_0 = get_vert_profile(ds_0, fld_name, lons, lats)
    prof_1 = get_vert_profile(ds_1, fld_name, lons, lats)

    prof = xr.concat([prof_0, prof_1], 'time')
    prof = prof.assign_coords(time=[time0, time1])

    intrp_prof = prof.interp(time=time, method='linear')

    intrp_prof = intrp_prof.values

    return intrp_prof


def get_time_interpolated_vert_profile_s(ds_0, ds_1, time0, time1, fld_name_s, time, lons, lats):

    prof_0 = get_vert_profile_s(ds_0, fld_name_s, lons, lats)
    prof_1 = get_vert_profile_s(ds_1, fld_name_s, lons, lats)

    prof = xr.concat([prof_0, prof_1], 'time')
    prof = prof.assign_coords(time=[time0, time1])

    intrp_prof = prof.interp(time=time, method='linear')

    intrp_prof = intrp_prof.values

    return intrp_prof


def get_time_interpolated_point(ds_0, ds_1, time0, time1, fld_name, time, lons, lats):
    vals_0 = get_point(ds_0, fld_name, lons, lats)
    vals_1 = get_point(ds_1, fld_name, lons, lats)

    vals = xr.concat([vals_0, vals_1], 'time')
    vals = vals.assign_coords(time=[time0, time1])

    intrp_vals = vals.interp(time=time, method='linear')

    intrp_vals = intrp_vals.values

    return intrp_vals