diff --git a/main.py b/main.py
index b26ff432bced00902043ce9f2f36bac205c2c32a..c5d147352303cd2233a1f6fe41a230041dc98819 100644
--- a/main.py
+++ b/main.py
@@ -29,37 +29,65 @@ levels = [
 ]
 
 def save_quality(frame, qc_path):
+    """
+    Save the DataFrame (frame) to a netCDF at (qc_path)
+    """
+    # First select only rows corresponding to records from the sum file
     frame = frame.ix[pd.notnull(frame.sum_index)].set_index('sum_index').sort_index()
+
+    # Define the netcdf
     ncdf = netCDF4.Dataset(qc_path, 'w')
     time = ncdf.createDimension('time', len(frame))
     base_time = ncdf.createVariable('base_time', 'i8', ())
     time_offset = ncdf.createVariable('time_offset', 'i8', ('time',))
     qc_percent = ncdf.createVariable('qc_percent', 'f4', ('time',))
     qc_notes = ncdf.createVariable('qc_notes', str, ('time',))
+
+    # Write the columns ending in _check (aggregate tests)
     for check_mask in frame.filter(like='_check'):
         ncdf.createVariable(check_mask, 'f4', ('time',))[:] = frame[check_mask].values
+    # Write the columns starting with qc_ (tests applied directly to variables)
     for variable_qc in frame.filter(like='qc_'):
         if variable_qc not in ['qc_notes','qc_percent']:
             ncdf.createVariable(variable_qc, 'f4', ('time',))[:] = frame[variable_qc].values
+
+    # Write time information
     base_time[:] = frame.datetime.dropna().iloc[0].to_datetime64()
     time_offset[:] = (frame.datetime - frame.datetime.dropna().iloc[0]).values
+
+    # Write the summary
     qc_percent[:] = frame['qc_percent'].values
     qc_notes[:] = frame['qc_notes'].fillna('').values
     ncdf.close()
 
 def read_frame(cxs_file, sum_file):
+    """
+    Read housekeeping from CXS file and SUM file together
+
+    Returns DataFrame with range index, datetime column, and sum_index column, and housekeeping data
+    """
+    # Get CXS housekeeping as dataframe
     cxs = get_all_housekeeping(cxs_file)
+    # Save the record numbers for future use
     cxs['cxs_index'] = np.arange(len(cxs))
+    # missing records will appear as rows with NaT index, clear them
     cxs = cxs.ix[pd.notnull(cxs.index)]
+
+    # Read SUM as well
     sum_ = get_all_housekeeping(sum_file)
     sum_['sum_index'] = np.arange(len(sum_))
     sum_ = sum_.ix[pd.notnull(sum_.index)]
+
+    # Combine extra data from SUM into CXS, many columns will have NaNs during calibration views
     hk = cxs.combine_first(sum_)
     hk.index.name = 'datetime'
     return hk.reset_index()
 
 def read_igms(spc_zip_path):
+    """
+    Read a zip file that archives Igm files, yield dictionaries containing interferograms and index info
+    """
     if spc_zip_path is not None:
         # Open zip file
         with ZipFile(spc_zip_path) as spc_zip:
@@ -70,7 +98,7 @@ def read_igms(spc_zip_path):
                 with spc_zip.open(name) as igm:
                     inS = BytesIO(igm.read())
                     for index, subfile in enumerate(read_stream(inS)):
-                        # yield name, data pair
+                        # yield row
                         yield {
                             'datetime':datetime.utcfromtimestamp(subfile['UTCTime']),
                             'DataA':subfile['DataA'].squeeze(),
@@ -80,6 +108,9 @@ def read_igms(spc_zip_path):
                         }
 
 def check_frame(frame, parameters):
+    """
+    Start with housekeeping DataFrame and iteratively run checks to compute quality
+    """
     frame['qc_percent'] = 0
     frame['qc_notes'] = None
     for level in levels:
@@ -88,43 +119,66 @@ def check_frame(frame, parameters):
     return frame
 
 def update_all(ftp_dir, sci_dir, parameters=None):
-    # check for newer sum or cxs file
+    """
+    Given the root directories for ftp and sci, find all days lacking an up-to-date qc file and generate new qc
+    """
+    # Find all CXS files
     cxs_files = glob(os.path.join(os.path.abspath(ftp_dir),'AE*','*B1.CXS'))
+    # For each CXS file find a matching SUM file and possible QC filename
     for qc_file, cxs_file, sum_file in files_to_update(cxs_files):
         print('Performing quality control for {}'.format(cxs_file))
         # Find the Igms for these times
         YYMMDD = re.search('([0-9]{6})', os.path.basename(cxs_file)).group(1)
-        zip_file = os.path.join(sci_dir, 'AE'+YYMMDD, 'SPC_AERI*.zip')
-        if not os.path.isfile(zip_file):
+        if sci_dir is None:
             zip_file = None
+        else:
+            zip_file = os.path.join(sci_dir, 'AE'+YYMMDD, 'SPC_AERI*.zip')
+            if not os.path.isfile(zip_file):
+                zip_file = None
+        # First read the housekeeping dataframe
         frame = read_frame(cxs_file, sum_file)
+        # read the interferograms
         igms = pd.DataFrame.from_records(read_igms(zip_file))
         if parameters is None:
             parameters = {}
+        # check for spikes in interferograms and add that quality column to housekeeping frame
+        # merging by datetimes is not expected to work, will probably interleave with SUM records
         frame_with_spikes = frame.merge(spike_check(igms, parameters), on='datetime', how='outer', sort=True)
+        # Propagate spike data to surrounding records
         # Only propogate presence of spikes, not abscence
        frame_with_spikes.ix[frame_with_spikes.spike_check == False] = pd.np.nan
         frame_with_spikes['spike_check'] = frame_with_spikes.spike_check.ffill(limit=1).bfill(limit=1)
+
+        # Reindex back to housekeeping frame (union of sum and cxs records), removing interleaved spike data
         frame_with_spikes = frame_with_spikes.ix[frame.index]
+        # Perform qc on housekeeping frame
         frame_with_spikes = check_frame(frame_with_spikes, parameters)
+
         save_quality(frame_with_spikes, qc_file)
 
 def files_to_update(cxs_files, update_only=True):
+    """
+    Find a matching SUM file and determine the QC filename for each CXS file in sequence
+    """
     for cxs_file in cxs_files:
+        # Determine names of possible files
        possible_sum = os.path.join(os.path.dirname(cxs_file), cxs_file.replace('B1.CXS','.SUM'))
         possible_qc = os.path.join(os.path.dirname(cxs_file), cxs_file.replace('B1.CXS','QC.nc'))
-        cxs_file
+        # Check if those files exist
         if os.path.isfile(possible_sum):
             sum_file = possible_sum
+            # If QC file already exists, also check that it is newer than the sum and cxs file
             if os.path.isfile(possible_qc):
                 qc_file = possible_qc
                 if max(os.path.getmtime(sum_file), os.path.getmtime(cxs_file)) > os.path.getmtime(qc_file):
+                    # Regenerate QC if it is older
                     yield (qc_file, cxs_file, sum_file)
                 elif not update_only:
+                    # update_only=False will always regenerate
                     yield (qc_file, cxs_file, sum_file)
             else:
+                # if qc doesn't exist, generate
                 yield (possible_qc, cxs_file, sum_file)
 
 if __name__ == '__main__':
@@ -136,7 +190,7 @@ if __name__ == '__main__':
         default_sci = 'C:\\'
     else:
         default_sci = '/cygdrive/c/'
-    parser.add_argument('sci', default=default_sci)
+    parser.add_argument('sci', default=None, nargs='?')
     args = parser.parse_args()