diff --git a/aeri_qc/igm_checks.py b/aeri_qc/igm_checks.py index 364a8cb909c7e5be25445d3066598e0a9a97e6e0..f867ca4a08592dc19c83224ba9c254fde6a5e27b 100644 --- a/aeri_qc/igm_checks.py +++ b/aeri_qc/igm_checks.py @@ -36,9 +36,9 @@ def spike_check(record_generator, mirror_beg): deviation = abs(wings_combined - np.median(wings_combined)) mean = np.mean(deviation) if (deviation > (10*mean)).any(): - yield cxs_index, True + yield cxs_index, igm_record['name'], True else: - yield cxs_index, False + yield cxs_index, igm_record['name'], False diff --git a/aeri_qc/main.py b/aeri_qc/main.py index 3dbb489cdd7b5734ff1e8e10ec514b4692ad5e2c..44a78a6b433c1f17f14b1639e9947b68c65efb7b 100644 --- a/aeri_qc/main.py +++ b/aeri_qc/main.py @@ -1,4 +1,6 @@ import os +import shelve +import tempfile from glob import glob import re from collections import defaultdict, OrderedDict @@ -137,7 +139,7 @@ def read_frame(cxs_file, sum_file): hk.calibration_graph = compute_calibration_graph(hk.sceneMirrorPosition) return hk -def read_igms(spc_zip_path): +def read_igms(spc_zip_path, cache=()): """ Read a zip file that archives Igm files, yield dictionaries containing interferograms and index info """ @@ -146,7 +148,7 @@ def read_igms(spc_zip_path): with ZipFile(spc_zip_path) as spc_zip: # Find all members with .Igm suffix for name in spc_zip.namelist(): - if name.endswith('.Igm'): + if name.endswith('.Igm') and name not in cache: for index, subfile in enumerate(bomem_file.read_zip(spc_zip, name)): # yield row yield { @@ -155,7 +157,8 @@ def read_igms(spc_zip_path): 'DataB':subfile['DataB'].squeeze(), 'sceneMirrorPosition':ord(name[0]), 'subfile':index, - 'scene_index':int(re.search('[A-Z]([0-9]+)M_[0-9]{8}_[0-9]{6}_.*[.]Igm', name).group(1)) + 'scene_index':int(re.search('[A-Z]([0-9]+)M_[0-9]{8}_[0-9]{6}_.*[.]Igm', name).group(1)), + 'name':name } def check_frame(frame, parameters, checklist): @@ -167,6 +170,26 @@ def check_frame(frame, parameters, checklist): frame = checklist.check_everything(frame, parameters) return frame +def get_cache(): + tempdir = tempfile.gettempdir() + return os.path.join(tempdir,'aeri_quality_control_cache') + +def get_cached_spike_check(zip_file): + cache = get_cache() + cached_data = {} + with shelve.open(cache) as shlv: + with ZipFile(zip_file) as spc_zip: + for name in set(shlv.keys()) & set(spc_zip.namelist()): + cached_data[name] = shlv[name] + data = pd.DataFrame(cached_data).T + data.index.name = 'name' + return data.reset_index() + +def save_cached_spike_check(dataframe): + cache = get_cache() + with shelve.open(cache) as shlv: + shlv.update(dataframe.set_index('name').to_dict(orient='index')) + def prepare_frame(cxs_file, sum_file, sci_dir): # First read the housekeeping dataframe frame = read_frame(cxs_file, sum_file) @@ -179,15 +202,25 @@ def prepare_frame(cxs_file, sum_file, sci_dir): if os.path.isfile(possible_zip_file) and os.path.isfile(possible_mirror): # read the interferograms print('Found igm file') - igms = pd.DataFrame.from_records(spike_check(read_igms(possible_zip_file), read_mirror(possible_mirror)), columns=['cxs_index','spike_check']) + cached_frame = get_cached_spike_check(possible_zip_file) + if 'name' in cached_frame.columns: + cached_names = set(cached_frame['name']) + else: + cached_names = [] + igms = pd.DataFrame.from_records(spike_check(read_igms(possible_zip_file, cache=cached_names), read_mirror(possible_mirror)), + columns=['cxs_index','name','spike_check']) + igms = pd.concat([igms,cached_frame], axis=0) if igms.empty: return frame - igms = igms.groupby('cxs_index').any().reset_index() + igms = igms.groupby('cxs_index').agg({'spike_check':np.any, 'name':lambda x: x.iloc[0]}).reset_index() + save_cached_spike_check(igms) + # Add columns from igms, notably DataA, DataB cal_graph = frame.calibration_graph frame = frame.merge(igms, on=['cxs_index'], how='left', suffixes=('','_igm')) frame.spike_check.fillna(False, inplace=True) frame.calibration_graph = cal_graph + print('Processed igm') return frame