import datetime, os from datetime import timezone import glob import numpy as np from aeolus.geos_nav import GEOSNavigation from util.util import GenericException class AMVFiles: def __init__(self, files_path, file_time_span, pattern, band='14', elem_name=None, line_name=None, lat_name=None, lon_name=None, params=None, out_params=None, meta_dict=None, qc_params=None): self.flist = glob.glob(files_path + pattern) if len(self.flist) == 0: raise GenericException('no matching files found in: ' + files_path) self.band = band self.ftimes = [] self.span_seconds = datetime.timedelta(minutes=file_time_span).seconds for pname in self.flist: dto = self.get_datetime(pname) dto_start = dto dto_end = dto + datetime.timedelta(minutes=file_time_span) self.ftimes.append((dto_start.timestamp(), dto_end.timestamp())) self.ftimes = np.array(self.ftimes) self.flist = np.array(self.flist) sidxs = np.argsort(self.ftimes[:, 0]) self.ftimes = self.ftimes[sidxs, :] self.flist = self.flist[sidxs] self.elem_name = elem_name self.line_name = line_name self.lon_name = lon_name self.lat_name = lat_name self.params = params self.out_params = params if out_params is not None: self.out_params = out_params self.meta_dict = meta_dict self.qc_params = qc_params def get_datetime(self, pathname): pass def get_navigation(self): pass def get_file_containing_time(self, timestamp): k = -1 for i in range(len(self.ftimes.shape[0])): if (timestamp >= self.ftimes[i, 0]) and (timestamp < self.ftimes[i, 1]): k = i break if k < 0: return None, None, None return self.flist[k], self.ftimes[k, 0], k def get_file(self, timestamp): diff = self.ftimes[:, 0] - timestamp midx = np.argmin(np.abs(diff)) if np.abs(self.ftimes[midx, 0] - timestamp) < self.span_seconds: return self.flist[midx], self.ftimes[midx, 0], midx else: return None, None, None def get_parameters(self): return self.params def get_out_parameters(self): return self.out_params def get_meta_dict(self): return self.meta_dict def get_qc_params(self): return self.qc_params def filter(self, qc_param): pass class Framework(AMVFiles): def __init__(self, files_path, file_time_span, band='14'): elem_name = 'Element' line_name = 'Line' lon_name = 'Longitude' lat_name = 'Latitude' params = ['MedianPress', 'Wind_Speed', 'Wind_Dir'] out_params = ['Lon', 'Lat', 'Element', 'Line', 'pressure', 'wind_speed', 'wind_direction'] meta_dict = {'Lon': ('degrees east', 'f4'), 'Lat': ('degrees north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4'), 'pressure': ('hPa', 'f4'), 'wind_speed': ('m s-1', 'f4'), 'wind_direction': ('degrees', 'f4')} qc_param = 'QI' super().__init__(files_path, file_time_span, '*WINDS_AMV_EN-'+band+'*.nc', band=band, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict, qc_params=qc_param) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dstr = toks[4] tstr = toks[5] dtstr = dstr + tstr dto = datetime.datetime.strptime(dtstr, '%Y%j%H%M').replace(tzinfo=timezone.utc) return dto def filter(self, qc_param): good = qc_param > 50 return good class FrameworkCloudHeight(AMVFiles): def __init__(self, files_path, file_time_span): elem_name = 'Element' line_name = 'Line' lon_name = 'Longitude' lat_name = 'Latitude' out_params = ['CldTopPres', 'CldTopHght', 'CldOptDpth'] params = ['CldTopPres', 'CldTopHght', 'CldOptDpth'] meta_dict = {'CldTopPres': ('hPa', 'f4'), 'CldTopHght': ('km', 'f4'), 'CldOptDpth': ('km', 'f4')} super().__init__(files_path, file_time_span, '*_CLOUD_HEIGHT_EN'+'*.nc', band=None, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dstr = toks[4] tstr = toks[5] dtstr = dstr + tstr dto = datetime.datetime.strptime(dtstr, '%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class FrameworkCloudPhase(AMVFiles): def __init__(self, files_path, file_time_span): elem_name = 'Element' line_name = 'Line' lon_name = 'Longitude' lat_name = 'Latitude' out_params = ['CloudPhase', 'CloudType'] params = ['CloudPhase', 'CloudType'] meta_dict = {'CloudPhase': (None, 'i1'), 'CloudType': (None, 'i1')} super().__init__(files_path, file_time_span, '*_CLOUD_PHASE_EN'+'*.nc', band=None, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dstr = toks[4] tstr = toks[5] dtstr = dstr + tstr dto = datetime.datetime.strptime(dtstr, '%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class OpsCloudPhase(AMVFiles): def __init__(self, files_path, file_time_span): elem_name = None line_name = None lon_name = None lat_name = None out_params = ['Phase'] params = ['Phase'] meta_dict = {'Phase': (None, 'i1')} super().__init__(files_path, file_time_span, 'OR_ABI-L2-ACTPF'+'*.nc', band=None, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dtstr = toks[3] dtstr = dtstr[:-3] dto = datetime.datetime.strptime(dtstr, 's%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class OPS(AMVFiles): def __init__(self, files_path, file_time_span, band='14'): elem_name = None line_name = None lon_name = 'lon' lat_name = 'lat' out_params = ['Lon', 'Lat', 'Element', 'Line', 'pressure', 'wind_speed', 'wind_direction'] params = ['pressure', 'wind_speed', 'wind_direction'] meta_dict = {'Lon': ('degrees east', 'f4'), 'Lat': ('degrees north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4'), 'pressure': ('hPa', 'f4'), 'wind_speed': ('m s-1', 'f4'), 'wind_direction': ('degrees', 'f4')} super().__init__(files_path, file_time_span, 'OR_ABI-L2-DMWF*'+'C'+band+'*.nc', band=band, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): # return GEOSNavigation(sub_lon=-75.2) ? return GEOSNavigation(sub_lon=-75.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dtstr = toks[3] dtstr = dtstr[:-3] dto = datetime.datetime.strptime(dtstr, 's%Y%j%H%M').replace(tzinfo=timezone.utc) return dto class CarrStereo(AMVFiles): def __init__(self, files_path, file_time_span, band='14'): elem_name = 'Element' line_name = 'Line' lon_name = 'Lon' lat_name = 'Lat' out_params = ['Lon', 'Lat', 'Element', 'Line', 'V_3D_u', 'V_3D_v', 'H_3D', 'pres', 'Fcst_Spd', 'Fcst_Dir', 'SatZen', 'InversionFlag', 'CloudPhase', 'CloudType'] params = ['V_3D', 'H_3D', 'pres', 'Fcst_Spd', 'Fcst_Dir', 'SatZen', 'InversionFlag', 'CloudPhase', 'CloudType'] meta_dict = {'H_3D': ('m', 'f4'), 'pres': ('hPa', 'f4'), 'Fcst_Spd': ('m s-1', 'f4'), 'Fcst_Dir': ('degree', 'f4'), 'SatZen': ('degree', 'f4'), 'InversionFlag': (None, 'u1'), 'CloudPhase': (None, 'u1'), 'CloudType': (None, 'u1'), 'V_3D_u': ('m s-1', 'f4'), 'V_3D_v': ('m s-1', 'f4'), 'Lon': ('degrees east', 'f4'), 'Lat': ('degrees north', 'f4'), 'Element': (None, 'i4'), 'Line': (None, 'i4')} super().__init__(files_path, file_time_span, 'tdw_qc_GOES*'+'ch_'+band+'.nc', band=band, elem_name=elem_name, params=params, line_name=line_name, lat_name=lat_name, lon_name=lon_name, out_params=out_params, meta_dict=meta_dict) def get_navigation(self): return GEOSNavigation(sub_lon=-137.0) def get_datetime(self, pathname): fname = os.path.split(pathname)[1] toks = fname.split('_') dtstr = toks[3] dto = datetime.datetime.strptime(dtstr, '%Y%j.%H%M.ch').replace(tzinfo=timezone.utc) return dto def get_datasource(files_path, file_time_span, source, band='14'): if source == 'OPS': return OPS(files_path, file_time_span, band=band) elif source == 'FMWK': return Framework(files_path, file_time_span, band=band) elif source == 'CARR': return CarrStereo(files_path, file_time_span, band=band) elif source == 'FMWK_CLD_HGT': return FrameworkCloudHeight(files_path, file_time_span) elif source == 'FMWK_CLD_PHASE': return FrameworkCloudPhase(files_path, file_time_span) elif source == 'OPS_CLD_PHASE': return OpsCloudPhase(files_path, file_time_span) else: raise GenericException('Unknown data source type')