Commit 918dbc3e authored by Coda Phillips

Update documentation in main

parent d0fbfee2
@@ -29,37 +29,65 @@ levels = [
]
def save_quality(frame, qc_path):
    """
    Save the DataFrame (frame) to a netCDF at (qc_path)
    """
    # First select only rows corresponding to records from the sum file
    frame = frame.loc[pd.notnull(frame.sum_index)].set_index('sum_index').sort_index()
    # Define the netCDF
    ncdf = netCDF4.Dataset(qc_path, 'w')
    time = ncdf.createDimension('time', len(frame))
    base_time = ncdf.createVariable('base_time', 'i8', ())
    time_offset = ncdf.createVariable('time_offset', 'i8', ('time',))
    qc_percent = ncdf.createVariable('qc_percent', 'f4', ('time',))
    qc_notes = ncdf.createVariable('qc_notes', str, ('time',))
    # Write the columns ending in _check (aggregate tests)
    for check_mask in frame.filter(like='_check'):
        ncdf.createVariable(check_mask, 'f4', ('time',))[:] = frame[check_mask].values
    # Write the columns starting with qc_ (tests applied directly to variables)
    for variable_qc in frame.filter(like='qc_'):
        if variable_qc not in ['qc_notes', 'qc_percent']:
            ncdf.createVariable(variable_qc, 'f4', ('time',))[:] = frame[variable_qc].values
    # Write time information
    base_time[:] = frame.datetime.dropna().iloc[0].to_datetime64()
    time_offset[:] = (frame.datetime - frame.datetime.dropna().iloc[0]).values
    # Write the summary
    qc_percent[:] = frame['qc_percent'].values
    qc_notes[:] = frame['qc_notes'].fillna('').values
    ncdf.close()
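
A minimal usage sketch for save_quality, assuming the module's imports are available. The data and filename are hypothetical; the column names follow the expectations above (sum_index, datetime, qc_percent, qc_notes, plus any *_check columns):

    import pandas as pd

    # Toy frame with three records and one aggregate check column
    toy = pd.DataFrame({
        'sum_index': [0, 1, 2],
        'datetime': pd.date_range('2016-01-01', periods=3, freq='min'),
        'qc_percent': [0.0, 0.5, 1.0],
        'qc_notes': [None, 'spike detected', None],
        'spike_check': [0.0, 1.0, 0.0],
    })
    save_quality(toy, 'AE160101QC.nc')  # hypothetical output path
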
def read_frame(cxs_file, sum_file):
    """
    Read housekeeping from the CXS file and SUM file together
    Returns a DataFrame with a range index and datetime, sum_index, and housekeeping columns
    """
    # Get CXS housekeeping as a dataframe
    cxs = get_all_housekeeping(cxs_file)
    # Save the record numbers for future use
    cxs['cxs_index'] = np.arange(len(cxs))
    # Missing records will appear as rows with a NaT index; clear them
    cxs = cxs.loc[pd.notnull(cxs.index)]
    # Read the SUM as well
    sum_ = get_all_housekeeping(sum_file)
    sum_['sum_index'] = np.arange(len(sum_))
    sum_ = sum_.loc[pd.notnull(sum_.index)]
    # Combine extra data from SUM into CXS; many columns will have gaps during calibration views
    hk = cxs.combine_first(sum_)
    hk.index.name = 'datetime'
    return hk.reset_index()
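
A toy illustration of the combine_first step (values and the column name are hypothetical): CXS values win wherever both frames have data, and SUM-only timestamps fill the gaps:

    import pandas as pd

    cxs = pd.DataFrame({'temp': [77.0, None]},
                       index=pd.to_datetime(['2016-01-01 00:00', '2016-01-01 00:02']))
    sum_ = pd.DataFrame({'temp': [78.0, 79.0]},
                        index=pd.to_datetime(['2016-01-01 00:01', '2016-01-01 00:02']))
    print(cxs.combine_first(sum_))
    # 00:00 -> 77.0 (CXS only), 00:01 -> 78.0 (SUM only), 00:02 -> 79.0 (SUM fills the CXS NaN)
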
def read_igms(spc_zip_path):
    """
    Read a zip file that archives Igm files; yield dictionaries containing interferograms and index info
    """
    if spc_zip_path is not None:
        # Open zip file
        with ZipFile(spc_zip_path) as spc_zip:
@@ -70,7 +98,7 @@ def read_igms(spc_zip_path):
            with spc_zip.open(name) as igm:
                inS = BytesIO(igm.read())
                for index, subfile in enumerate(read_stream(inS)):
                    # yield row
                    yield {
                        'datetime': datetime.utcfromtimestamp(subfile['UTCTime']),
                        'DataA': subfile['DataA'].squeeze(),
@@ -80,6 +108,9 @@ def read_igms(spc_zip_path):
                    }
def check_frame(frame, parameters):
    """
    Start with the housekeeping DataFrame and iteratively run checks to compute quality
    """
    frame['qc_percent'] = 0
    frame['qc_notes'] = None
    for level in levels:
@@ -88,43 +119,66 @@ def check_frame(frame, parameters):
    return frame
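
The entries of levels sit outside this hunk, so the check interface is not shown here. As a hedged sketch only, each check might be a function of (frame, parameters) that appends a *_check column in the style save_quality expects; the detectorTemp name and the threshold below are illustrative, not from this commit:

    def detector_temp_check(frame, parameters):
        # Hypothetical check: flag records whose detector temperature
        # exceeds a configurable limit (all names are illustrative)
        limit = parameters.get('detector_temp_limit', 90.0)
        frame['detector_temp_check'] = (frame['detectorTemp'] > limit).astype(float)
        return frame
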
def update_all(ftp_dir, sci_dir, parameters=None):
    """
    Given the root directories for ftp and sci, find all days lacking an up-to-date QC file and generate new QC files
    """
    # Find all CXS files
    cxs_files = glob(os.path.join(os.path.abspath(ftp_dir), 'AE*', '*B1.CXS'))
    # For each CXS file, find a matching SUM file and a possible QC filename
    for qc_file, cxs_file, sum_file in files_to_update(cxs_files):
        print('Performing quality control for {}'.format(cxs_file))
        # Find the Igms for these times
        YYMMDD = re.search('([0-9]{6})', os.path.basename(cxs_file)).group(1)
        if sci_dir is None:
            zip_file = None
        else:
            zip_file = os.path.join(sci_dir, 'AE'+YYMMDD, 'SPC_AERI*.zip')
            # The path contains a wildcard, so resolve it with glob; fall back to None if nothing matches
            matches = glob(zip_file)
            zip_file = matches[0] if matches else None
        # First read the housekeeping dataframe
        frame = read_frame(cxs_file, sum_file)
        # Read the interferograms
        igms = pd.DataFrame.from_records(read_igms(zip_file))
        if parameters is None:
            parameters = {}
        # Check for spikes in interferograms and add that quality column to the housekeeping frame;
        # merging by datetime is not expected to line up exactly, and rows will probably interleave with SUM records
        frame_with_spikes = frame.merge(spike_check(igms, parameters), on='datetime', how='outer', sort=True)
        # Propagate spike data to surrounding records
        # Only propagate the presence of spikes, not their absence
        frame_with_spikes.loc[frame_with_spikes.spike_check == False] = np.nan
        frame_with_spikes['spike_check'] = frame_with_spikes.spike_check.ffill(limit=1).bfill(limit=1)
        # Reindex back to the housekeeping frame (union of SUM and CXS records), removing interleaved spike data
        frame_with_spikes = frame_with_spikes.loc[frame.index]
        # Perform QC on the housekeeping frame
        frame_with_spikes = check_frame(frame_with_spikes, parameters)
        save_quality(frame_with_spikes, qc_file)
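
The limit=1 forward/backward fill above widens each detected spike to the records immediately before and after it. A self-contained demonstration of the idiom on toy values:

    import numpy as np
    import pandas as pd

    spikes = pd.Series([np.nan, np.nan, True, np.nan, np.nan])
    print(spikes.ffill(limit=1).bfill(limit=1).tolist())
    # [nan, True, True, True, nan] -- the neighbors of the spike are flagged too
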
def files_to_update(cxs_files, update_only=True):
    """
    Find a matching SUM file and determine the QC filename for each CXS file in sequence
    """
    for cxs_file in cxs_files:
        # Determine the names of possible files
        possible_sum = os.path.join(os.path.dirname(cxs_file), cxs_file.replace('B1.CXS', '.SUM'))
        possible_qc = os.path.join(os.path.dirname(cxs_file), cxs_file.replace('B1.CXS', 'QC.nc'))
        # Check if those files exist
        if os.path.isfile(possible_sum):
            sum_file = possible_sum
            # If the QC file already exists, also check whether it is newer than the SUM and CXS files
            if os.path.isfile(possible_qc):
                qc_file = possible_qc
                if max(os.path.getmtime(sum_file), os.path.getmtime(cxs_file)) > os.path.getmtime(qc_file):
                    # Regenerate the QC file if it is older
                    yield (qc_file, cxs_file, sum_file)
                elif not update_only:
                    # update_only=False will always regenerate
                    yield (qc_file, cxs_file, sum_file)
            else:
                # If the QC file doesn't exist, generate it
                yield (possible_qc, cxs_file, sum_file)
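
A usage sketch for files_to_update (the root directory below is hypothetical), forcing regeneration of every QC file regardless of timestamps:

    import os
    from glob import glob

    cxs_files = glob(os.path.join('/data/ftp', 'AE*', '*B1.CXS'))
    for qc_file, cxs_file, sum_file in files_to_update(cxs_files, update_only=False):
        print('regenerating {} from {} and {}'.format(qc_file, cxs_file, sum_file))
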
if __name__ == '__main__':
@@ -136,7 +190,7 @@ if __name__ == '__main__':
        default_sci = 'C:\\'
    else:
        default_sci = '/cygdrive/c/'
    parser.add_argument('sci', default=None, nargs='?')
    args = parser.parse_args()
...