-
Alex Diebold authoredAlex Diebold authored
detector_temp_flag.py 7.07 KiB
from __future__ import print_function
from scipy.signal import periodogram
import pandas as pd
import numpy as np
from aeri_tools.io.dmv import housekeeping
from matplotlib import pyplot as plt
import os
WIN_SIZE = 40
DETECTOR_TEMP_KEY = 'detectorTemp'
TIME_WINDOW = (60,600) #seconds
MIN_AMP = .02
WIN_TYPE = 'blackmanharris'
def sig_finder_function(rolling_window,**kwargs):
"""Function applied to detectorTemp dataset with pd.rolling_apply
Calculates the dominant period and
"""
#calculate the time difference and average frequency
nsamples = rolling_window.size
times = pd.Series(rolling_window.index)
win_type = kwargs.get('win_type',WIN_TYPE)
try:
#get the time span covered by the window
dt = (times.iloc[-1] - times.iloc[0]).seconds
except AttributeError:
#catch NaT indicies
return np.nan
else:
#compute the dominant frequency/period
fs = nsamples/dt
f, Pxx_den = periodogram(rolling_window,fs,window=win_type)
max_i = Pxx_den.argmax()
if f[max_i] != 0:
primary_T = 1./f[max_i]
else:
primary_T = float("inf")
return primary_T
def series_rolling_apply(series,window,func,**kwargs):
"""rolling applies function to series by passing subsection of series,
rather than np array
"""
#lambda to map the passed function to a rolling window
map_f=lambda i:np.nan if i<window else func(series.iloc[i-window:i],**kwargs)
return pd.Series(map(map_f,range(series.size)),index = series.index)
def flag_detector_temp_oscillations(path_or_dmv,window_size = WIN_SIZE,std=10,**kwargs):
"""Performs power spectral density analysis on detector temperature,
and flags the data if a consistent period is found
"""
#read in the housekeeping dataframe
housekeeping_df = housekeeping.get_all_housekeeping(path_or_dmv)
#get the detector temp variable
detector_temp = housekeeping_df[DETECTOR_TEMP_KEY]
#detrend by subtracting the day's mean
detrend_dtemp = detector_temp - detector_temp.mean()
#approximate periodicity in data using a rolling periodogram
periods = series_rolling_apply(detrend_dtemp,window_size,sig_finder_function,**kwargs)
#check for big spikes in the data (these throw off the periodogram)
#currently disabled
dtt_roller = detrend_dtemp.rolling(window_size)
amps = False & ((dtt_roller.max()-dtt_roller.min()) > .2)
#look for low standard deviation in the period (= a consistent periodicity)
stds = series_rolling_apply(periods,window_size,lambda x:x.std() < std)
#flag as bad if there's a consistent periodicity or large jump in the data
flags = stds | amps
return flags, periods
def flatten_out(original_flags, pds):
'''incrementing windows to help flatten out the 0/1 value as either
0 or 1; removes small blips
'''
flags = original_flags.copy()
values_dict = flags.value_counts()
num_half_hour = int(len(flags)/48)
curr_flags = []
std_data = {}
std_data['curr_std'] = []
std_data['mean_std'] = []
std_data['mean'] = []
std_data['std_curr_std'] = []
std_data['std_mean_std'] = []
lower = 0
upper = 1
while upper <= len(flags):
values_dict = original_flags[lower:upper].value_counts()
if True in values_dict.keys():
mean = values_dict[True] / len(flags[lower:upper])
else:
mean = 1 - (values_dict[False] / len(flags[lower:upper]))
curr_std = np.std(pds[lower:upper])
mean_std = np.mean(pds[lower:upper])
std_data['mean'].append(mean)
std_data['curr_std'].append(curr_std)
std_data['mean_std'].append(mean_std)
std_curr_std = np.std(std_data['curr_std'][lower:upper])
std_data['std_curr_std'].append(std_curr_std)
std_mean_std = np.std(std_data['mean_std'][lower:upper])
std_data['std_mean_std'].append(std_mean_std)
if std_mean_std < 20 or np.isnan(std_mean_std):
flags[lower:upper] = flags[lower:upper].replace(False, True)
else:
flags[lower:upper] = flags[lower:upper].replace(True, False)
upper += 1
if upper - lower > num_half_hour:
lower += 1
new_flags = flags & (np.asarray(std_data['mean']) > 0.35)
# new_flags = np.asarray(std_data['mean']) > 0.8
# fig, ax = plt.subplots(2, figsize=(15,10),sharex=True)
# plt.plot(flags)
# plt.plot(new_flags)
# plt.show()
# plt.clf()
return new_flags, std_data
def main(path):
"""Test function that displays detectorTemp alongside the computed flag
using different windowing functions
"""
dmv = housekeeping.get_all_housekeeping(path)
detector_temp = dmv[DETECTOR_TEMP_KEY]
fig,ax = plt.subplots(5,figsize=(15,10),sharex=True)
plt.sca(ax[0])
plt.plot(dmv['Time'],detector_temp,'r')
# plt.ylim(76.9,77.6)
plt.ylabel("Detector Temperature (K)")
plt.grid()
flags,pds = flag_detector_temp_oscillations(path,win_type=WIN_TYPE)
flags, std_data = flatten_out(flags.copy(), pds)
plt.sca(ax[1])
plt.plot(dmv['Time'],pds,lw=2,label=WIN_TYPE)
plt.grid(True)
plt.ylabel("Dominant Period (s)")
#plt.legend()
plt.sca(ax[2])
plt.plot(dmv['Time'],flags,lw=2,label=WIN_TYPE)
plt.grid(True)
plt.ylabel("Standard Deviation Flag")
plt.xlabel("Time (Hours UTC)")
plt.ylim(-0.1,1.1)
plt.sca(ax[3])
plt.plot(dmv['Time'], np.asarray(std_data['mean']))
title = os.path.split(path)[-1][:6]
plt.suptitle(title,fontsize=14)
plt.sca(ax[4])
plt.plot(dmv['Time'], std_data['curr_std']) #blue
plt.plot(dmv['Time'], std_data['mean_std']) #green
plt.plot(dmv['Time'], std_data['std_curr_std']) #red
plt.plot(dmv['Time'], std_data['std_mean_std'])
print('title: ', title)
plt.savefig("sgp_detect_pngs/detector_temp_flag_test_%s.png"%title,bbox_inches='tight')
# plt.savefig("detector_temp_flag_test_%s_2.png"%title,bbox_inches='tight')
# plt.show()
plt.clf()
plt.close()
if __name__ == '__main__':
from sys import argv
import os
import time
start_time = time.time()
pathname = argv[1]
skip_num = 0
curr_num = 0
print('skip_num is ', skip_num)
if os.path.isdir(pathname):
for filename in os.listdir(pathname):
full_path = pathname + '/' + filename
if os.path.isdir(full_path):
for fname in os.listdir(full_path):
fuller_path = full_path + '/' + fname
if os.path.isfile(fuller_path) and 'B1.CXS' in fname:
if curr_num >= skip_num:
main(fuller_path)
curr_num += 1
print(curr_num, ': ', fuller_path)
elif os.path.isfile(full_path) and 'B1.CXS' in filename:
if curr_num >= skip_num:
main(full_path)
curr_num += 1
print(curr_num, ': ', filename)
else:
main(pathname)
print('execution time: %d minute(s), %.2f second(s)' % ((time.time()-start_time)//60, (time.time()-start_time)%60))