-
Coda Phillips authoredCoda Phillips authored
util.py 3.32 KiB
from itertools import takewhile
import numpy as np
import pandas as pd
def annotate(frame, loc, annotation):
notes = frame.loc[loc, 'qc_notes']
if type(notes) == str and len(notes) > 0:
frame.loc[loc, 'qc_notes'] = ','.join([notes, annotation])
else:
frame.loc[loc, 'qc_notes'] = annotation
def update_variable_qc(frame, variable_qcs):
def proba_update(x,y,conversion=False):
try:
return x+y-x*y
except TypeError:
return x
return frame.drop('qc_notes', axis=1).combine(variable_qcs, proba_update, fill_value=0).combine_first(frame[['qc_notes']])
def annotate_all(frame, mask, annotation):
for loc in frame.index[mask]:
annotate(frame, loc, annotation)
def invalidate_records(frame, check_name):
for index,percent in frame.ix[frame[check_name] > 0, check_name].iteritems():
invalidate_record(frame, index, check_name, percent)
return frame
def invalidate_record(frame, loc, check_name, value, annotation=''):
frame.loc[loc, check_name] = value
if annotation:
if 'qc_notes' not in frame:
frame['qc_notes'] = None
annotate(frame, loc, annotation)
corrupt_view = frame.loc[loc,'sceneMirrorPosition']
if corrupt_view in [ord('H'),ord('A')]:
def invalidate_neighbor(neighbor):
if frame.sceneMirrorPosition.loc[neighbor] == corrupt_view:
# Made one cycle, break
return True
elif frame.sceneMirrorPosition.loc[neighbor] in [ord('H'), ord('A')]:
# Skip opposite calibration views
return
else:
# Invalidate non-calibration views
frame.loc[neighbor,check_name] = value
annotate(frame, neighbor, 'invalid calibration:{:d}'.format(loc))
# Corrupt calibration view, must also invalidate neighboring scenes
# _idx is the iloc
_idx = frame.index.tolist().index(loc) + 1
while _idx < len(frame):
if invalidate_neighbor(frame.index[_idx]):
break
_idx += 1
# _idx is the iloc
_idx = frame.index.tolist().index(loc) - 1
while _idx >= 0:
if invalidate_neighbor(frame.index[_idx]):
break
_idx -= 1
return frame
class BaseCheckList:
def __init__(self, *args, **kwargs):
self.check_results = {}
self.parameters = {}
def set_params(self, parameters):
self.parameters = parameters
def update_qc_percent(self, frame):
for check_func in self.checks:
name = check_func.__name__
if name in frame.columns:
results = frame[name].fillna(0)
# Compute P(A U B)
previous_percent = frame['qc_percent']
frame['qc_percent'] = previous_percent + results - previous_percent*results
return frame
def compute(self, frame):
# Filter bad records from previous level
filtered_frame = frame.copy()#.ix[frame.qc_percent < 1].copy()
for check in self.checks:
original_shape = filtered_frame.shape
filtered_frame = check(filtered_frame, self.parameters)
assert filtered_frame.shape[0] == original_shape[0]
return self.update_qc_percent(filtered_frame.combine_first(frame))