"""Temperature outlier checks for the HBB and ABB temperature channels.

Each check flags records whose bottom, apex or top temperature changes
faster than a fixed rate relative to the last accepted record, records
the result in per-variable QC columns, and invalidates flagged records.
"""
import itertools

import numpy as np
import pandas as pd

from util import (
    BaseCheckList,
    annotate_all,
    invalidate_records,
    update_variable_qc,
    _compute_robust_zscore,
    _compute_robust_rate_zscore,
)

electronic_checks = BaseCheckList()

# Maximum accepted rate of change, in variable units per second, between a
# record and the last accepted record.  'HBB' uses the tighter limit; every
# other prefix falls back to the default.
_HBB_DELTA_THRESH = .001
_DEFAULT_DELTA_THRESH = .01


def find_bb_outliers(frame, parameters, bb):
    """Flag temperature outliers for the ``bb`` bottom/apex/top channels.

    Args:
        frame: DataFrame holding ``<bb>bottomTemp``, ``<bb>apexTemp`` and
            ``<bb>topTemp`` columns plus the ``datetime`` column read by
            ``_scan_for_outliers``.
        parameters: Check parameters.  Currently not consulted by this
            check; kept so the signature matches the other checks.
        bb: Column prefix, e.g. ``'HBB'`` or ``'ABB'``.  ``'HBB'`` uses a
            rate threshold of .001, anything else uses .01.

    Returns:
        The frame returned by ``invalidate_records`` after the per-variable
        QC columns were updated, the outliers annotated, and the
        ``<bb lowercased>_temp_outlier_check`` column (0/1) was added.
    """
    delta_thresh = _HBB_DELTA_THRESH if bb == 'HBB' else _DEFAULT_DELTA_THRESH

    def _outlier_mask(position):
        # dtype=bool keeps the mask boolean even when the frame is empty;
        # an untyped empty array would be float64 and break the `|` below.
        return np.fromiter(
            _scan_for_outliers(
                frame, '{}{}Temp'.format(bb, position), delta_thresh
            ),
            dtype=bool,
        )

    bbb_outliers = _outlier_mask('bottom')
    bba_outliers = _outlier_mask('apex')
    bbt_outliers = _outlier_mask('top')

    # `* 1` converts the boolean masks to 0/1 integers for the QC columns.
    variable_qcs = pd.DataFrame({
        'qc_{}bottomTemp'.format(bb): bbb_outliers * 1,
        'qc_{}topTemp'.format(bb): bbt_outliers * 1,
        'qc_{}apexTemp'.format(bb): bba_outliers * 1,
    }, index=frame.index)
    frame = update_variable_qc(frame, variable_qcs)

    annotate_all(frame, bbb_outliers, '{} bottom temperature outlier'.format(bb))
    annotate_all(frame, bba_outliers, '{} apex temperature outlier'.format(bb))
    annotate_all(frame, bbt_outliers, '{} top temperature outlier'.format(bb))

    # A record fails the check when any of its three channels is an outlier.
    check_column = '{}_temp_outlier_check'.format(bb.lower())
    frame[check_column] = (bbb_outliers | bba_outliers | bbt_outliers) * 1
    frame = invalidate_records(frame, check_column)
    return frame


@electronic_checks.check(
    depends=['HBBapexTemp', 'HBBbottomTemp', 'HBBtopTemp'],
    updates=['HBBbottomTemp', 'HBBapexTemp', 'HBBtopTemp']
)
def hbb_temp_outlier_check(frame, parameters):
    """Run ``find_bb_outliers`` on the HBB temperature columns."""
    return find_bb_outliers(frame, parameters, 'HBB')


@electronic_checks.check(
    depends=['ABBapexTemp', 'ABBbottomTemp', 'ABBtopTemp'],
    updates=['ABBbottomTemp', 'ABBapexTemp', 'ABBtopTemp']
)
def abb_temp_outlier_check(frame, parameters):
    """Run ``find_bb_outliers`` on the ABB temperature columns."""
    return find_bb_outliers(frame, parameters, 'ABB')


def _find_6sigma_outliers(frame, window_length, estimation_func=_compute_robust_zscore):
    """Return ``estimation_func(frame, window_length) > 6``.

    With the default estimator this marks values whose robust z-score
    exceeds 6, i.e. deviations greater than 6 sigma.
    """
    return estimation_func(frame, window_length) > 6


def _scan_for_outliers(frame, variable, delta_thresh):
    """Yield one bool per row of ``frame``: True when the row is an outlier.

    Each row is compared against the last *accepted* row (the first row is
    always accepted).  A row is accepted when the absolute rate of change
    of ``variable`` per second since that reference row is below
    ``delta_thresh``; otherwise it is flagged and the reference row is left
    unchanged, so a run of bad records is measured against the last good
    one.

    Args:
        frame: DataFrame with a ``datetime`` column and a ``variable`` column.
        variable: Name of the column to scan.
        delta_thresh: Maximum accepted absolute change per second.

    Yields:
        False for accepted rows, True for outliers.
    """
    last = None
    for _, row in frame.iterrows():
        if last is None:
            # Nothing to compare against yet: the first row is the reference.
            yield False
            last = row
            continue

        time_diff = (row.datetime - last.datetime).total_seconds()
        variable_diff = row[variable] - last[variable]
        # Same test as abs(variable_diff / time_diff) < delta_thresh, but
        # written without the division so that a repeated timestamp
        # (time_diff == 0) is flagged as an outlier instead of raising
        # ZeroDivisionError or producing inf/nan with a warning.
        if abs(variable_diff) < delta_thresh * abs(time_diff):
            yield False
            last = row
        else:
            yield True


#### TESTS ####

def test_hbb_temp_outlier_check():
    # NOTE(review): this frame has no 'datetime' column, which
    # _scan_for_outliers reads for every row after the first -- confirm
    # whether the electronic_checks.check decorator supplies it.
    frame = pd.DataFrame({
        'HBBapexTemp':[0,1,10,1],
        'HBBbottomTemp':[1,1,1,1],
        'HBBtopTemp':[0,1,10,1],
        'qc_notes':'',
        'sceneMirrorPosition':[ord(x) for x in 'HASA']
    })
    assert hbb_temp_outlier_check(frame, {})['hbb_temp_outlier_check'].values.tolist() == [0,0,1,0]