pireps.py 4.34 KiB
import datetime
from datetime import timezone
import re
import numpy as np
from icing.pirep_goes import get_goes_datasource
NO_ICE = '\s*NEG\s*|\s*NONE\s*|\s*NEGATIVE\s*|\s*NO\s*'
ICE_LVL = '\d+-\d+|FL\d+-FL\d+'
FLT_LVL = '/FL\s{0,1}\d+\s*'
ICE_RPT = '/IC'
ATYPE = '/TP'
RMK = '/RM'
# Returns icing/non-icing
def pirep_icing(filename, lon_range=[-180, 180], lat_range=[-55, 55]):
no_ice_reports = []
ice_reports = []
# HEADER: VALID,URGENT,AIRCRAFT,REPORT,LAT,LON
ice_dict = {}
no_ice_dict = {}
with open(filename) as file:
for idx, line in enumerate(file):
toks = line.split(',')
if toks[0] == 'VALID': # Skip headers (concatenated files)
continue
if len(toks) != 6: # Check for line format error
continue
report = toks[3]
lat = float(toks[4])
lon = float(toks[5])
if lon < lon_range[0] or lon > lon_range[1]:
continue
if lat < lat_range[0] or lat > lat_range[1]:
continue
# Flight level
so = re.search(FLT_LVL, report)
if so is None:
continue
else:
fl = so.group()
fl = (fl[3:].rstrip()).lstrip()
if fl.isnumeric():
fl = float(fl) * 100 * 0.3048 # conv flight level to meters
else:
continue
# Icing
ii = report.find('/IC')
if ii >= 0:
ice_str = report[ii+3:]
ri = ice_str.find('/RM')
if ri >= 0:
ice_str = ice_str[:ri]
else:
ri = ice_str.find(',')
if ri >= 0:
ice_str = ice_str[:ri]
ice_str = ice_str.lstrip()
if ice_str.find('N/A') >= 0:
continue
dto = datetime.datetime.strptime(toks[0], '%Y%m%d%H%M').replace(tzinfo=timezone.utc)
timestmp = dto.timestamp()
if len(re.findall(NO_ICE, ice_str)) != 0:
no_ice_reports.append(ice_str)
rpts = no_ice_dict.get(timestmp)
tup = (lat, lon, fl, ice_str)
if rpts is None:
rpts = []
rpts.append(tup)
no_ice_dict[timestmp] = rpts
else:
rpts.append(tup)
else:
ice_reports.append(ice_str)
so = re.search(ICE_LVL, ice_str)
if so is not None:
lvl_a, lvl_b = so.group().split('-')
if lvl_a.find('FL') >= 0:
lvl_a = float(lvl_a[2:]) * 100 * 0.3048
lvl_b = float(lvl_b[2:]) * 100 * 0.3048
else:
lvl_a = float(lvl_a) * 100 * 0.3048
lvl_b = float(lvl_b) * 100 * 0.3048
fl = max(lvl_a, lvl_b)
if fl > 15000.0:
continue
rpts = ice_dict.get(timestmp)
tup = (lat, lon, fl, ice_str)
if rpts is None:
rpts = []
rpts.append(tup)
ice_dict[timestmp] = rpts
else:
rpts.append(tup)
return ice_dict, no_ice_dict
def analyze(ice_dct, no_ice_dct):
ice_times = np.array(list(ice_dct.keys()))
no_ice_times = np.array(list(no_ice_dct.keys()))
dt_str = '201601010000'
dto = datetime.datetime.strptime(dt_str, '%Y%m%d%H%M').replace(tzinfo=timezone.utc)
t0 = dto.timestamp()
dt_str = '201906300000'
dto = datetime.datetime.strptime(dt_str, '%Y%m%d%H%M').replace(tzinfo=timezone.utc)
t1 = dto.timestamp()
tline = np.arange(t0, t1, 600)
hist_a, edges = np.histogram(no_ice_times, bins=tline)
print(len(np.nonzero(hist_a)[0]))
hist_b, edges = np.histogram(ice_times, bins=tline)
print(len(np.nonzero(hist_b)[0]))
print(np.sum(np.logical_and(hist_a > 0, hist_b > 0)))
for ts in list(no_ice_dct.keys()):
ds = get_goes_datasource(ts)
goes_file = ds.get_file(ts)[0]
print(goes_file)