import datetime, os from datetime import timezone import glob import fnmatch import numpy as np import re import pickle from netCDF4 import Dataset from pathlib import Path from util.util import GenericException from util.lon_lat_grid import LonLatGrid from util.geos_nav import GEOSNavigation from util.setup import ancillary_path # def get_parameters_clavrx(filename=homedir+'data/clavrx/clavrx_OR_ABI-L1b-RadF-M6C01_G16_s20192930000343.level2.nc'): def get_parameters_clavrx(filename=ancillary_path+'clavrx_parameters.pkl'): # rg = Dataset(filename, 'r') # var_s = rg.variables # var_names = list(var_s.keys()) # var_names_2d = [] # # for str in var_names: # v = var_s[str] # if len(v.shape) == 2: # if not (str.find('latitude') != -1 or str.find('longitude') != -1): # var_names_2d.append(str) # # rg.close() var_names_2d = pickle.load(open(filename, 'rb')) return var_names_2d caliop_clavrx_params = None def get_parameters_caliop_clavrx(filename='/data/Personal/rink/clavrx_calipso/g16_s20201050200_06kmCLay.matchup.calipso.h5'): global caliop_clavrx_params if caliop_clavrx_params is not None: return caliop_clavrx_params rg = Dataset(filename, 'r') var_s = rg.variables var_names = list(var_s.keys()) var_names_keep = [] for str in var_names: v = var_s[str] if len(v.shape) == 1: if not (str.find('latitude') != -1 or str.find('longitude') != -1): var_names_keep.append(str) rg.close() caliop_clavrx_params = var_names_keep return var_names_keep def get_parameters_fmwk_amvs(filename='/ships19/cloud/scratch/4TH_AMV_INTERCOMPARISON/FMWK2_AMV/GOES16_ABI_2KM_FD_2019293_0020_34_WINDS_AMV_EN-14CT.nc'): rg = Dataset(filename, 'r') var_s = rg.variables var_names = list(var_s.keys()) var_names_to_keep = [] lon = var_s['Longitude'] num_amvs = lon.shape[0] for str in var_names: v = var_s[str] if len(v.shape) == 1 and v.shape[0] == num_amvs: var_names_to_keep.append(str) rg.close() return var_names_to_keep class Files: def __init__(self, files_path, file_time_span, pattern): self.flist = [] self.ftimes = [] self.dto_s = [] for path in glob.glob(files_path + pattern, recursive=True): self.flist.append(path) if len(self.flist) == 0: raise GenericException('no matching files found in: ' + files_path + ' matching: ' + pattern) self.span_seconds = datetime.timedelta(minutes=file_time_span).seconds for pname in self.flist: dto = self.get_datetime(pname) dto_start = dto dto_end = dto + datetime.timedelta(minutes=file_time_span) self.ftimes.append((dto_start.timestamp(), dto_end.timestamp())) self.dto_s.append(dto) self.ftimes = np.array(self.ftimes) self.flist = np.array(self.flist) self.dto_s = np.array(self.dto_s) sidxs = np.argsort(self.ftimes[:, 0]) # sort on start time self.ftimes = self.ftimes[sidxs, :] self.flist = self.flist[sidxs] self.dto_s = self.dto_s[sidxs] self._current_index = 0 def get_datetime(self, pathname): """ :param pathname: The full-path of the file. :return: The file's time label as a datetime object. """ pass def get_file_containing_time(self, timestamp): """ :param timestamp: seconds since the epoch :return: the file whose time range contains timestamp. """ k = -1 for i in range(self.ftimes.shape[0]): if (timestamp >= self.ftimes[i, 0]) and (timestamp < self.ftimes[i, 1]): k = i break if k < 0: return None, None, None return self.flist[k], self.ftimes[k, 0], k def get_file(self, timestamp, window=None): """ :param timestamp: seconds since the epoch. :param window: the duration (minutes) of files in this object. Defaults to the constructor time span. :return: the file whose duration contains the timestamp. """ if window is None: window = self.span_seconds else: window = datetime.timedelta(minutes=window).seconds diff = self.ftimes[:, 0] - timestamp midx = np.argmin(np.abs(diff)) if np.abs(self.ftimes[midx, 0] - timestamp) < window: return self.flist[midx], self.ftimes[midx, 0], midx else: return None, None, None def get_file_in_range(self, timestamp, t_lo_minutes, t_hi_minutes): """ :param timestamp: :param t_lo_minutes: can be negative or positive :param t_hi_minutes: can be negative or positive, but must be greater than t_lo_minutes. :return: the file whose time label is within the range (timestamp + t_lo_minutes, timestamp + t_hi_minutes). If more than one, return the first occurrence. """ if t_hi_minutes <= t_lo_minutes: raise ValueError('t_hi_minutes must be greater than t_lo_minutes') t_lo = timestamp + datetime.timedelta(minutes=t_lo_minutes).seconds t_hi = timestamp + datetime.timedelta(minutes=t_hi_minutes).seconds m_idx = np.where(np.logical_and(self.ftimes[:, 0] >= t_lo, self.ftimes[:, 0] < t_hi))[0] if len(m_idx) > 0: m_idx = m_idx[0] # the first occurrence return self.flist[m_idx], self.ftimes[m_idx, 0], m_idx else: return None, None, None def get_parameters(self): pass def __iter__(self): return self def __next__(self): if self._current_index < len(self.flist): fname = self.flist[self._current_index] t_start = self.ftimes[self._current_index, 0] t_stop = self.ftimes[self._current_index, 1] dto = self.dto_s[self._current_index] self._current_index += 1 return fname, t_start, t_stop, dto self._current_index = 0 raise StopIteration class GOESL1B(Files): def __init__(self, files_path, band='14', pattern='OR_ABI-L1b-RadC-M*C14'+'*.nc'): super().__init__(files_path, 10, pattern) def get_datetime(self, pathname): filename = os.path.split(pathname)[1] so = re.search('_s\\d{11}', filename) dt_str = so.group() dto = datetime.datetime.strptime(dt_str, '_s%Y%j%H%M').replace(tzinfo=timezone.utc) return dto # GOES-16, CONUS. TODO: Generalize to G-16 and FD, MESO class CLAVRx(Files): def __init__(self, files_path, pattern='clavrx_OR_ABI-L1b*.level2.nc', file_time_span=10): super().__init__(files_path, file_time_span, pattern) self.params = get_parameters_clavrx() def get_datetime(self, pathname): filename = os.path.split(pathname)[1] so = re.search('_s\\d{11}', filename) dt_str = so.group() dto = datetime.datetime.strptime(dt_str, '_s%Y%j%H%M').replace(tzinfo=timezone.utc) return dto def get_parameters(self): return self.params def get_navigation(self, h5f): return GEOSNavigation(sub_lon=-75.0, CFAC=5.6E-05, COFF=-0.101332, LFAC=-5.6E-05, LOFF=0.128212, num_elems=2500, num_lines=1500) # filename must begin with 'clavrx_H08_YYYYMMDD_HHMM' class CLAVRx_H08(Files): def __init__(self, files_path, pattern='clavrx_H08*.level2.nc'): super().__init__(files_path, 10, pattern) self.params = get_parameters_clavrx() def get_datetime(self, pathname): filename = os.path.split(pathname)[1] dt_str = filename[11:24] dto = datetime.datetime.strptime(dt_str, '%Y%m%d_%H%M').replace(tzinfo=timezone.utc) return dto def get_parameters(self): return self.params def get_navigation(self, h5f): return None class CLAVRx_H09(Files): def __init__(self, files_path, pattern='clavrx_H09*.level2.nc'): super().__init__(files_path, 10, pattern) self.params = get_parameters_clavrx() def get_datetime(self, pathname): filename = os.path.split(pathname)[1] dt_str = filename[11:24] dto = datetime.datetime.strptime(dt_str, '%Y%m%d_%H%M').replace(tzinfo=timezone.utc) return dto def get_parameters(self): return self.params def get_navigation(self, h5f): return None class CLAVRx_CALIPSO(Files): def __init__(self, files_path, pattern='*_06kmCLay.matchup.calipso.h5'): super().__init__(files_path, 10, pattern) self.params = ['cld_opd_acha', 'cld_reff_acha', 'cld_temp_acha', 'cld_press_acha', 'cld_height_acha'] # self.params = get_parameters_clavrx() def get_datetime(self, pathname): filename = os.path.split(pathname)[1] dto = datetime.datetime.strptime(filename, 'g16_s%Y%j%H%M_06kmCLay.matchup.calipso.h5').replace(tzinfo=timezone.utc) return dto def get_parameters(self): return self.params def get_navigation(self, h5f): return None class CLAVRx_VIIRS(Files): def __init__(self, files_path, pattern='clavrx*viirs*level2.??'): super().__init__(files_path, 5, pattern) self.params = get_parameters_clavrx() def get_datetime(self, pathname): filename = os.path.split(pathname)[1] toks = filename.split('.') # so = re.search('\\d{11}', toks[4]) # dt_str = so.group() dt_str = toks[1] + toks[2] dto = datetime.datetime.strptime(dt_str, 'A%Y%j%H%M').replace(tzinfo=timezone.utc) return dto def get_parameters(self): return self.params def get_navigation(self, h5f): lons = h5f['longitude'][:, :] lats = h5f['latitude'][:, :] lons = np.where(lons < 0, lons + 360, lons) ll_grid = LonLatGrid(lons, lats, reduce=2) return ll_grid class RAOBfiles(Files): def __init__(self, files_path, file_time_span=10, pattern='raob_soundings*.cdf'): super().__init__(files_path, file_time_span, pattern) def get_datetime(self, pathname): filename = os.path.split(pathname)[1] dt_str = (((filename.split('raob_soundings'))[1]).split('.'))[0] dto = datetime.datetime.strptime(dt_str, '%Y%m%d_%H%M').replace(tzinfo=timezone.utc) return dto class GFSfiles(Files): def __init__(self, files_path, pattern='gfs*.h5'): super().__init__(files_path, 10, pattern) def get_datetime(self, pathname): filename = os.path.split(pathname)[1] dto = datetime.datetime.strptime(filename, 'gfs.%y%m%d%H_F012.h5').replace(tzinfo=timezone.utc) dto += datetime.timedelta(hours=12) return dto class CrIS_Retrieval(Files): def __init__(self, files_path, file_time_span=8, pattern='CrIS_*atm_prof_rtv.h5'): super().__init__(files_path, file_time_span, pattern) def get_datetime(self, pathname): filename = os.path.split(pathname)[1] dt_str = re.search('_d.{14}', filename).group(0) dto = datetime.datetime.strptime(dt_str, '_d%Y%m%d_t%H%M').replace(tzinfo=timezone.utc) return dto class AMVFiles: def __init__(self, files_path, file_time_span, pattern, band='14', elem_name=None, line_name=None, lat_name=None, lon_name=None, press_name=None, params=None, out_params=None, meta_dict=None, qc_params=None): self.flist = glob.glob(files_path + pattern) if len(self.flist) == 0: raise GenericException('no matching files found in: ' + files_path) self.band = band self.ftimes = [] self.span_seconds = datetime.timedelta(minutes=file_time_span).seconds for pname in self.flist: dto = self.get_datetime(pname) dto_start = dto dto_end = dto + datetime.timedelta(minutes=file_time_span) self.ftimes.append((dto_start.timestamp(), dto_end.timestamp())) self.ftimes = np.array(self.ftimes) self.flist = np.array(self.flist) sidxs = np.argsort(self.ftimes[:, 0]) self.ftimes = self.ftimes[sidxs, :] self.flist = self.flist[sidxs] self.elem_name = elem_name self.line_name = line_name self.lon_name = lon_name self.lat_name = lat_name self.press_name = press_name self.params = params self.out_params = params if out_params is not None: self.out_params = out_params self.meta_dict = meta_dict self.qc_params = qc_params def get_datetime(self, pathname): pass def get_navigation(self): pass def get_file_containing_time(self, timestamp): k = -1 for i in range(self.ftimes.shape[0]): if (timestamp >= self.ftimes[i, 0]) and (timestamp < self.ftimes[i, 1]): k = i break if k < 0: return None, None, None return self.flist[k], self.ftimes[k, 0], k def get_file(self, timestamp, window=None): if window is None: window = self.span_seconds else: window = datetime.timedelta(minutes=window).seconds diff = self.ftimes[:, 0] - timestamp midx = np.argmin(np.abs(diff)) if np.abs(self.ftimes[midx, 0] - timestamp) < window: return self.flist[midx], self.ftimes[midx, 0], midx else: return None, None, None def get_parameters(self): return self.params def get_out_parameters(self): return self.out_params def get_meta_dict(self): return self.meta_dict def get_qc_params(self): return self.qc_params def filter(self, qc_param): pass class Framework(AMVFiles): def __init__(self, files_path, file_time_span, band='14', pattern='*WINDS_AMV_EN-'+'14'+'*.nc'): elem_name = 'Element' line_name = 'Line' lon_name = 'Longitude' lat_name = 'Latitude' params = get_parameters_fmwk_amvs() params.remove(elem_name) params.remove(line_name) params.remove(lon_name) params.remove(lat_name) params.remove('MedianPress') params.remove('Wind_Speed') params.remove('Wind_Dir') params = ['MedianPress', 'Wind_Speed', 'Wind_Dir'] + params out_params = ['Lon', 'Lat', 'Element', 'Line', 'pressure', 'wind_speed', 'wind_direction'] meta_dict = {'Lon': ('degrees east', 'f4'), 'Lat': ('degrees north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4'), 'pressure': ('hPa', 'f4'), 'wind_speed': ('m s-1', 'f4'), 'wind_direction': ('degrees', 'f4')} #qc_param = 'QI' qc_param = 'Flag' super().__init__(files_path, file_time_span, pattern, band=band, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict, qc_params=qc_param) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dstr = toks[4] tstr = toks[5] dtstr = dstr + tstr dto = datetime.datetime.strptime(dtstr, '%Y%j%H%M').replace(tzinfo=timezone.utc) return dto def filter(self, qc_param): # good = qc_param > 50 good = qc_param == 0 return good class FrameworkCloudHeight(AMVFiles): def __init__(self, files_path, file_time_span, pattern='*_CLOUD_HEIGHT_EN'+'*.nc'): elem_name = 'Element' line_name = 'Line' lon_name = 'Longitude' lat_name = 'Latitude' out_params = ['CldTopPres', 'CldTopHght', 'CldOptDpth'] params = ['CldTopPres', 'CldTopHght', 'CldOptDpth'] meta_dict = {'CldTopPres': ('hPa', 'f4'), 'CldTopHght': ('km', 'f4'), 'CldOptDpth': ('km', 'f4')} super().__init__(files_path, file_time_span, pattern, band=None, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dstr = toks[4] tstr = toks[5] dtstr = dstr + tstr dto = datetime.datetime.strptime(dtstr, '%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class FrameworkCloudPhase(AMVFiles): def __init__(self, files_path, file_time_span, pattern='*_CLOUD_PHASE_EN'+'*.nc'): elem_name = 'Element' line_name = 'Line' lon_name = 'Longitude' lat_name = 'Latitude' out_params = ['CloudPhase', 'CloudType'] params = ['CloudPhase', 'CloudType'] meta_dict = {'CloudPhase': (None, 'i1'), 'CloudType': (None, 'i1')} super().__init__(files_path, file_time_span, pattern, band=None, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dstr = toks[4] tstr = toks[5] dtstr = dstr + tstr dto = datetime.datetime.strptime(dtstr, '%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class OpsCloudPhase(AMVFiles): def __init__(self, files_path, file_time_span, pattern='OR_ABI-L2-ACTPF'+'*.nc'): elem_name = None line_name = None lon_name = None lat_name = None out_params = ['Phase'] params = ['Phase'] meta_dict = {'Phase': (None, 'i1')} super().__init__(files_path, file_time_span, pattern, band=None, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dtstr = toks[3] dtstr = dtstr[:-3] dto = datetime.datetime.strptime(dtstr, 's%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class OPS(AMVFiles): def __init__(self, files_path, file_time_span, band='14', pattern='OR_ABI-L2-DMWF*'+'C'+'14'+'*.nc'): elem_name = None line_name = None lon_name = 'lon' lat_name = 'lat' params = ['pressure', 'wind_speed', 'wind_direction', 'local_zenith_angle', 'DQF'] out_params = ['Lon', 'Lat', 'Element', 'Line'] + params meta_dict = {'Lon': ('degrees east', 'f4'), 'Lat': ('degrees north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4'), 'pressure': ('hPa', 'f4'), 'wind_speed': ('m s-1', 'f4'), 'wind_direction': ('degrees', 'f4')} super().__init__(files_path, file_time_span, pattern, band=band, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): # return GEOSNavigation(sub_lon=-75.2) ? return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dtstr = toks[3] dtstr = dtstr[:-3] dto = datetime.datetime.strptime(dtstr, 's%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class AMV_Intercompare(AMVFiles): def __init__(self, files_path, file_time_span, band='14', pattern='ASCII_AMV*.CT'): elem_name = None line_name = None lon_name = 'lon' lat_name = 'lat' press_name = 'PW' params = ['BOX','SRCH','SPD','DIR','PW','LLCM','SPDG','DIRG','TBALB','MAXC','TRKM','PERR','HAMD','QINF','QIWF','QIC'] out_params = ['Lon', 'Lat', 'Element', 'Line'] + params meta_dict = {'Lon': ('degrees east', 'f4'), 'Lat': ('degrees north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4'), 'pressure': ('hPa', 'f4'), 'wind_speed': ('m s-1', 'f4'), 'wind_direction': ('degrees', 'f4')} super().__init__(files_path, file_time_span, pattern, band=band, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, press_name=press_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('.') dtstr = toks[2] + '.' + toks[3] dto = datetime.datetime.strptime(dtstr, '%Y%j.%H%M').replace(tzinfo=timezone.utc) return dto class CarrStereo(AMVFiles): def __init__(self, files_path, file_time_span, band='14', pattern='tdw_qc_GOES*'+'ch_'+'14'+'.nc'): elem_name = 'Element' line_name = 'Line' lon_name = 'Lon' lat_name = 'Lat' out_params = ['Lon', 'Lat', 'Element', 'Line', 'V_3D_u', 'V_3D_v', 'H_3D', 'pres', 'Fcst_Spd', 'Fcst_Dir', 'SatZen', 'InversionFlag', 'CloudPhase', 'CloudType'] params = ['V_3D', 'H_3D', 'pres', 'Fcst_Spd', 'Fcst_Dir', 'SatZen', 'InversionFlag', 'CloudPhase', 'CloudType'] meta_dict = {'H_3D': ('m', 'f4'), 'pres': ('hPa', 'f4'), 'Fcst_Spd': ('m s-1', 'f4'), 'Fcst_Dir': ('degree', 'f4'), 'SatZen': ('degree', 'f4'), 'InversionFlag': (None, 'u1'), 'CloudPhase': (None, 'u1'), 'CloudType': (None, 'u1'), 'V_3D_u': ('m s-1', 'f4'), 'V_3D_v': ('m s-1', 'f4'), 'Lon': ('degrees east', 'f4'), 'Lat': ('degrees north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4')} super().__init__(files_path, file_time_span, pattern, band=band, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-137.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dtstr = toks[3] dto = datetime.datetime.strptime(dtstr, '%Y%j.%H%M.ch').replace(tzinfo=timezone.utc) return dto def get_datasource(files_path, source, file_time_span=10, band='14'): if source == 'OPS': return OPS(files_path, file_time_span, band=band) elif source == 'AMV_TXT': return AMV_Intercompare(files_path, file_time_span, band=band) elif source == 'FMWK': return Framework(files_path, file_time_span, band=band) elif source == 'CARR': return CarrStereo(files_path, file_time_span, band=band) elif source == 'FMWK_CLD_HGT': return FrameworkCloudHeight(files_path, file_time_span) elif source == 'FMWK_CLD_PHASE': return FrameworkCloudPhase(files_path, file_time_span) elif source == 'OPS_CLD_PHASE': return OpsCloudPhase(files_path, file_time_span) elif source == 'GFS': return GFSfiles(files_path) elif source == 'RAOB': return RAOBfiles(files_path, file_time_span) else: raise GenericException('Unknown data source type')