main.py
import os
from glob import glob
import re
import netCDF4
from aeri_tools.io.dmv.housekeeping import get_all_housekeeping
import pandas as pd
import numpy as np
from zipfile import ZipFile
from io import BytesIO
from electronic_checks import electronic_checks
from global_checks import global_checks
from radiometric_checks import radiometric_checks
from scene_checks import scene_checks
from state_checks import state_checks
from thermal_checks import thermal_checks
from bomem_file import read_stream
from igm_checks import spike_check
from datetime import datetime
# Quality-control check modules, applied in order by check_frame below
levels = [
global_checks,
scene_checks,
state_checks,
electronic_checks,
radiometric_checks,
thermal_checks
]
def save_quality(frame, qc_path):
"""
Save the DataFrame (frame) to a netCDF at (qc_path)
"""
    # First select only the rows corresponding to records from the SUM file
    frame = frame.loc[pd.notnull(frame.sum_index)].set_index('sum_index').sort_index()
    # Define the netCDF file
    ncdf = netCDF4.Dataset(qc_path, 'w')
    ncdf.createDimension('time', len(frame))
base_time = ncdf.createVariable('base_time', 'i8', ())
time_offset = ncdf.createVariable('time_offset', 'i8', ('time',))
qc_percent = ncdf.createVariable('qc_percent', 'f4', ('time',))
qc_notes = ncdf.createVariable('qc_notes', str, ('time',))
# Write the columns ending in _check (aggregate tests)
for check_mask in frame.filter(like='_check'):
ncdf.createVariable(check_mask, 'f4', ('time',))[:] = frame[check_mask].values
# Write the columns starting with qc_ (tests applied directly to variables)
for variable_qc in frame.filter(like='qc_'):
if variable_qc not in ['qc_notes','qc_percent']:
ncdf.createVariable(variable_qc, 'f4', ('time',))[:] = frame[variable_qc].values
    # Write time information: base_time as integer seconds since the epoch,
    # time_offset as integer seconds since base_time (explicit casts so the
    # timedelta64[ns] values coerce cleanly into the i8 variables)
    base = frame.datetime.dropna().iloc[0]
    base_time[:] = np.int64(base.value // 10**9)
    time_offset[:] = (frame.datetime - base).values.astype('timedelta64[s]').astype('i8')
# Write the summary
qc_percent[:] = frame['qc_percent'].values
qc_notes[:] = frame['qc_notes'].fillna('').values
ncdf.close()
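
# Illustrative sketch of how save_quality is meant to be fed (file names are
# hypothetical): the frame must carry sum_index, datetime, qc_percent, and
# qc_notes columns, as produced by read_frame and check_frame below.
#
#     frame = check_frame(read_frame('960601B1.CXS', '960601.SUM'), {})
#     save_quality(frame, '960601QC.nc')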
def read_frame(cxs_file, sum_file):
    """
    Read housekeeping from a CXS file and a SUM file together.
    Returns a DataFrame with a range index plus datetime and sum_index columns
    alongside the housekeeping data.
    """
# Get CXS housekeeping as dataframe
cxs = get_all_housekeeping(cxs_file)
# Save the record numbers for future use
cxs['cxs_index'] = np.arange(len(cxs))
    # Missing records will appear as rows with a NaT index; drop them
    cxs = cxs.loc[pd.notnull(cxs.index)]
# Read SUM as well
sum_ = get_all_housekeeping(sum_file)
sum_['sum_index'] = np.arange(len(sum_))
    sum_ = sum_.loc[pd.notnull(sum_.index)]
    # Combine extra data from SUM into CXS; many columns will have missing values during calibration views
hk = cxs.combine_first(sum_)
hk.index.name = 'datetime'
return hk.reset_index()
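
# For example (hypothetical paths), the combined frame looks like:
#
#     hk = read_frame('AE960601/960601B1.CXS', 'AE960601/960601.SUM')
#     hk[['datetime', 'cxs_index', 'sum_index']].head()
#
# where sum_index is NaN for records that appear only in the CXS file, and
# cxs_index is NaN for records that appear only in the SUM file.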
def read_igms(spc_zip_path):
    """
    Read a zip file archiving Igm files, yielding one dictionary per
    interferogram subfile with its data and index info
    """
if spc_zip_path is not None:
# Open zip file
with ZipFile(spc_zip_path) as spc_zip:
# Find all members with .Igm suffix
for name in spc_zip.namelist():
if name.endswith('.Igm'):
# Read the data
with spc_zip.open(name) as igm:
inS = BytesIO(igm.read())
for index, subfile in enumerate(read_stream(inS)):
                        # Yield one row per interferogram subfile; the first
                        # character of the member name encodes the scene mirror position
                        yield {
                            'datetime': datetime.utcfromtimestamp(subfile['UTCTime']),
                            'DataA': subfile['DataA'].squeeze(),
                            'DataB': subfile['DataB'].squeeze(),
                            'sceneMirrorPosition': ord(name[0]),
                            'subfile': index
                        }
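
# Illustrative use (zip path hypothetical): materialize the generator into a
# DataFrame, as update_all does below.
#
#     igms = pd.DataFrame.from_records(read_igms('AE960601/SPC_AERI960601.zip'))
#     # columns: DataA, DataB, datetime, sceneMirrorPosition, subfile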
def check_frame(frame, parameters):
"""
Start with housekeeping DataFrame and iteratively run checks to compute quality
"""
frame['qc_percent'] = 0
frame['qc_notes'] = None
for level in levels:
level.set_params(parameters)
frame = level.compute(frame)
return frame
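
# Each module in `levels` is assumed to expose the small interface that
# check_frame relies on: a set_params(parameters) hook and a compute(frame)
# function returning the frame with extra check columns. A minimal conforming
# module might look like this (hypothetical sketch, not part of this repo):
#
#     _params = {}
#
#     def set_params(parameters):
#         _params.update(parameters)
#
#     def compute(frame):
#         frame['example_check'] = 1.0  # e.g. flag every record as passing
#         return frame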
def update_all(ftp_dir, sci_dir, parameters=None):
"""
Given the root directories for ftp and sci, find all days lacking an up-to-date qc file and generate new qc
"""
# Find all CXS files
cxs_files = glob(os.path.join(os.path.abspath(ftp_dir),'AE*','*B1.CXS'))
# For each CXS file find a matching SUM file and possible QC filename
for qc_file, cxs_file, sum_file in files_to_update(cxs_files):
print('Performing quality control for {}'.format(cxs_file))
# Find the Igms for these times
YYMMDD = re.search('([0-9]{6})', os.path.basename(cxs_file)).group(1)
        if sci_dir is None:
            zip_file = None
        else:
            # The zip name contains a wildcard, so resolve it with glob rather
            # than testing the pattern itself with os.path.isfile
            matches = glob(os.path.join(sci_dir, 'AE' + YYMMDD, 'SPC_AERI*.zip'))
            zip_file = matches[0] if matches else None
# First read the housekeeping dataframe
frame = read_frame(cxs_file, sum_file)
# read the interferograms
igms = pd.DataFrame.from_records(read_igms(zip_file))
if parameters is None:
parameters = {}
        # Check for spikes in interferograms and add that quality column to the
        # housekeeping frame; the datetimes are not expected to match exactly,
        # so the merged rows will probably interleave with the SUM records
        frame_with_spikes = frame.merge(spike_check(igms, parameters), on='datetime', how='outer', sort=True)
        # Propagate spike data to surrounding records
        # (only propagate the presence of spikes, not their absence)
        frame_with_spikes.loc[frame_with_spikes.spike_check == False] = np.nan
        frame_with_spikes['spike_check'] = frame_with_spikes.spike_check.ffill(limit=1).bfill(limit=1)
        # Reindex back to the housekeeping frame (the union of SUM and CXS records),
        # removing the interleaved spike rows
        frame_with_spikes = frame_with_spikes.loc[frame.index]
# Perform qc on housekeeping frame
frame_with_spikes = check_frame(frame_with_spikes, parameters)
save_quality(frame_with_spikes, qc_file)
def files_to_update(cxs_files, update_only=True):
"""
Find a matching SUM file and determine the QC filename for each CXS file in sequence
"""
for cxs_file in cxs_files:
        # Determine the names of the companion files; cxs_file is already a
        # full path, so a plain replace keeps the directory component
        possible_sum = cxs_file.replace('B1.CXS', '.SUM')
        possible_qc = cxs_file.replace('B1.CXS', 'QC.nc')
# Check if those files exist
if os.path.isfile(possible_sum):
sum_file = possible_sum
# If QC file already exists, also check that it is newer than the sum and cxs file
if os.path.isfile(possible_qc):
qc_file = possible_qc
if max(os.path.getmtime(sum_file), os.path.getmtime(cxs_file)) > os.path.getmtime(qc_file):
# Regenerate QC if it is older
yield (qc_file, cxs_file, sum_file)
elif not update_only:
# update_only=False will always regenerate
yield (qc_file, cxs_file, sum_file)
else:
# if qc doesn't exist, generate
yield (possible_qc, cxs_file, sum_file)
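
# Illustrative use: pair each CXS file with its SUM and QC paths, yielding only
# the stale or missing QC files (update_only=False would regenerate everything).
#
#     for qc_file, cxs_file, sum_file in files_to_update(cxs_files):
#         print('would regenerate', qc_file)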
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('ftp')
    # Default science directory depends on the platform (the Windows C: drive,
    # seen as /cygdrive/c/ under Cygwin)
    if os.name == 'nt':
        default_sci = 'C:\\'
    else:
        default_sci = '/cygdrive/c/'
    parser.add_argument('sci', default=default_sci, nargs='?')
    args = parser.parse_args()
    update_all(args.ftp, args.sci)
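
# Example invocation (paths hypothetical):
#
#     python main.py /data/ftp /cygdrive/c/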