# main.py
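"""
Quality control driver for AERI instrument data.

Reads housekeeping records from paired CXS and SUM files, optionally scans
zipped interferogram (.Igm) archives for spikes, runs the check levels
defined below, and writes the resulting quality flags to a QC netCDF file.
"""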
import os
import re
from datetime import datetime
from glob import glob
from io import BytesIO
from zipfile import ZipFile

import netCDF4
import numpy as np
import pandas as pd
from aeri_tools.io.dmv.housekeeping import get_all_housekeeping

from bomem_file import read_stream
from electronic_checks import electronic_checks
from global_checks import global_checks
from igm_checks import spike_check
from radiometric_checks import radiometric_checks
from scene_checks import scene_checks
from state_checks import state_checks
from thermal_checks import thermal_checks

levels = [
    global_checks,
    scene_checks,
    state_checks,
    electronic_checks,
    radiometric_checks,
    thermal_checks
]
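
# Each module in `levels` is assumed to expose set_params(parameters) and
# compute(frame); see the sketch after check_frame() below.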

def save_quality(frame, qc_path):
    """
    Save the quality-control DataFrame (frame) to a netCDF file at (qc_path)
    """
    # First select only rows corresponding to records from the sum file
    frame = frame.loc[pd.notnull(frame.sum_index)].set_index('sum_index').sort_index()

    # Define the netcdf
    ncdf = netCDF4.Dataset(qc_path, 'w')
    ncdf.createDimension('time', len(frame))
    base_time = ncdf.createVariable('base_time', 'i8', ())
    time_offset = ncdf.createVariable('time_offset', 'i8', ('time',))
    qc_percent = ncdf.createVariable('qc_percent', 'f4', ('time',))
    qc_notes = ncdf.createVariable('qc_notes', str, ('time',))
    
    # Write the columns ending in _check (aggregate tests)
    for check_mask in frame.filter(like='_check'):
        ncdf.createVariable(check_mask, 'f4', ('time',))[:] = frame[check_mask].values
    # Write the columns starting with qc_ (tests applied directly to variables)
    for variable_qc in frame.filter(like='qc_'):
        if variable_qc not in ['qc_notes','qc_percent']:
            ncdf.createVariable(variable_qc, 'f4', ('time',))[:] = frame[variable_qc].values

    # Write time information (integer nanoseconds, the resolution pandas uses)
    base_time[:] = frame.datetime.dropna().iloc[0].value
    time_offset[:] = (frame.datetime - frame.datetime.dropna().iloc[0]).values.astype('int64')

    # Write the summary
    qc_percent[:] = frame['qc_percent'].values
    qc_notes[:] = frame['qc_notes'].fillna('').values
    ncdf.close()
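
# Sketch of reading back a file written by save_quality, assuming `qc_path`
# points at an existing output file:
#
#     with netCDF4.Dataset(qc_path) as ncdf:
#         percent = ncdf['qc_percent'][:]
#         notes = ncdf['qc_notes'][:]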
    

def read_frame(cxs_file, sum_file):
    """
    Read housekeeping from CXS file and SUM file together

    Returns a DataFrame with a range index, a datetime column, cxs_index and sum_index columns, and the housekeeping data
    """
    # Get CXS housekeeping as dataframe
    cxs = get_all_housekeeping(cxs_file)
    # Save the record numbers for future use
    cxs['cxs_index'] = np.arange(len(cxs))
    # missing records will appear as rows with NaT index, clear them
    cxs = cxs.loc[pd.notnull(cxs.index)]

    # Read SUM as well
    sum_ = get_all_housekeeping(sum_file)
    sum_['sum_index'] = np.arange(len(sum_))
    sum_ = sum_.loc[pd.notnull(sum_.index)]
    
    # Combine extra data from SUM into CXS; many columns will have gaps during calibration views
    hk = cxs.combine_first(sum_)
    hk.index.name = 'datetime'
    return hk.reset_index()
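
# Sketch of the result: a RangeIndex frame whose columns include 'datetime',
# 'cxs_index', 'sum_index', and the housekeeping fields from both files.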

def read_igms(spc_zip_path):
    """
    Read a zip archive of .Igm files, yielding a dictionary of interferogram data and index info for each subfile
    """
    if spc_zip_path is not None:
        # Open zip file
        with ZipFile(spc_zip_path) as spc_zip:
            # Find all members with .Igm suffix
            for name in spc_zip.namelist():
                if name.endswith('.Igm'):
                    # Read the data
                    with spc_zip.open(name) as igm:
                        inS = BytesIO(igm.read())
                    for index, subfile in enumerate(read_stream(inS)):
                        # The first character of the member name encodes the scene mirror position
                        yield {
                            'datetime': datetime.utcfromtimestamp(subfile['UTCTime']),
                            'DataA': subfile['DataA'].squeeze(),
                            'DataB': subfile['DataB'].squeeze(),
                            'sceneMirrorPosition': ord(name[0]),
                            'subfile': index,
                        }
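
# Usage sketch: collect the yielded records into a DataFrame, as update_all()
# does below:
#
#     igms = pd.DataFrame.from_records(read_igms(spc_zip_path))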

def check_frame(frame, parameters):
    """
    Start with housekeeping DataFrame and iteratively run checks to compute quality
    """
    frame['qc_percent'] = 0
    frame['qc_notes'] = None
    for level in levels:
        level.set_params(parameters)
        frame = level.compute(frame)
    return frame
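
# Hypothetical sketch of the contract each level module is assumed to satisfy
# (the real implementations are imported at the top of this file):
#
#     class ExampleChecks:
#         def set_params(self, parameters):
#             self.parameters = parameters
#
#         def compute(self, frame):
#             # add or update columns such as 'example_check', 'qc_percent', 'qc_notes'
#             return frame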

def update_all(ftp_dir, sci_dir, parameters=None):
    """
    Given the root directories for ftp and sci, find all days lacking an up-to-date qc file and generate new qc
    """
    # Find all CXS files
    cxs_files = glob(os.path.join(os.path.abspath(ftp_dir),'AE*','*B1.CXS'))
    # For each CXS file find a matching SUM file and possible QC filename
    for qc_file, cxs_file, sum_file in files_to_update(cxs_files):
        print('Performing quality control for {}'.format(cxs_file))
        # Find the Igms for these times
        YYMMDD = re.search(r'([0-9]{6})', os.path.basename(cxs_file)).group(1)
        if sci_dir is None:
            zip_file = None
        else:
            # The archive name contains a wildcard, so resolve it with glob
            # rather than testing the pattern itself with os.path.isfile
            matches = glob(os.path.join(sci_dir, 'AE' + YYMMDD, 'SPC_AERI*.zip'))
            zip_file = matches[0] if matches else None
        # First read the housekeeping dataframe
        frame = read_frame(cxs_file, sum_file)
        # read the interferograms
        igms = pd.DataFrame.from_records(read_igms(zip_file))
        if parameters is None:
            parameters = {}
        # check for spikes in interferograms and add that quality column to housekeeping frame
        # merging by datetime is not expected to line up exactly; spike rows will probably interleave with SUM records
        frame_with_spikes = frame.merge(spike_check(igms, parameters), on='datetime', how='outer', sort=True)
        # Propagate spike data to surrounding records
        # Only propagate the presence of spikes, not the absence
        frame_with_spikes.loc[frame_with_spikes.spike_check == False, 'spike_check'] = np.nan
        frame_with_spikes['spike_check'] = frame_with_spikes.spike_check.ffill(limit=1).bfill(limit=1)
        # Reindex back to the housekeeping frame (union of SUM and CXS records), removing interleaved spike rows
        frame_with_spikes = frame_with_spikes.reindex(frame.index)
        # Perform qc on housekeeping frame
        frame_with_spikes = check_frame(frame_with_spikes, parameters)

        save_quality(frame_with_spikes, qc_file)
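
# Usage sketch (paths are hypothetical):
#
#     update_all('/data/ftp', '/data/sci')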

def files_to_update(cxs_files, update_only=True):
    """
    Find a matching SUM file and determine the QC filename for each CXS file in sequence
    """
    for cxs_file in cxs_files:
        # Determine names of possible files
        possible_sum = cxs_file.replace('B1.CXS', '.SUM')
        possible_qc = cxs_file.replace('B1.CXS', 'QC.nc')

        # Check if those files exist
        if os.path.isfile(possible_sum):
            sum_file = possible_sum

            # If the QC file already exists, only regenerate when it is older
            # than the SUM or CXS file (update_only=False always regenerates)
            if os.path.isfile(possible_qc):
                qc_file = possible_qc
                stale = max(os.path.getmtime(sum_file), os.path.getmtime(cxs_file)) > os.path.getmtime(qc_file)
                if stale or not update_only:
                    yield (qc_file, cxs_file, sum_file)
            else:
                # If the QC file doesn't exist yet, generate it
                yield (possible_qc, cxs_file, sum_file)
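
# Naming sketch: for a hypothetical /ftp/AE210101/210101B1.CXS, this looks for
# /ftp/AE210101/210101.SUM and yields /ftp/AE210101/210101QC.nc as the target.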

if __name__ == '__main__':
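    # Example invocation (paths are hypothetical):
    #     python main.py /data/ftp /cygdrive/c/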
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('ftp')
    if os.name == 'nt':
        default_sci = 'C:\\'
    else:
        default_sci = '/cygdrive/c/'
    parser.add_argument('sci', default=default_sci, nargs='?')

    args = parser.parse_args()

    update_all(args.ftp, args.sci)