from util import BaseCheckList, annotate_all, invalidate_records, update_variable_qc
import numpy as np
import pandas as pd


def find_bb_outliers(frame, parameters, bb):
    """Flag outliers in the bottom, apex and top temperatures of one blackbody.

    The three columns ``<bb>bottomTemp``, ``<bb>apexTemp`` and ``<bb>topTemp``
    are each passed to ``_find_6sigma_outliers``. The resulting masks are
    written as per-variable QC columns (``qc_<column>``) via
    ``update_variable_qc``, reported through ``annotate_all``, and OR-ed
    together into ``<bb lowercased>_temp_outlier_check`` (1 = outlier in at
    least one of the three temperatures), which is then handed to
    ``invalidate_records``.

    Args:
        frame: DataFrame holding the temperature columns.
        parameters: mapping of check settings; only ``'window_length'`` is
            read here (default 100) and used as the rolling window size.
        bb: column-name prefix of the blackbody, e.g. ``'HBB'`` or ``'ABB'``.

    Returns:
        ``frame`` untouched if any of the three temperature columns is
        missing; otherwise the frame returned by ``invalidate_records``.
    """
    bottom_col = '{}bottomTemp'.format(bb)
    apex_col = '{}apexTemp'.format(bb)
    top_col = '{}topTemp'.format(bb)

    # Plain membership tests replace np.in1d, which is deprecated in
    # NumPy 2.0; the outcome is the same for column labels.
    if not all(column in frame.columns for column in (bottom_col, apex_col, top_col)):
        return frame

    window_length = parameters.get('window_length', 100)

    bbb_outliers = _find_6sigma_outliers(frame[bottom_col], window_length)
    bba_outliers = _find_6sigma_outliers(frame[apex_col], window_length)
    bbt_outliers = _find_6sigma_outliers(frame[top_col], window_length)

    # Multiplying a boolean mask by 1 turns it into 0/1 integers.
    variable_qcs = pd.DataFrame({
        'qc_' + bottom_col: bbb_outliers * 1,
        'qc_' + top_col: bbt_outliers * 1,
        'qc_' + apex_col: bba_outliers * 1,
    }, index=frame.index)
    frame = update_variable_qc(frame, variable_qcs)

    # NOTE(review): the return value of annotate_all is discarded, so it is
    # presumably expected to modify `frame` in place -- confirm in util.
    annotate_all(frame, bbb_outliers, '{} bottom temperature outlier'.format(bb))
    annotate_all(frame, bba_outliers, '{} apex temperature outlier'.format(bb))
    annotate_all(frame, bbt_outliers, '{} top temperature outlier'.format(bb))

    check_name = '{}_temp_outlier_check'.format(bb.lower())
    frame[check_name] = (bbb_outliers | bba_outliers | bbt_outliers) * 1
    frame = invalidate_records(frame, check_name)
    return frame


def hbb_temp_outlier_check(frame, parameters):
    """Run ``find_bb_outliers`` on the ``'HBB'``-prefixed temperature columns."""
    return find_bb_outliers(frame, parameters, 'HBB')


def abb_temp_outlier_check(frame, parameters):
    """Run ``find_bb_outliers`` on the ``'ABB'``-prefixed temperature columns."""
    return find_bb_outliers(frame, parameters, 'ABB')


def calibrationambienttemp_outlier_check(frame, parameters):
    """Flag outliers in the ``calibrationAmbientTemp`` column.

    Writes the 0/1 outlier mask to ``calibrationambienttemp_outlier_check``,
    records it as ``qc_calibrationAmbientTemp`` via ``update_variable_qc`` and
    reports the flagged rows through ``annotate_all``. Unlike
    ``find_bb_outliers``, this check does not call ``invalidate_records``.

    Args:
        frame: DataFrame that may contain ``calibrationAmbientTemp``.
        parameters: mapping of check settings; only ``'window_length'`` is
            read here (default 100).

    Returns:
        ``frame`` untouched if the column is missing; otherwise the frame
        returned by ``update_variable_qc``.
    """
    if 'calibrationAmbientTemp' not in frame.columns:
        return frame

    window_length = parameters.get('window_length', 100)

    # NOTE(review): use_mean=True is requested here, but
    # _compute_robust_zscore in this file currently resets use_mean to False,
    # so the median/MAD statistic is what actually gets applied.
    temp_outliers = _find_6sigma_outliers(
        frame['calibrationAmbientTemp'], window_length, use_mean=True
    )

    frame['calibrationambienttemp_outlier_check'] = temp_outliers * 1
    frame = update_variable_qc(
        frame,
        pd.DataFrame({'qc_calibrationAmbientTemp': temp_outliers * 1}, index=frame.index),
    )
    annotate_all(frame, temp_outliers, 'calibrationAmbientTemp outlier')
    return frame
class CheckList(BaseCheckList):
    # Check functions exposed by this module; how they are invoked is
    # decided by BaseCheckList (defined in util, not visible here).
    checks = [
        hbb_temp_outlier_check,
        abb_temp_outlier_check,
        calibrationambienttemp_outlier_check,
    ]


def _compute_robust_zscore(frame, window_length, use_mean=False):
    """Return the absolute rolling z-score of every sample in ``frame``.

    Each sample's deviation from a centered rolling location estimate is
    divided by a centered rolling scale estimate. All windows use
    ``min_periods=1``, so samples near the edges are scored against
    truncated windows rather than yielding NaN from the rolling step.

    Args:
        frame: pandas object supporting ``.rolling`` (callers in this file
            pass a single column).
        window_length: size of the centered rolling window.
        use_mean: nominally selects rolling mean / standard deviation
            instead of rolling median / MAD. It is currently overridden to
            ``False`` on entry (see the note in the body).

    Returns:
        Object shaped like ``frame`` holding ``|value - location| / scale``.
        A scale of zero produces ``inf`` or ``NaN``, as with any
        floating-point division.
    """
    # NOTE(review): the caller's choice is discarded here, which makes the
    # mean/std path at the bottom unreachable. Kept as found -- confirm
    # whether this override is intentional before removing either piece.
    use_mean = False

    def centered_window(values):
        # Every rolling statistic below shares the same window settings.
        return values.rolling(window=window_length, center=True, min_periods=1)

    if not use_mean:
        rolling_median = centered_window(frame).median()

        # Centered rolling MAD: rolling median of the absolute deviations
        # from the rolling median.
        rolling_mad = centered_window((frame - rolling_median).abs()).median()

        # For normally distributed data the standard deviation is
        # proportional to the MAD (factor roughly 1.48).
        robust_rolling_std = rolling_mad * 1.48
        return ((frame - rolling_median) / robust_rolling_std).abs()

    robust_rolling_std = centered_window(frame).std()
    rolling_mean = centered_window(frame).mean()
    return ((frame - rolling_mean) / robust_rolling_std).abs()


def _find_6sigma_outliers(frame, window_length, use_mean=False):
    """Return a boolean mask of samples whose rolling z-score exceeds 6.

    Arguments are forwarded unchanged to ``_compute_robust_zscore``.
    """
    zscore = _compute_robust_zscore(frame, window_length, use_mean)
    return zscore > 6


#### TESTS ####

def test_hbb_temp_outlier_check():
    frame = pd.DataFrame({
        'HBBapexTemp': [0, 1, 10, 1],
        'HBBbottomTemp': [1, 1, 1, 1],
        'HBBtopTemp': [0, 1, 10, 1],
        'qc_notes': '',
        'sceneMirrorPosition': [ord(x) for x in 'HASA'],
    })
    checked = hbb_temp_outlier_check(frame, {})
    assert checked['hbb_temp_outlier_check'].values.tolist() == [0, 0, 1, 0]


def test_abb_temp_outlier_check():
    frame = pd.DataFrame({
        'ABBapexTemp': [0, 1, 10, 1],
        'ABBbottomTemp': [1, 1, 1, 1],
        'ABBtopTemp': [0, 1, 10, 1],
        'qc_notes': '',
        'sceneMirrorPosition': [ord(x) for x in 'HASA'],
    })
    checked = abb_temp_outlier_check(frame, {})
    assert checked['abb_temp_outlier_check'].values.tolist() == [0, 0, 1, 0]


def test_calibrationambienttemp_temp_outlier_check():
    frame = pd.DataFrame({
        'calibrationAmbientTemp': [0, 1, 10, 1],
        'qc_notes': '',
        'sceneMirrorPosition': [ord(x) for x in 'HASA'],
    })
    checked = calibrationambienttemp_outlier_check(frame, {})
    assert checked['calibrationambienttemp_outlier_check'].values.tolist() == [0, 0, 1, 0]