import numpy as np import pandas as pd from itertools import cycle, islice def spike_check(record_generator, mirror_beg): """ Check for spikes by computing the z-score of each point, flagging z-scores greater than 10 record_generator yields dictionaries with keys: DataA, DataB, datetime, sceneMirrorPosition:int, scene_index, subfile """ smp_string = ''.join([s[0][0] for s in mirror_beg]) table = {} scene_generator = islice(compute_cxs_index(smp_string, table), 10000) # For each interferogram for igm_record in record_generator: scene, smp = igm_record['scene_index'], igm_record['sceneMirrorPosition'] # Find cxs_index of this record if (scene, smp) not in table: # Compute some more table entries for gen_scene,gen_smp in scene_generator: if gen_scene == scene and gen_smp == smp: break else: raise ValueError cxs_index = table[scene, smp] for data in [igm_record['DataA'], igm_record['DataB']]: # Compute median absolute deviation in wings of igm first_quarter = data[0:int(len(data)/4)] last_quarter = data[-int(len(data)/4):] wings_combined = np.concatenate([first_quarter, last_quarter]).flatten() deviation = abs(wings_combined - np.median(wings_combined)) mean = np.mean(deviation) if (deviation > (10*mean)).any(): yield cxs_index, igm_record['name'], True else: yield cxs_index, igm_record['name'], False def compute_cxs_index(smp_string, table): scene = 1 for i, smp in enumerate(cycle(smp_string)): smp = ord(smp) if (scene,smp) in table: scene += 1 table[scene,smp] = i yield scene,smp #### # Tests ####### def test_spike_check_empty(): ret = spike_check(pd.DataFrame([]), {}) assert ret.empty assert 'datetime' in ret.columns assert 'sceneMirrorPosition' in ret.columns assert 'spike_check' in ret.columns def test_spike_check_ok(): DataA = [np.random.randn(100) for x in range(10)] data = pd.DataFrame({'DataA':DataA,'DataB':DataA, 'datetime':range(10), 'sceneMirrorPosition':range(10)}) ret = spike_check(data, {}) assert 'datetime' in ret.columns assert 'sceneMirrorPosition' in ret.columns assert 'spike_check' in ret.columns assert not ret['spike_check'].any() def test_spike_check_bad(): DataA = [np.random.randn(1000) for x in range(1000)] DataA[5][10] = 20 data = pd.DataFrame({'DataA':DataA,'DataB':DataA, 'datetime':range(1000), 'sceneMirrorPosition':range(1000)}) ret = spike_check(data, {}) assert 'datetime' in ret.columns assert 'sceneMirrorPosition' in ret.columns assert 'spike_check' in ret.columns assert ret['spike_check'].any()