Skip to content
Snippets Groups Projects
util.py 3.32 KiB
from itertools import takewhile
import numpy as np
import pandas as pd

def annotate(frame, loc, annotation):
    notes = frame.loc[loc, 'qc_notes']
    if type(notes) == str and len(notes) > 0:
        frame.loc[loc, 'qc_notes'] = ','.join([notes, annotation])
    else:
        frame.loc[loc, 'qc_notes'] = annotation

def update_variable_qc(frame, variable_qcs):

    def proba_update(x,y,conversion=False):
        try:
            return x+y-x*y
        except TypeError:
            return x
    return frame.drop('qc_notes', axis=1).combine(variable_qcs, proba_update, fill_value=0).combine_first(frame[['qc_notes']])

def annotate_all(frame, mask, annotation):
    for loc in frame.index[mask]:
        annotate(frame, loc, annotation)

def invalidate_records(frame, check_name):
    for index,percent in frame.ix[frame[check_name] > 0, check_name].iteritems():
        invalidate_record(frame, index, check_name, percent)
    return frame

def invalidate_record(frame, loc, check_name, value, annotation=''):
    frame.loc[loc, check_name] = value
    if annotation:
        if 'qc_notes' not in frame:
            frame['qc_notes'] = None
        annotate(frame, loc, annotation)

    corrupt_view = frame.loc[loc,'sceneMirrorPosition']
    if corrupt_view in [ord('H'),ord('A')]:
    
        def invalidate_neighbor(neighbor):
            if frame.sceneMirrorPosition.loc[neighbor] == corrupt_view:
                # Made one cycle, break
                return True
            elif frame.sceneMirrorPosition.loc[neighbor] in [ord('H'), ord('A')]:
                # Skip opposite calibration views
                return
            else:
                # Invalidate non-calibration views
                frame.loc[neighbor,check_name] = value
                annotate(frame, neighbor, 'invalid calibration:{:d}'.format(loc))

        # Corrupt calibration view, must also invalidate neighboring scenes
        # _idx is the iloc
        _idx = frame.index.tolist().index(loc) + 1
        while _idx < len(frame):
            if invalidate_neighbor(frame.index[_idx]):
                break
            _idx += 1

        # _idx is the iloc
        _idx = frame.index.tolist().index(loc) - 1
        while _idx >= 0:
            if invalidate_neighbor(frame.index[_idx]):
                break
            _idx -= 1

    return frame

class BaseCheckList:

    def __init__(self, *args, **kwargs):
        self.check_results = {}
        self.parameters = {}

    def set_params(self, parameters):
        self.parameters = parameters

    def update_qc_percent(self, frame):
        for check_func in self.checks:
            name = check_func.__name__
            if name in frame.columns:
                results = frame[name].fillna(0)
                # Compute P(A U B)
                previous_percent = frame['qc_percent']
                frame['qc_percent'] = previous_percent + results - previous_percent*results
        return frame

    def compute(self, frame):
        # Filter bad records from previous level
        filtered_frame = frame.copy()#.ix[frame.qc_percent < 1].copy()
        for check in self.checks:
            original_shape = filtered_frame.shape
            filtered_frame = check(filtered_frame, self.parameters)
            assert filtered_frame.shape[0] == original_shape[0]
        return self.update_qc_percent(filtered_frame.combine_first(frame))