Skip to content
Snippets Groups Projects
util.py 4.45 KiB
Newer Older
rink's avatar
rink committed
import numpy as np
tomrink's avatar
tomrink committed
import datetime
tomrink's avatar
tomrink committed
from datetime import timezone
rink's avatar
rink committed
from metpy.units import units
from metpy.calc import thickness_hydrostatic
tomrink's avatar
tomrink committed
from collections import namedtuple
tomrink's avatar
tomrink committed
import os
tomrink's avatar
tomrink committed

LatLonTuple = namedtuple('LatLonTuple', ['lat', 'lon'])
rink's avatar
rink committed


tomrink's avatar
tomrink committed
class GenericException(Exception):
rink's avatar
rink committed
    def __init__(self, message):
        self.message = message
rink's avatar
rink committed


tomrink's avatar
tomrink committed
def get_time_tuple_utc(timestamp):
    dt_obj = datetime.datetime.fromtimestamp(timestamp, timezone.utc)
    return dt_obj, dt_obj.timetuple()


tomrink's avatar
tomrink committed
def add_time_range_to_filename(pathname, tstart, tend):
    dt_obj, _ = get_time_tuple_utc(tstart)
    str_start = dt_obj.strftime('%Y%m%d%H')

    dt_obj, _ = get_time_tuple_utc(tend)
    str_end = dt_obj.strftime('%Y%m%d%H')

    filename = os.path.split(pathname)[1]
    w_o_ext, ext = os.path.splitext(filename)
    filename = w_o_ext+'_'+str_start+'_'+str_end+ext

    path = os.path.split(pathname)[0]
    path = path+'/'+filename
    return path


rink's avatar
rink committed
def haversine_np(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees)

    (lon1, lat1) must be broadcastable with (lon2, lat2).

    """
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2

    c = 2 * np.arcsin(np.sqrt(a))
    km = 6367 * c
    return km


def bin_data_by(a, b, bin_ranges):
    nbins = len(bin_ranges)
    binned_data = []

    for i in range(nbins):
        rng = bin_ranges[i]
        idxs = (b >= rng[0]) & (b < rng[1])
        binned_data.append(a[idxs])

    return binned_data


def get_bin_ranges(lop, hip, bin_size=100):
    bin_ranges = []
    delp = hip - lop
    nbins = int(delp/bin_size)

    for i in range(nbins):
        rng = [lop + i*bin_size, lop + i*bin_size + bin_size]
        bin_ranges.append(rng)

    return bin_ranges


# t must be monotonic increasing
def get_breaks(t, threshold):
    t_0 = t[0:t.shape[0]-1]
    t_1 = t[1:t.shape[0]]
    d = t_1 - t_0
    idxs = np.nonzero(d > threshold)
    return idxs

rink's avatar
rink committed

tomrink's avatar
tomrink committed
def pressure_to_altitude(pres, temp, prof_pres, prof_temp, sfc_pres=None, sfc_temp=None, sfc_elev=0):
rink's avatar
rink committed
    if not np.all(np.diff(prof_pres) > 0):
tomrink's avatar
tomrink committed
        raise GenericException("target pressure profile must be monotonic increasing")
rink's avatar
rink committed

    if pres < prof_pres[0]:
tomrink's avatar
tomrink committed
        raise GenericException("target pressure less than top of pressure profile")
rink's avatar
rink committed

    if temp is None:
        temp = np.interp(pres, prof_pres, prof_temp)

    i_top = np.argmax(np.extract(prof_pres <= pres, prof_pres)) + 1

    pres_s = prof_pres.tolist()
    temp_s = prof_temp.tolist()

    pres_s = [pres] + pres_s[i_top:]
    temp_s = [temp] + temp_s[i_top:]

tomrink's avatar
tomrink committed
    if sfc_pres is not None:
tomrink's avatar
tomrink committed
        if pres > sfc_pres:  # incoming pressure below surface
tomrink's avatar
tomrink committed
            return -1

        prof_pres = np.array(pres_s)
        prof_temp = np.array(temp_s)
        i_bot = prof_pres.shape[0] - 1

rink's avatar
rink committed
        if sfc_pres > prof_pres[i_bot]:  # surface below profile bottom
tomrink's avatar
tomrink committed
            pres_s = pres_s + [sfc_pres]
            temp_s = temp_s + [sfc_temp]
        else:
            idx = np.argmax(np.extract(prof_pres < sfc_pres, prof_pres))
            if sfc_temp is None:
                sfc_temp = np.interp(sfc_pres, prof_pres, prof_temp)
            pres_s = prof_pres.tolist()
            temp_s = prof_temp.tolist()
            pres_s = pres_s[0:idx+1] + [sfc_pres]
            temp_s = temp_s[0:idx+1] + [sfc_temp]
rink's avatar
rink committed

    prof_pres = np.array(pres_s)
    prof_temp = np.array(temp_s)

rink's avatar
rink committed
    prof_pres = prof_pres[::-1]
    prof_temp = prof_temp[::-1]

rink's avatar
rink committed
    prof_pres = prof_pres * units.hectopascal
    prof_temp = prof_temp * units.kelvin
    sfc_elev = sfc_elev * units.meter

    z = thickness_hydrostatic(prof_pres, prof_temp) + sfc_elev

    return z


# http://fourier.eng.hmc.edu/e176/lectures/NM/node25.html
def minimize_quadratic(xa, xb, xc, ya, yb, yc):
tomrink's avatar
tomrink committed
    x_m = xb + 0.5*(((ya-yb)*(xc-xb)*(xc-xb) - (yc-yb)*(xb-xa)*(xb-xa)) / ((ya-yb)*(xc-xb) + (yc-yb)*(xb-xa)))
tomrink's avatar
tomrink committed
    return x_m


def value_to_index(nda, value):
    diff = np.abs(nda - value)
    idx = np.argmin(diff)
tomrink's avatar
tomrink committed
    return idx


tomrink's avatar
tomrink committed
# array solzen must be degrees, missing values must NaN. For small 50x50km regions only
def is_night(solzen, test_angle=80.0, threshold=0.10):
tomrink's avatar
tomrink committed
    solzen = solzen.flatten()
    solzen = solzen[np.invert(np.isnan(solzen))]
tomrink's avatar
tomrink committed
    if len(solzen) == 0 or (np.sum(solzen > test_angle) / len(solzen)) > threshold:
tomrink's avatar
tomrink committed
        return True
    else:
        return False