Skip to content
Snippets Groups Projects
util.py 6.22 KiB
Newer Older
rink's avatar
rink committed
import numpy as np
tomrink's avatar
tomrink committed
import datetime
tomrink's avatar
tomrink committed
from datetime import timezone
rink's avatar
rink committed
from metpy.units import units
from metpy.calc import thickness_hydrostatic
tomrink's avatar
tomrink committed
from collections import namedtuple
tomrink's avatar
tomrink committed
import os
tomrink's avatar
tomrink committed

LatLonTuple = namedtuple('LatLonTuple', ['lat', 'lon'])
rink's avatar
rink committed

tomrink's avatar
tomrink committed
homedir = os.path.expanduser('~') + '/'

rink's avatar
rink committed

tomrink's avatar
tomrink committed
class GenericException(Exception):
rink's avatar
rink committed
    def __init__(self, message):
        self.message = message
rink's avatar
rink committed


tomrink's avatar
tomrink committed
class EarlyStop:
    def __init__(self, window_length=3, patience=5):
        self.patience = patience
tomrink's avatar
tomrink committed
        self.min = np.finfo(np.single).max
tomrink's avatar
tomrink committed
        self.cnt = 0
        self.cnt_wait = 0
        self.window = np.zeros(window_length, dtype=np.single)
        self.window.fill(np.nan)

    def check_stop(self, value):
        self.window[:-1] = self.window[1:]
        self.window[-1] = value

        if np.any(np.isnan(self.window)):
            return False

        ave = np.mean(self.window)
        if ave < self.min:
            self.min = ave
            self.cnt_wait = 0
            return False
        else:
            self.cnt_wait += 1

        if self.cnt_wait > self.patience:
            return True
        else:
            return False


tomrink's avatar
tomrink committed
def get_time_tuple_utc(timestamp):
    dt_obj = datetime.datetime.fromtimestamp(timestamp, timezone.utc)
    return dt_obj, dt_obj.timetuple()


tomrink's avatar
tomrink committed
def add_time_range_to_filename(pathname, tstart, tend):
    dt_obj, _ = get_time_tuple_utc(tstart)
    str_start = dt_obj.strftime('%Y%m%d%H')

    dt_obj, _ = get_time_tuple_utc(tend)
    str_end = dt_obj.strftime('%Y%m%d%H')

    filename = os.path.split(pathname)[1]
    w_o_ext, ext = os.path.splitext(filename)
    filename = w_o_ext+'_'+str_start+'_'+str_end+ext

    path = os.path.split(pathname)[0]
    path = path+'/'+filename
    return path


rink's avatar
rink committed
def haversine_np(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees)

    (lon1, lat1) must be broadcastable with (lon2, lat2).

    """
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2

    c = 2 * np.arcsin(np.sqrt(a))
    km = 6367 * c
    return km


def bin_data_by(a, b, bin_ranges):
    nbins = len(bin_ranges)
    binned_data = []

    for i in range(nbins):
        rng = bin_ranges[i]
        idxs = (b >= rng[0]) & (b < rng[1])
        binned_data.append(a[idxs])

    return binned_data


def get_bin_ranges(lop, hip, bin_size=100):
    bin_ranges = []
    delp = hip - lop
    nbins = int(delp/bin_size)

    for i in range(nbins):
        rng = [lop + i*bin_size, lop + i*bin_size + bin_size]
        bin_ranges.append(rng)

    return bin_ranges


# t must be monotonic increasing
def get_breaks(t, threshold):
    t_0 = t[0:t.shape[0]-1]
    t_1 = t[1:t.shape[0]]
    d = t_1 - t_0
    idxs = np.nonzero(d > threshold)
    return idxs

rink's avatar
rink committed

tomrink's avatar
tomrink committed
def pressure_to_altitude(pres, temp, prof_pres, prof_temp, sfc_pres=None, sfc_temp=None, sfc_elev=0):
rink's avatar
rink committed
    if not np.all(np.diff(prof_pres) > 0):
tomrink's avatar
tomrink committed
        raise GenericException("target pressure profile must be monotonic increasing")
rink's avatar
rink committed

    if pres < prof_pres[0]:
tomrink's avatar
tomrink committed
        raise GenericException("target pressure less than top of pressure profile")
rink's avatar
rink committed

    if temp is None:
        temp = np.interp(pres, prof_pres, prof_temp)

    i_top = np.argmax(np.extract(prof_pres <= pres, prof_pres)) + 1

    pres_s = prof_pres.tolist()
    temp_s = prof_temp.tolist()

    pres_s = [pres] + pres_s[i_top:]
    temp_s = [temp] + temp_s[i_top:]

tomrink's avatar
tomrink committed
    if sfc_pres is not None:
tomrink's avatar
tomrink committed
        if pres > sfc_pres:  # incoming pressure below surface
tomrink's avatar
tomrink committed
            return -1

        prof_pres = np.array(pres_s)
        prof_temp = np.array(temp_s)
        i_bot = prof_pres.shape[0] - 1

rink's avatar
rink committed
        if sfc_pres > prof_pres[i_bot]:  # surface below profile bottom
tomrink's avatar
tomrink committed
            pres_s = pres_s + [sfc_pres]
            temp_s = temp_s + [sfc_temp]
        else:
            idx = np.argmax(np.extract(prof_pres < sfc_pres, prof_pres))
            if sfc_temp is None:
                sfc_temp = np.interp(sfc_pres, prof_pres, prof_temp)
            pres_s = prof_pres.tolist()
            temp_s = prof_temp.tolist()
            pres_s = pres_s[0:idx+1] + [sfc_pres]
            temp_s = temp_s[0:idx+1] + [sfc_temp]
rink's avatar
rink committed

    prof_pres = np.array(pres_s)
    prof_temp = np.array(temp_s)

rink's avatar
rink committed
    prof_pres = prof_pres[::-1]
    prof_temp = prof_temp[::-1]

rink's avatar
rink committed
    prof_pres = prof_pres * units.hectopascal
    prof_temp = prof_temp * units.kelvin
    sfc_elev = sfc_elev * units.meter

    z = thickness_hydrostatic(prof_pres, prof_temp) + sfc_elev

    return z


# http://fourier.eng.hmc.edu/e176/lectures/NM/node25.html
def minimize_quadratic(xa, xb, xc, ya, yb, yc):
tomrink's avatar
tomrink committed
    x_m = xb + 0.5*(((ya-yb)*(xc-xb)*(xc-xb) - (yc-yb)*(xb-xa)*(xb-xa)) / ((ya-yb)*(xc-xb) + (yc-yb)*(xb-xa)))
tomrink's avatar
tomrink committed
    return x_m


def value_to_index(nda, value):
    diff = np.abs(nda - value)
    idx = np.argmin(diff)
tomrink's avatar
tomrink committed
    return idx


# array solzen must be degrees, missing values must NaN. For small roughly 50x50km regions only
def is_day(solzen, test_angle=80.0):
tomrink's avatar
tomrink committed
    solzen = solzen.flatten()
    solzen = solzen[np.invert(np.isnan(solzen))]
    if len(solzen) == 0 or np.sum(solzen <= test_angle) < len(solzen):
tomrink's avatar
tomrink committed
        return False
    else:
        return True


# array solzen must be degrees, missing values must NaN. For small roughly 50x50km regions only
def is_night(solzen, test_angle=100.0):
    solzen = solzen.flatten()
    solzen = solzen[np.invert(np.isnan(solzen))]
    if len(solzen) == 0 or np.sum(solzen >= test_angle) < len(solzen):
        return False
    else:
tomrink's avatar
tomrink committed
        return True


def get_grid_values_all(h5f, grid_name, scale_factor_name='scale_factor', add_offset_name='add_offset'):
    hfds = h5f[grid_name]
    attrs = hfds.attrs

    grd_vals = hfds[:,:]
    grd_vals = np.where(grd_vals == -999, np.nan, grd_vals)
    grd_vals = np.where(grd_vals == -127, np.nan, grd_vals)
    grd_vals = np.where(grd_vals == -32768, np.nan, grd_vals)

    if attrs is None:
        return grd_vals

    if scale_factor_name is not None:
        scale_factor = attrs.get(scale_factor_name)[0]
        grd_vals = grd_vals * scale_factor

    if add_offset_name is not None:
        add_offset = attrs.get(add_offset_name)[0]
        grd_vals = grd_vals + add_offset

    return grd_vals