Skip to content
Snippets Groups Projects
Select Git revision
  • 86d88e1999fd19f4b4872ddfee5e55048acf94da
  • master default protected
  • use_flight_altitude
  • distribute
4 results

amv_raob.py

Blame
  • pireps.py 5.43 KiB
    import datetime
    from datetime import timezone
    import re
    
    # Regular-expression fragments (kept as plain strings) used to parse the
    # free-text icing field of a PIREP. Raw strings are used so that `\s` and
    # `\d` reach the regex engine untouched instead of relying on Python
    # passing unknown string escapes through (a SyntaxWarning on 3.12+).
    # NOTE(review): the alternatives are unanchored, so short tokens such as
    # 'NO', 'TR' or 'LG' also match inside longer words - confirm intended.
    NO_ICE = r'\s*NEG\s*|\s*NONE\s*|\s*NEGATIVE\s*|\s*NO\s*'
    # Altitude range of the icing layer, e.g. "040-060" or "FL040-FL060".
    ICE_LVL = r'\d+-\d+|FL\d+-FL\d+'

    # ICE Intensity: 0:No intensity report, 1:Trace, 2:Light, 3:Light Moderate, 4:Moderate, 5:Moderate Severe, 6:Severe
    # NO/NEG -1
    ICE_TRACE = r'\s*TRACE\s*|\s*TRC\s*|\s*TR\s*'
    ICE_LGT = r'\s*LGT\s*|\s*LIGHT\s*|\s*LT\s*|\s*LGHT\s*|\s*LIT\s*|\s*LITE\s*|\s*LG\s*'
    ICE_MOD = r'\s*MOD\s*|\s*MDT\s*|\s*MODERATE\s*|\s*HEAVY\s*'
    ICE_SVR = r'\s*SVR\s*|\s*SEV\s*|\s*SEVERE\s*'

    # Report field markers: flight level, icing, aircraft type, remarks.
    FLT_LVL = r'/FL\s{0,1}\d+\s*'
    ICE_RPT = '/IC'
    ATYPE = '/TP'
    RMK = '/RM'
    
    
    def _pirep_icing_intensity(ice_str):
        """Return the icing intensity code for the text of an /IC field.

        Codes: 0 no intensity reported, 1 trace, 2 light, 3 light-moderate,
        4 moderate, 5 moderate-severe, 6 severe.
        """
        # A trace keyword takes precedence over every other intensity keyword.
        if re.search(ICE_TRACE, ice_str) is not None:
            return 1

        light = re.search(ICE_LGT, ice_str) is not None
        moderate = re.search(ICE_MOD, ice_str) is not None
        severe = re.search(ICE_SVR, ice_str) is not None

        if light:
            return 3 if moderate else 2
        if moderate:
            return 5 if severe else 4
        return 6 if severe else 0


    # Returns icing, no_icing (no icing observed), neg-icing (icing expected, but not observed)
    def pirep_icing(filename, lon_range=(-180, 180), lat_range=(-63, 63)):
        """Parse a PIREP CSV file into icing / no-icing / negative-icing reports.

        Each data line is expected to have six comma-separated fields:
        VALID,URGENT,AIRCRAFT,REPORT,LAT,LON. Header lines (first field
        'VALID') and lines that cannot be parsed are skipped, as are reports
        without a usable /FL flight level.

        Args:
            filename: path of the CSV file to read.
            lon_range: (min, max) longitude; reports outside are skipped.
            lat_range: (min, max) latitude; reports outside are skipped.

        Returns:
            A tuple ``(ice_dict, no_ice_dict, neg_ice_dict)``. Every dict maps
            the report time (POSIX timestamp of the VALID field read as UTC)
            to a list of tuples ``(lat, lon, altitude_m, intensity, index,
            text)``:

            * ice_dict: reports whose /IC field describes icing; intensity is
              0-6 and altitude is the top of the reported icing layer when one
              is given, otherwise the flight level.
            * no_ice_dict: reports with no /IC field; intensity is -1.
            * neg_ice_dict: reports whose /IC field matches NO_ICE; intensity
              is -1.

            ``index`` is a running counter within the respective category.
        """

        # HEADER: VALID,URGENT,AIRCRAFT,REPORT,LAT,LON

        ice_dict = {}
        neg_ice_dict = {}
        no_ice_dict = {}

        cnt_ice = 0
        cnt_no_ice = 0
        cnt_neg_ice = 0

        with open(filename) as file:
            for line in file:
                toks = line.split(',')
                if toks[0] == 'VALID':  # Skip headers (concatenated files)
                    continue
                if len(toks) != 6:  # Check for line format error
                    continue

                report = toks[3]
                try:
                    lat = float(toks[4])
                    lon = float(toks[5])
                except ValueError:
                    continue

                if lon < lon_range[0] or lon > lon_range[1]:
                    continue
                if lat < lat_range[0] or lat > lat_range[1]:
                    continue

                # Flight level
                so = re.search(FLT_LVL, report)
                if so is None:
                    continue
                fl_txt = so.group()[3:].strip()  # drop the leading '/FL'
                if not fl_txt.isnumeric():
                    continue
                fl = float(fl_txt) * 100 * 0.3048  # conv flight level to meters

                # A malformed time field is treated like any other malformed
                # field: skip the line rather than abort the whole file.
                try:
                    dto = datetime.datetime.strptime(toks[0], '%Y%m%d%H%M').replace(tzinfo=timezone.utc)
                except ValueError:
                    continue
                timestmp = dto.timestamp()

                # Icing
                ii = report.find(ICE_RPT)
                if ii < 0:
                    # No /IC field at all.
                    tup = (lat, lon, fl, -1, cnt_no_ice, 'no observed icing')
                    cnt_no_ice += 1
                    no_ice_dict.setdefault(timestmp, []).append(tup)
                    continue

                # Icing text runs from just after '/IC' up to the remarks
                # field, or to the end of the report when there is none.
                ice_str = report[ii + len(ICE_RPT):]
                ri = ice_str.find(RMK)
                if ri >= 0:
                    ice_str = ice_str[:ri]
                ice_str = ice_str.lstrip()
                if 'N/A' in ice_str:
                    continue

                if re.search(NO_ICE, ice_str) is not None:
                    tup = (lat, lon, fl, -1, cnt_neg_ice, ice_str)
                    cnt_neg_ice += 1
                    neg_ice_dict.setdefault(timestmp, []).append(tup)
                    continue

                # If an icing layer "A-B" / "FLA-FLB" is reported, use its
                # upper bound (in meters) instead of the aircraft flight level.
                so = re.search(ICE_LVL, ice_str)
                if so is not None:
                    fl = max(
                        float(lvl[2:] if lvl.startswith('FL') else lvl) * 100 * 0.3048
                        for lvl in so.group().split('-')
                    )

                if fl > 15000.0:
                    continue

                tup = (lat, lon, fl, _pirep_icing_intensity(ice_str), cnt_ice, ice_str)
                cnt_ice += 1
                ice_dict.setdefault(timestmp, []).append(tup)

        return ice_dict, no_ice_dict, neg_ice_dict