# all_checks.py (5.01 KiB)
# Authored by Coda Phillips
import numpy as np
import pandas as pd
class Check(object):
    """A quality-control check wrapped around a plain function.

    Calling the instance with a frame runs the wrapped function on the
    columns named in ``depends`` and stores the result in a new column named
    after the function.
    """

    # Rows whose hiding checks reach this value are masked before the wrapped
    # function runs.
    HIDE_THRESHOLD = .95

    def __init__(self, *, depends, affects_calibration, affected_by_calibration, description, hides, func):
        """Store the check's configuration.

        Args:
            depends: column names handed to ``func``; all must be present in
                the frame before the check can run.
            affects_calibration: whether this check's result is propagated
                through the calibration graph for checks that depend on it.
            affected_by_calibration: whether dependencies flagged with
                ``affects_calibration`` are propagated before running.
            description: human-readable description of the check.
            hides: iterable of column names, each ending in ``_check``, whose
                high values mask rows before ``func`` runs.
            func: callable ``func(frame, parameters)`` returning one value per
                row; its ``__name__`` becomes the output column name.

        Raises:
            AssertionError: if a name in ``hides`` does not end in ``_check``.
        """
        # Materialise first: validating a one-shot iterator in place would
        # consume it and silently leave ``self.hides`` empty.
        hides = list(hides)
        assert all(hide.endswith('_check') for hide in hides), \
            'hides must only name "_check" columns: {!r}'.format(hides)
        self.depends = depends
        self.affects_calibration = affects_calibration
        self.affected_by_calibration = affected_by_calibration
        self.description = description
        self.hides = hides
        self.func = func
        self.name = func.__name__

    def __call__(self, checklist, frame, parameters):
        """Run the check and store its result as column ``self.name``.

        Args:
            checklist: object exposing ``checks``; only read when this check
                is ``affected_by_calibration``.
            frame: DataFrame carrying a ``calibration_graph`` attribute.
            parameters: passed through to the wrapped function untouched.

        Returns:
            ``frame`` itself, modified in place when the check ran and
            unchanged when its dependencies are not yet available.
        """
        if not self.is_satisfied(frame):
            return frame

        # Work on a copy so masking never leaks into the caller's frame. The
        # graph is re-attached by hand, as the original code did after copying.
        masked_frame = frame.copy()
        masked_frame.calibration_graph = frame.calibration_graph

        if self.affected_by_calibration:
            # Invalidate records in dependencies when they affect calibration
            for check in checklist.checks:
                if check.affects_calibration and check.name in self.depends:
                    masked_frame[check.name] = self.invalidate_calibration(
                        masked_frame[check.name], masked_frame)

        if self.hides:
            # Keep only rows where every hiding check is below the threshold;
            # the other rows are replaced with NaN by ``where``.
            visible = (masked_frame[self.hides] < self.HIDE_THRESHOLD).all(axis=1)
            masked_frame = masked_frame.where(visible)

        returned_vector = self.func(masked_frame[self.depends], parameters)
        # Unwrap pandas objects; raw numpy arrays have no ``values`` attribute.
        returned_array = getattr(returned_vector, 'values', returned_vector)
        self.result = returned_array
        # ``* 1`` stores a fresh object and turns boolean results into integers.
        frame[self.name] = returned_array * 1
        return frame

    def is_satisfied(self, frame):
        """Return True when every ``depends`` and ``hides`` column is in ``frame``."""
        columns = frame.columns
        return (all(name in columns for name in self.depends)
                and all(name in columns for name in self.hides))

    def __repr__(self):
        return 'Check({})'.format(self.name)

    def invalidate_calibration(self, invalid_records, frame):
        """Propagate blackbody record values to the records they calibrate.

        For each ``blackbody_index -> affected_indices`` entry of
        ``frame.calibration_graph``, every affected value ``p`` is replaced by
        ``b + p - b * p``, where ``b`` is the value at the blackbody record.
        Entries are applied in graph order, so later entries see earlier
        updates.

        All indices are treated as row positions, matching the positional
        lookup of ``sceneMirrorPosition``.

        Args:
            invalid_records: Series of per-record values; not modified.
            frame: DataFrame with a ``sceneMirrorPosition`` column and a
                ``calibration_graph`` attribute.

        Returns:
            An updated copy of ``invalid_records``.

        Raises:
            AssertionError: if a blackbody record's mirror position is not
                the character code for ``H`` or ``A``, or its value is NaN.
        """
        invalid_records = invalid_records.copy()
        mirror_positions = frame['sceneMirrorPosition']
        for blackbody_index, affected_indices in frame.calibration_graph.items():
            position_code = chr(int(mirror_positions.iloc[blackbody_index]))
            assert position_code in 'HA', \
                'frame["sceneMirrorPosition"][{:d}] = {}'.format(blackbody_index, position_code)
            value_of_blackbody_record = invalid_records.iloc[blackbody_index]
            assert not np.isnan(value_of_blackbody_record)
            # Update affected records probability
            affected = list(affected_indices)
            # Plain arrays keep the assignment positional (no index alignment).
            values_of_affected_records = invalid_records.iloc[affected].values
            invalid_records.iloc[affected] = (
                value_of_blackbody_record
                + values_of_affected_records
                - (value_of_blackbody_record * values_of_affected_records)
            )
        return invalid_records
class Checklist(object):
    """Registry of checks plus a driver that runs them in dependency order."""

    def __init__(self):
        # Checks in registration order.
        self.checks = []

    def add_check(self, *, depends, affects_calibration=False, affected_by_calibration=False, description, hides=()):
        """Return a decorator that registers the decorated function as a check.

        The keyword arguments are forwarded unchanged to ``Check``. The
        decorator returns the ``Check`` instance, so the decorated name is
        bound to the check rather than to the original function.
        """
        def wrap_check(func):
            ck = Check(
                func=func,
                depends=depends,
                affects_calibration=affects_calibration,
                affected_by_calibration=affected_by_calibration,
                description=description,
                hides=hides
            )
            self.checks.append(ck)
            return ck
        return wrap_check

    def check_everything(self, frame, parameters):
        """Run every registered check once, as soon as it is satisfied.

        Makes repeated passes over the checks that have not run yet; each
        pass runs those whose ``is_satisfied(frame)`` is true. Stops when a
        pass runs nothing.

        Args:
            frame: object handed to each check; each check returns the frame
                used for the next one.
            parameters: passed through to every check.

        Returns:
            The frame returned by the last check that ran, or ``frame``
            itself when no check ran.

        Raises:
            AssertionError: if some checks could never be satisfied.
        """
        pending = list(self.checks)
        progressed = True
        while progressed and pending:
            progressed = False
            # Build the next pending list rather than removing from the list
            # being iterated, which would skip the element after each removal.
            still_pending = []
            for check in pending:
                if check.is_satisfied(frame):
                    # Pass this registry, not the module-level global, so any
                    # Checklist instance resolves dependencies against itself.
                    frame = check(self, frame, parameters)
                    progressed = True
                else:
                    still_pending.append(check)
            pending = still_pending
        assert pending == [], 'checks with unsatisfied dependencies: {}'.format(pending)
        return frame
# Create the global checklist registry: the single module-level Checklist
# instance used by the imports and the `add_check` decorator below.
checklist = Checklist()
# Then register all the checks using circular imports, like Flask does.
# These imports are deliberately placed after `checklist` is created; they are
# presumably what populates `checklist.checks` (each module is expected to
# decorate its functions with `checklist.add_check` -- confirm in those modules).
import aeri_qc.electronic_checks
import aeri_qc.global_checks
import aeri_qc.radiometric_checks
import aeri_qc.scene_checks
import aeri_qc.state_checks
import aeri_qc.thermal_checks
import aeri_qc.bomem_file
import aeri_qc.igm_checks
# Names of every check registered so far. They are de-duplicated through a
# set, so the order of the resulting list is not guaranteed.
every_check_so_far = list({check.name for check in checklist.checks})
# Add summary field
@checklist.add_check(
    depends=every_check_so_far,
    affected_by_calibration=True,
    description='Computed summary of all checks to approximate percent probability of invalid data',
)
def qc_percent(frame, parameters):
    """Fold every column of ``frame`` into one combined probability of invalid data.

    Columns are combined one at a time with the union rule for independent
    events, P(A or B) = P(A) + P(B) - P(A)P(B). ``parameters`` is accepted
    for the common check signature but is not used.

    Returns the combined value per row, or the integer 0 when ``frame`` has
    no columns.
    """
    # Start from 0, the neutral element of the union rule (not a real prior).
    combined = 0
    for column_name in frame.columns:
        column = frame[column_name]
        combined = combined + column - (combined * column)
    return combined