-
tomrink authored12bfd222
pireps.py 11.47 KiB
import datetime
from datetime import timezone
import numpy as np
import re
from pathlib import Path
import h5py
NO_ICE = '\s*NEG\s*|\s*NONE\s*|\s*NEGATIVE\s*|\s*NO\s*'
ICE_LVL = '\d+-\d+|FL\d+-FL\d+'
# ICE Intensity: 0:No intensity report, 1:Trace, 2:Light, 3:Light Moderate, 4:Moderate, 5:Moderate Severe, 6:Severe
# NO/NEG -1
# For Binary Observations, NO/NEG: -1, Any Icing: 1
ICE_TRACE = '\s*TRACE\s*|\s*TRC\s*|\s*TR\s*'
ICE_LGT = '\s*LGT\s*|\s*LIGHT\s*|\s*LT\s*|\s*LGHT\s*|\s*LIT\s*|\s*LITE\s*|\s*LG\s*'
ICE_MOD = '\s*MOD\s*|\s*MDT\s*|\s*MODERATE\s*|\s*HEAVY\s*'
ICE_SVR = '\s*SVR\s*|\s*SEV\s*|\s*SEVERE\s*'
FLT_LVL = '/FL\s{0,1}\d+\s*'
ICE_RPT = '/IC'
ATYPE = '/TP'
RMK = '/RM'
def time_sort_pirep_dict(the_dict):
new_dict = {}
keys = []
tidx = []
for ts in list(the_dict.keys()):
rpts = the_dict[ts]
for idx, tup in enumerate(rpts):
keys.append(ts)
tidx.append(idx)
keys = np.array(keys)
tidx = np.array(tidx)
sidxs = np.argsort(keys)
keys = keys[sidxs]
tidx = tidx[sidxs]
for idx, key in enumerate(keys):
rpts = the_dict[key]
tup = rpts[tidx[idx]]
n_rpts = new_dict.get(key)
if n_rpts is None:
n_rpts = []
new_dict[key] = n_rpts
n_rpts.append(tup)
return new_dict
# Returns icing, no_icing (no icing observed), neg-icing (icing expected, but not observed)
# from Iowa Mesonet Archived PIREP reports obtained in csv format
def pirep_icing(filename, lon_range=[-180, 180], lat_range=[-63, 63]):
# HEADER: VALID,URGENT,AIRCRAFT,REPORT,LAT,LON
ice_dict = {}
neg_ice_dict = {}
no_ice_dict = {}
cnt_ice = 0
cnt_no_ice = 0
cnt_neg_ice = 0
with open(filename) as file:
for idx, line in enumerate(file):
toks = line.split(',')
if toks[0] == 'VALID': # Skip headers (concatenated files)
continue
# if len(toks) != 6: # Check for line format error
if len(toks) != 9: # Check for line format error
continue
report = toks[3]
try:
# lat = float(toks[4])
# lon = float(toks[5])
lat = float(toks[7])
lon = float(toks[8])
except Exception:
continue
if lon < lon_range[0] or lon > lon_range[1]:
continue
if lat < lat_range[0] or lat > lat_range[1]:
continue
# Flight level
so = re.search(FLT_LVL, report)
if so is None:
continue
else:
fl = so.group()
fl = (fl[3:].rstrip()).lstrip()
if fl.isnumeric():
fl = float(fl) * 100 * 0.3048 # conv flight level to meters
else:
continue
dto = datetime.datetime.strptime(toks[0], '%Y%m%d%H%M').replace(tzinfo=timezone.utc)
timestmp = dto.timestamp()
# Icing
ii = report.find('/IC')
if ii >= 0:
ice_str = report[ii+3:]
ri = ice_str.find('/RM')
if ri >= 0:
ice_str = ice_str[:ri]
else:
ri = ice_str.find(',')
if ri >= 0:
ice_str = ice_str[:ri]
ice_str = ice_str.lstrip()
if ice_str.find('N/A') >= 0:
continue
if len(re.findall(NO_ICE, ice_str)) != 0:
rpts = neg_ice_dict.get(timestmp)
tup = (lat, lon, fl, -1, cnt_neg_ice, ice_str)
cnt_neg_ice += 1
if rpts is None:
rpts = []
rpts.append(tup)
neg_ice_dict[timestmp] = rpts
else:
rpts.append(tup)
else:
so = re.search(ICE_LVL, ice_str)
if so is not None:
lvl_a, lvl_b = so.group().split('-')
if lvl_a.find('FL') >= 0:
lvl_a = float(lvl_a[2:]) * 100 * 0.3048
lvl_b = float(lvl_b[2:]) * 100 * 0.3048
else:
lvl_a = float(lvl_a) * 100 * 0.3048
lvl_b = float(lvl_b) * 100 * 0.3048
fl = max(lvl_a, lvl_b)
if fl > 15000.0:
continue
# Intensity
I = 0
so = re.search(ICE_TRACE, ice_str)
if so is not None:
I = 1
else:
so = re.search(ICE_LGT, ice_str)
if so is not None:
so = re.search(ICE_MOD, ice_str)
if so is not None:
I = 3
else:
I = 2
else:
so = re.search(ICE_MOD, ice_str)
if so is not None:
so = re.search(ICE_SVR, ice_str)
if so is not None:
I = 5
else:
I = 4
else:
so = re.search(ICE_SVR, ice_str)
if so is not None:
I = 6
rpts = ice_dict.get(timestmp)
tup = (lat, lon, fl, I, cnt_ice, ice_str)
cnt_ice += 1
if rpts is None:
rpts = []
rpts.append(tup)
ice_dict[timestmp] = rpts
else:
rpts.append(tup)
else:
rpts = no_ice_dict.get(timestmp)
tup = (lat, lon, fl, -1, cnt_no_ice, 'no observed icing')
cnt_no_ice += 1
if rpts is None:
rpts = []
rpts.append(tup)
no_ice_dict[timestmp] = rpts
else:
rpts.append(tup)
ice_dict = time_sort_pirep_dict(ice_dict)
no_ice_dict = time_sort_pirep_dict(no_ice_dict)
neg_ice_dict = time_sort_pirep_dict(neg_ice_dict)
return ice_dict, no_ice_dict, neg_ice_dict
# Returns icing, no_icing (no icing observed)
def pirep_icing_boeing_txt(filename, lon_range=[-180, 180], lat_range=[-63, 63]):
# year, month, day, hour, minute, sec, lat, lon, alt, degC, airspeed, icing
ice_dict = {}
no_ice_dict = {}
cnt_ice = 0
cnt_no_ice = 0
with open(filename) as file:
for idx, line in enumerate(file):
toks = line.split(' ')
if len(toks) != 12: # Check for line format error
continue
year = 2000 + int(toks[0])
month = int(toks[1])
day = int(toks[2])
hour = int(toks[3])
minute = int(toks[4])
second = int(toks[5])
try:
lat = float(toks[6])
lon = float(toks[7])
except Exception:
continue
if lon < lon_range[0] or lon > lon_range[1]:
continue
if lat < lat_range[0] or lat > lat_range[1]:
continue
flt_alt = float(toks[8]) * 0.3048 # convert feet to meters
degK = float(toks[9]) + 273.0
spd = float(toks[10])
ice_bool = int(toks[11])
dto = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second)
dto = dto.replace(tzinfo=timezone.utc)
ts = dto.timestamp()
if ice_bool == 1:
tup = (lat, lon, flt_alt, 1, 0, 'None', degK, spd)
rpts = ice_dict.get(ts)
if rpts is None:
rpts = []
rpts.append(tup)
ice_dict[ts] = rpts
else:
print('duplicate: ', tup)
print('duplicate: ', rpts[0])
print('Skip -------------')
# rpts.append(tup)
cnt_ice += 1
else:
tup = (lat, lon, flt_alt, -1, 0, 'None', degK, spd)
rpts = no_ice_dict.get(ts)
if rpts is None:
rpts = []
rpts.append(tup)
no_ice_dict[ts] = rpts
else:
print('duplicate: ', tup)
print('duplicate: ', rpts[0])
print('Skip -------------')
# rpts.append(tup)
cnt_no_ice += 1
print('icing, no icing cnt: ', cnt_ice, cnt_no_ice)
ice_dict = time_sort_pirep_dict(ice_dict)
no_ice_dict = time_sort_pirep_dict(no_ice_dict)
return ice_dict, no_ice_dict, None
def pirep_icing_from_boeing_nc(dirname='/ships19/cloud/scratch/ICING/BOEING/icing/2020/', lon_range=[-180, 180], lat_range=[-63, 63]):
flist = []
for path in Path(dirname).rglob('*.nc'):
flist.append(path)
ice_dict = {}
no_ice_dict = {}
cnt_ice = 0
cnt_no_ice = 0
for fname in flist:
try:
h5f = h5py.File(fname, 'r')
ice_bool = h5f['ice_bool'][:]
lons = h5f['longitude'][:]
lats = h5f['latitude'][:]
years = 2000 + h5f['year'][:]
mons = h5f['month'][:]
days = h5f['day'][:]
hours = h5f['hour'][:]
mins = h5f['minute'][:]
secs = h5f['second'][:]
flt_levs = (h5f['altitude_ft'][:]).astype('float32')
flt_levs *= 0.3048 # convert feet to meters
except Exception as e:
print(e)
h5f.close()
continue
nrpts = ice_bool.shape[0]
if nrpts > 0:
for k in range(nrpts):
if lons[k] < lon_range[0] or lons[k] > lon_range[1]:
continue
if lats[k] < lat_range[0] or lats[k] > lat_range[1]:
continue
dto = datetime.datetime(year=years[k], month=mons[k], day=days[k], hour=hours[k], minute=mins[k], second=secs[k])
dto = dto.replace(tzinfo=timezone.utc)
ts = dto.timestamp()
if ice_bool[k] == 1:
tup = (lats[k], lons[k], flt_levs[k], 1, 0, 'None')
rpts = ice_dict.get(ts)
if rpts is None:
rpts = []
rpts.append(tup)
ice_dict[ts] = rpts
else:
rpts.append(tup)
cnt_ice += 1
else:
tup = (lats[k], lons[k], flt_levs[k], -1, 0, 'None')
rpts = no_ice_dict.get(ts)
if rpts is None:
rpts = []
rpts.append(tup)
no_ice_dict[ts] = rpts
else:
rpts.append(tup)
cnt_no_ice += 1
h5f.close()
print('num icing/no icing: ', cnt_ice, cnt_no_ice)
ice_dict = time_sort_pirep_dict(ice_dict)
no_ice_dict = time_sort_pirep_dict(no_ice_dict)
return ice_dict, no_ice_dict, None