Skip to content
Snippets Groups Projects
util.py 4.08 KiB
Newer Older
from itertools import takewhile
import numpy as np
import pandas as pd
def _compute_robust_zscore(frame, window_length):
    median_values = frame.rolling(window=window_length, center=True, min_periods=1).median()
    # Compute the MAD
    mad = abs(frame - median_values).median()
    # standard deviation is proportional to median absolute deviation I'm told
    robust_std = mad * 1.48
    # compute the Mahalanobis distance from rolling median
    return abs((frame - median_values) / robust_std)

def _compute_robust_rate_zscore(frame, window_length=None):
    time_diffs = pd.Series((frame.index.values[1:] - frame.index.values[:-1]).astype(np.int64),
                           index=frame.index[1:])
    changes = frame.diff() / time_diffs
    mad_diff = abs(changes - changes.median()).median() * 1.48
    return abs(frame.diff() / time_diffs)

Coda Phillips's avatar
Coda Phillips committed
def annotate(frame, loc, annotation):
    notes = frame.loc[loc, 'qc_notes']
    if type(notes) == str and len(notes) > 0:
        frame.loc[loc, 'qc_notes'] = ','.join([notes, annotation])
    else:
        frame.loc[loc, 'qc_notes'] = annotation

def update_variable_qc(frame, variable_qcs):

    def proba_update(x,y,conversion=False):
        try:
            return x+y-x*y
        except TypeError:
            return x
    return frame.drop('qc_notes', axis=1).combine(variable_qcs, proba_update, fill_value=0).combine_first(frame[['qc_notes']])

Coda Phillips's avatar
Coda Phillips committed
def annotate_all(frame, mask, annotation):
    for loc in frame.index[mask]:
        annotate(frame, loc, annotation)

def invalidate_records(frame, check_name):
    for index,percent in frame.ix[frame[check_name] > 0, check_name].iteritems():
        invalidate_record(frame, index, check_name, percent)
    return frame
def invalidate_record(frame, loc, check_name, value, annotation=''):
    frame.loc[loc, check_name] = value
Coda Phillips's avatar
Coda Phillips committed
    if annotation:
        if 'qc_notes' not in frame:
            frame['qc_notes'] = None
Coda Phillips's avatar
Coda Phillips committed
        annotate(frame, loc, annotation)
    corrupt_view = frame.loc[loc,'sceneMirrorPosition']
    if corrupt_view in [ord('H'),ord('A')]:
Coda Phillips's avatar
Coda Phillips committed
    
        def invalidate_neighbor(neighbor):
            if frame.sceneMirrorPosition.loc[neighbor] == corrupt_view:
Coda Phillips's avatar
Coda Phillips committed
                # Made one cycle, break
                return True
            elif frame.sceneMirrorPosition.loc[neighbor] in [ord('H'), ord('A')]:
Coda Phillips's avatar
Coda Phillips committed
                # Skip opposite calibration views
                return
            else:
                # Invalidate non-calibration views
                frame.loc[neighbor,check_name] = value
Coda Phillips's avatar
Coda Phillips committed
                annotate(frame, neighbor, 'invalid calibration:{:d}'.format(loc))

        # Corrupt calibration view, must also invalidate neighboring scenes
        # _idx is the iloc
        _idx = frame.index.tolist().index(loc) + 1
        while _idx < len(frame):
            if invalidate_neighbor(frame.index[_idx]):
                break
Coda Phillips's avatar
Coda Phillips committed
            _idx += 1
        # _idx is the iloc
        _idx = frame.index.tolist().index(loc) - 1
        while _idx >= 0:
            if invalidate_neighbor(frame.index[_idx]):
                break
Coda Phillips's avatar
Coda Phillips committed
            _idx -= 1

    return frame

class BaseCheckList:

    def __init__(self, *args, **kwargs):
        self.check_results = {}
        self.parameters = {}

    def set_params(self, parameters):
        self.parameters = parameters

    def update_qc_percent(self, frame):
        for check_func in self.checks:
            name = check_func.__name__
            if name in frame.columns:
                results = frame[name].fillna(0)
                # Compute P(A U B)
                previous_percent = frame['qc_percent']
                frame['qc_percent'] = previous_percent + results - previous_percent*results
        return frame

    def compute(self, frame):
Coda Phillips's avatar
Coda Phillips committed
        # Filter bad records from previous level
        filtered_frame = frame.ix[frame.qc_percent < 1].copy()
        for check in self.checks:
            original_shape = filtered_frame.shape
Coda Phillips's avatar
Coda Phillips committed
            filtered_frame = check(filtered_frame, self.parameters)
            assert filtered_frame.shape[0] == original_shape[0]
        return self.update_qc_percent(filtered_frame.combine_first(frame))