import numpy as np
import pandas as pd

# Absolute z-score above which a single sample is treated as a spike.
SPIKE_Z_THRESHOLD = 10


def _rows_with_spikes(column):
    """Return a boolean array with one entry per row of *column*.

    *column* is a Series whose elements are equal-length 1-D arrays.  The
    arrays are stacked into a (rows, samples) matrix, the mean and standard
    deviation are taken per sample position across all rows, and a row is
    marked True when any of its samples has |z-score| > SPIKE_Z_THRESHOLD.
    """
    stacked = np.vstack(column.values)
    mean = stacked.mean(axis=0)
    std = stacked.std(axis=0)
    # A sample position that is constant across all rows has std == 0, which
    # makes the z-score 0/0 = NaN.  NaN compares False against the threshold,
    # i.e. "no spike", so the floating-point warning is silenced on purpose.
    with np.errstate(divide='ignore', invalid='ignore'):
        z_scores = np.abs((stacked - mean) / std)
    return (z_scores > SPIKE_Z_THRESHOLD).any(axis=1)


def spike_check(igms, parameters):
    """Flag interferograms that contain spikes.

    The z-score of every sample in ``DataA`` and ``DataB`` is computed against
    the per-sample-position mean and standard deviation over all rows of
    *igms*; a row is flagged when any z-score exceeds 10 in either column.

    Note: with a population standard deviation the largest attainable z-score
    for n rows is (n - 1) / sqrt(n), so fewer than 102 rows can never produce
    a flag.

    Parameters
    ----------
    igms : pandas.DataFrame
        Must provide the columns ``DataA``, ``DataB``, ``datetime`` and
        ``sceneMirrorPosition``.  ``DataA``/``DataB`` hold one equal-length
        array per row.
    parameters
        Unused by this check; accepted so the signature matches its callers.

    Returns
    -------
    pandas.DataFrame
        One row per distinct ``datetime`` with the columns ``datetime``,
        ``spike_check`` (1.0 if any row of that datetime was flagged, else
        0.0) and ``sceneMirrorPosition`` (first value of the group).  An empty
        input yields an empty frame with the same three column names.
    """
    if igms.empty:
        return pd.DataFrame({'spike_check': [], 'sceneMirrorPosition': [], 'datetime': []})

    # Check z-scores in both DataA and DataB
    spiked = _rows_with_spikes(igms.DataA) | _rows_with_spikes(igms.DataB)

    # Create DataFrame with flags (drop returns a new frame, so the caller's
    # input is left untouched)
    flags = igms.drop(['DataA', 'DataB'], axis=1)
    flags['spike_check'] = spiked
    by_datetime = flags.groupby('datetime')

    # Each Igm file usually has two subfiles (one for each scan);
    # each scan has the same time and sceneMirrorPosition,
    # so reduce down to one row per datetime.
    return pd.concat([
        by_datetime[['spike_check']].any() * 1.0,
        by_datetime[['sceneMirrorPosition']].first(),
    ], axis=1).reset_index()


####
# Tests
####

def test_spike_check_empty():
    ret = spike_check(pd.DataFrame([]), {})
    assert ret.empty
    assert 'datetime' in ret.columns
    assert 'sceneMirrorPosition' in ret.columns
    assert 'spike_check' in ret.columns


def test_spike_check_ok():
    # Seeded so the test data is reproducible from run to run.
    rng = np.random.RandomState(0)
    DataA = [rng.randn(100) for _ in range(10)]
    data = pd.DataFrame({
        'DataA': DataA,
        'DataB': DataA,
        'datetime': range(10),
        'sceneMirrorPosition': range(10),
    })
    ret = spike_check(data, {})
    assert 'datetime' in ret.columns
    assert 'sceneMirrorPosition' in ret.columns
    assert 'spike_check' in ret.columns
    assert not ret['spike_check'].any()


def test_spike_check_bad():
    # Seeded so the test data is reproducible from run to run.
    rng = np.random.RandomState(0)
    DataA = [rng.randn(1000) for _ in range(1000)]
    DataA[5][10] = 20
    data = pd.DataFrame({
        'DataA': DataA,
        'DataB': DataA,
        'datetime': range(1000),
        'sceneMirrorPosition': range(1000),
    })
    ret = spike_check(data, {})
    assert 'datetime' in ret.columns
    assert 'sceneMirrorPosition' in ret.columns
    assert 'spike_check' in ret.columns
    assert ret['spike_check'].any()