"""Helpers for flagging and annotating quality-control results in pandas frames."""
from itertools import takewhile

import numpy as np
import pandas as pd

# ``sceneMirrorPosition`` codes that this module treats as calibration views.
_CALIBRATION_VIEWS = (ord('H'), ord('A'))

# Factor used to turn a median absolute deviation into a robust estimate of
# the standard deviation (the exact constant for normal data is ~1.4826).
_MAD_TO_STD = 1.48


def _compute_robust_zscore(frame, window_length):
    """Return the absolute deviation from a rolling median in robust-sigma units.

    The rolling median uses a centered window of ``window_length`` samples
    (``min_periods=1`` so the edges are still populated).  The spread is the
    median absolute deviation (MAD) of the whole input about that rolling
    median, scaled by ``_MAD_TO_STD``.

    NOTE(review): a MAD of zero yields ``inf``/``NaN`` in the result; callers
    presumably tolerate that -- confirm.
    """
    median_values = frame.rolling(
        window=window_length, center=True, min_periods=1
    ).median()
    deviations = frame - median_values
    robust_std = abs(deviations).median() * _MAD_TO_STD
    return abs(deviations / robust_std)


def _compute_robust_rate_zscore(frame, window_length=None):
    """Return the absolute rate of change of ``frame`` per unit of its index.

    Index differences are cast to ``int64`` (for a datetime index that is the
    raw integer representation of the timedelta).  ``window_length`` is
    accepted for signature parity with ``_compute_robust_zscore`` and is not
    used.

    NOTE(review): despite the name, the result is *not* divided by a robust
    spread estimate.  The original computed a MAD of the rates but never used
    it; that dead computation was removed and the returned values are
    unchanged.  Confirm whether normalization was intended before changing it,
    since existing thresholds may rely on the raw rate.
    """
    index_values = frame.index.values
    time_diffs = pd.Series(
        (index_values[1:] - index_values[:-1]).astype(np.int64),
        index=frame.index[1:],
    )
    return abs(frame.diff() / time_diffs)


def _ensure_qc_notes(frame):
    """Create an empty ``qc_notes`` column on ``frame`` if it is missing."""
    if 'qc_notes' not in frame:
        frame['qc_notes'] = None


def annotate(frame, loc, annotation):
    """Append ``annotation`` to the ``qc_notes`` cell of row ``loc`` in place.

    A non-empty string already in the cell is kept and the new annotation is
    joined to it with a comma; any other content (``None``, ``NaN``, empty
    string) is replaced.  The ``qc_notes`` column must already exist.
    """
    notes = frame.loc[loc, 'qc_notes']
    if isinstance(notes, str) and notes:
        frame.loc[loc, 'qc_notes'] = ','.join([notes, annotation])
    else:
        frame.loc[loc, 'qc_notes'] = annotation


def update_variable_qc(frame, variable_qcs):
    """Merge per-variable QC values into ``frame`` and return a new frame.

    Every column except ``qc_notes`` is combined with the matching column of
    ``variable_qcs`` as ``x + y - x*y`` (missing values are treated as 0).
    The original ``qc_notes`` column is then restored on the result.
    """
    def proba_update(x, y):
        try:
            # Probability of the union of two independent events.
            return x + y - x * y
        except TypeError:
            # Operands that cannot be combined arithmetically: keep the
            # existing values.
            return x

    combined = frame.drop('qc_notes', axis=1).combine(
        variable_qcs, proba_update, fill_value=0
    )
    return combined.combine_first(frame[['qc_notes']])


def annotate_all(frame, mask, annotation):
    """Apply ``annotate`` to every row of ``frame`` selected by ``mask``."""
    for loc in frame.index[mask]:
        annotate(frame, loc, annotation)


def invalidate_records(frame, check_name):
    """Run ``invalidate_record`` for every row whose ``check_name`` value is > 0.

    The flagged rows are collected before any mutation, so rows that become
    flagged as a side effect of neighbor invalidation are not revisited.
    Mutates and returns ``frame``.
    """
    # ``.loc`` / ``.items()`` replace ``.ix`` / ``.iteritems()``, which were
    # removed from pandas in 1.0 and 2.0 respectively.
    flagged = frame.loc[frame[check_name] > 0, check_name]
    for index, percent in flagged.items():
        invalidate_record(frame, index, check_name, percent)
    return frame


def invalidate_record(frame, loc, check_name, value, annotation=''):
    """Flag row ``loc`` for ``check_name`` and propagate from calibration views.

    Sets ``frame.loc[loc, check_name] = value`` and, when ``annotation`` is
    non-empty, records it in ``qc_notes`` (creating the column if needed).

    If the row's ``sceneMirrorPosition`` is one of the calibration views, the
    surrounding rows are walked in both directions: rows with the *other*
    calibration view are skipped, every non-calibration row receives the same
    ``value`` plus an ``invalid calibration:<loc>`` note, and the walk in a
    direction stops at the next row with the *same* calibration view.

    Mutates and returns ``frame``.
    """
    frame.loc[loc, check_name] = value
    if annotation:
        _ensure_qc_notes(frame)
        annotate(frame, loc, annotation)

    corrupt_view = frame.loc[loc, 'sceneMirrorPosition']
    if corrupt_view not in _CALIBRATION_VIEWS:
        return frame

    def invalidate_neighbor(neighbor):
        """Handle one neighboring row; return True when the walk must stop."""
        view = frame.loc[neighbor, 'sceneMirrorPosition']
        if view == corrupt_view:
            # Made one cycle, stop.
            return True
        if view in _CALIBRATION_VIEWS:
            # Skip opposite calibration views.
            return False
        # Invalidate non-calibration views.  The notes column is created on
        # demand so this also works when no annotation was passed in.
        _ensure_qc_notes(frame)
        frame.loc[neighbor, check_name] = value
        annotate(frame, neighbor, 'invalid calibration:{:d}'.format(loc))
        return False

    # Positional index of the corrupt row (first match), computed once.
    position = frame.index.tolist().index(loc)

    for neighbor in frame.index[position + 1:]:
        if invalidate_neighbor(neighbor):
            break
    for neighbor in frame.index[:position][::-1]:
        if invalidate_neighbor(neighbor):
            break
    return frame


class BaseCheckList:
    """Base class that runs a sequence of check functions over a frame.

    Each entry of ``checks`` is called as ``check(frame, parameters)`` and
    must return a frame with the same number of rows.  A check reports its
    result in a column named after the function (``check.__name__``).
    """

    # Check functions to run; expected to be overridden by subclasses.
    checks = ()

    def __init__(self, *args, **kwargs):
        self.check_results = {}
        self.parameters = {}

    def set_params(self, parameters):
        """Replace the parameters passed to every check."""
        self.parameters = parameters

    def update_qc_percent(self, frame):
        """Fold each check's result column into ``frame['qc_percent']``.

        For every check whose name is a column of ``frame`` the running
        ``qc_percent`` is updated as ``p + r - p*r`` (missing results count
        as 0).  Mutates and returns ``frame``.
        """
        for check_func in self.checks:
            name = check_func.__name__
            if name in frame.columns:
                results = frame[name].fillna(0)
                # Compute P(A U B)
                previous_percent = frame['qc_percent']
                frame['qc_percent'] = (
                    previous_percent + results - previous_percent * results
                )
        return frame

    def compute(self, frame):
        """Run all checks on rows with ``qc_percent < 1`` and merge the results.

        Rows already at ``qc_percent >= 1`` are excluded from the checks and
        restored afterwards via ``combine_first``.  Returns the merged frame
        with ``qc_percent`` updated.
        """
        # Filter bad records from previous level (``.ix`` was removed from
        # pandas, ``.loc`` is the equivalent for a boolean mask).
        filtered_frame = frame.loc[frame['qc_percent'] < 1].copy()
        for check in self.checks:
            expected_rows = filtered_frame.shape[0]
            filtered_frame = check(filtered_frame, self.parameters)
            assert filtered_frame.shape[0] == expected_rows, (
                'check changed the number of rows'
            )
        return self.update_qc_percent(filtered_frame.combine_first(frame))