import numpy as np import pandas as pd from aeri_qc.all_checks import checklist from scipy.signal import periodogram WIN_SIZE = 40 DETECTOR_TEMP_KEY = 'detectorTemp' TIME_WINDOW = (60,600) #seconds MIN_AMP = .02 WIN_TYPE = 'blackmanharris' def sig_finder_function(rolling_window,**kwargs): """Function applied to detectorTemp dataset with pd.rolling_apply Calculates the dominant period and """ #calculate the time difference and average frequency nsamples = rolling_window.size times = pd.Series(rolling_window.index) win_type = kwargs.get('win_type',WIN_TYPE) try: #get the time span covered by the window dt = (times.iloc[-1] - times.iloc[0]).seconds except AttributeError: #catch NaT indicies return np.nan else: #compute the dominant frequency/period fs = nsamples/dt f, Pxx_den = periodogram(rolling_window,fs,window=win_type) max_i = Pxx_den.argmax() if f[max_i] != 0: primary_T = 1./f[max_i] else: primary_T = float('inf') return primary_T def series_rolling_apply(series,window,func,**kwargs): """rolling applies function to series by passing subsection of series, rather than np array """ #lambda to map the passed function to a rolling window map_f=lambda i:np.nan if i<window else func(series.iloc[i-window:i],**kwargs) return pd.Series(map(map_f,range(series.size)),index = series.index) def flag_detector_temp_oscillations(detector_temp,window_size = WIN_SIZE,std=10,**kwargs): """Performs power spectral density analysis on detector temperature, and flags the data if a consistent period is found """ #detrend by subtracting the day's mean detrend_dtemp = detector_temp - detector_temp.mean() #approximate periodicity in data using a rolling periodogram periods = series_rolling_apply(detrend_dtemp,window_size,sig_finder_function,**kwargs) #check for big spikes in the data (these throw off the periodogram) #currently disabled dtt_roller = detrend_dtemp.rolling(window_size) amps = False & ((dtt_roller.max()-dtt_roller.min()) > .2) #look for low standard deviation in the period (= a consistent periodicity) stds = series_rolling_apply(periods,window_size,lambda x:x.std() < std) #flag as bad if there's a consistent periodicity or large jump in the data flags = stds | amps return flags, periods def flatten_out(original_flags, pds): '''incrementing windows to help flatten out the 0/1 value as either 0 or 1; removes small blips ''' flags = original_flags.copy() num_half_hour = int(len(flags)/48) std_data = {} std_data['mean_std'] = [] std_data['mean'] = [] std_data['std_mean_std'] = [] lower = 0 upper = 1 while upper <= len(flags): values_dict = original_flags[lower:upper].value_counts() if True in values_dict.keys(): mean = values_dict[True] / len(flags[lower:upper]) else: mean = 1 - values_dict[False] / len(flags[lower:upper]) mean_std = np.mean(pds[lower:upper]) std_data['mean'].append(mean) std_data['mean_std'].append(mean_std) std_mean_std = np.std(std_data['mean_std'][lower:upper]) std_data['std_mean_std'].append(std_mean_std) if std_mean_std < 20 or np.isnan(std_mean_std): flags[lower:upper] = flags[lower:upper].replace(False, True) else: flags[lower:upper] = flags[lower:upper].replace(True, False) upper += 1 if upper - lower > num_half_hour: lower += 1 new_flags = flags & (np.asarray(std_data['mean']) > 0.35) return new_flags @checklist.add_check(depends=['detectorTemp', 'datetime'], affects_calibration=True, description='test for strong periodicity in temperature on the order of 2-10 mins') def detector_temp_check(frame, parameters): """Test function that displays detectorTemp alongside the computed flag using different windowing functions """ detector_temp = pd.Series(data=frame[DETECTOR_TEMP_KEY].data, index=frame['datetime']) flags,pds = flag_detector_temp_oscillations(detector_temp,win_type=WIN_TYPE) flags = flatten_out(flags.copy(), pds) return flags.replace(True, 0.4) ''' @checklist.add_check(depends=['detectorTemp', 'datetime'], affects_calibration=False, description='test for strong periodicity in temperature on the order of 2-10 mins - intermediary value') def detector_temp_check_intermediary(frame, parameters): """Test function that displays detectorTemp alongside the computed flag using different windowing functions """ detector_temp = pd.Series(data=frame[DETECTOR_TEMP_KEY].data, index=frame['datetime']) pds_total = {} for win_type in 'bartlett','parzen','hann','nuttall': flags,pds = flag_detector_temp_oscillations(detector_temp,win_type=win_type) pds_total[win_type] = pds return pds_total '''