diff --git a/source/flo/__init__.py b/source/flo/__init__.py index 4eea9867f0124ab2c1d44a262e8f0081ad6e85d1..c729d4e98c326fe7465aa4d7fe8152cd06cf61d7 100644 --- a/source/flo/__init__.py +++ b/source/flo/__init__.py @@ -44,13 +44,14 @@ Licensed under GNU GPLv3. """ import os -from os.path import basename, dirname, curdir, abspath, isdir, isfile, exists, join as pjoin +from os.path import basename, dirname, curdir, abspath, isdir, isfile, exists, splitext, join as pjoin import sys import string import shutil import logging import traceback from glob import glob +from itertools import chain from subprocess import check_output, check_call, CalledProcessError from netCDF4 import Dataset from pyhdf.SD import SD, SDC @@ -64,6 +65,7 @@ from glutil.software import delivered_software, support_software, runscript from glutil.catalogs import dawg_catalog from utils import create_dir +from detect_bad_fusion import SFX, detect # every module should have a LOG object LOG = logging.getLogger(__file__) @@ -324,6 +326,7 @@ class FUSION_MATLAB(Computation): matlab_file_dt_filespec = kwargs['matlab_file_dt_filespec'] env = kwargs['env'] granule = kwargs['granule'] + satellite = kwargs['satellite'] rc_fusion = 0 @@ -349,8 +352,6 @@ class FUSION_MATLAB(Computation): LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t'))) rc_fusion = 0 runscript(cmd, requirements=[], env=env) - #shutil.copy('/mnt/sdata/geoffc/fusion_matlab/work/test_for_gala/CrIS_VIIRS-VNP02MOD/outputs/tmpeajDcu/fusion_viirs_15107.1754.001.2.mat',current_dir) # DEBUG - #shutil.copy('/mnt/sdata/geoffc/fusion_matlab/work/test_for_gala/AIRS_MODIS/fusion_modis_2015107.1755.mat',current_dir) # DEBUG except CalledProcessError as err: rc_fusion = err.returncode LOG.error("Matlab binary {} returned a value of {}".format(fusion_binary, rc_fusion)) @@ -451,18 +452,18 @@ class FUSION_MATLAB(Computation): LOG.debug('Current dir is {}'.format(os.getcwd())) - # Move the HDF4/NetCDF4 file to it's new filename + # Move the HDF4/NetCDF4 file to its new filename fused_l1b_file_new = fused_l1b_file_new.replace('CTIME', dt_create.strftime('%Y%j%H%M%S')) LOG.debug('Moving "{}" to "{}" ...'.format(fused_l1b_file, pjoin(current_dir, fused_l1b_file_new))) shutil.move(fused_l1b_file, pjoin(current_dir, fused_l1b_file_new)) fused_l1b_file = glob(pjoin(current_dir, fused_l1b_file_new))[0] - # Move the matlab file to it's new filename + # Move the matlab file to its new filename matlab_file_new = basename(matlab_file).replace( '.mat', '{}.mat'.format(dt_create.strftime('.%Y%j%H%M%S'))) - LOG.debug('Moving "{}" to {}...'.format(matlab_file, pjoin(dirname(current_dir), matlab_file_new))) - shutil.move(matlab_file, pjoin(dirname(current_dir), matlab_file_new)) - matlab_file = glob(pjoin(dirname(current_dir), matlab_file_new))[0] + LOG.debug('Moving "{}" to {}...'.format(matlab_file, pjoin(current_dir, matlab_file_new))) + shutil.move(matlab_file, pjoin(current_dir, matlab_file_new)) + matlab_file = glob(pjoin(current_dir, matlab_file_new))[0] # Remove the fused_outputs directory LOG.debug('Removing the fused_outputs dir {} ...'.format(out_dir)) @@ -470,6 +471,31 @@ class FUSION_MATLAB(Computation): return rc_fusion, fused_l1b_file + def output_QC(self, l1b_file, fused_l1b_file, band=None, input_rms=0.2, **kwargs): + + satellite = kwargs['satellite'] + LOG.debug('satellite = {}'.format(satellite)) + + band_default = {'aqua':32, 'snpp':15} + if band is None: + band = band_default[satellite] + + input_files = [fused_l1b_file] if satellite=='snpp' else [l1b_file, fused_l1b_file] + + generators = list(SFX[splitext(p)[-1].lower()](p, band) for p in input_files) + stuff = list(chain(*generators)) + if len(stuff) != 2: + raise AssertionError("requires original + fusion input") + passfail, rms = detect(input_rms, *stuff) + + if passfail: + LOG.info("Output QC PASS (rms {} < threshold {})".format(rms, input_rms)) + else: + LOG.warn("Output QC FAIL (rms {} > threshold {})".format(rms, input_rms)) + + return 0 if passfail else 1 + + def update_global_attrs(self, netcdf_file, readme_file, **kwargs): satellite = kwargs['satellite'] @@ -654,6 +680,12 @@ class FUSION_MATLAB(Computation): readme_file = pjoin(delivery.path, 'README.txt') self.update_global_attrs(basename(fused_l1b_file), readme_file, **kwargs) + # Run a QC check on the output file + rc_qc = self.output_QC(l1b, fused_l1b_file, **kwargs) + LOG.debug('output_QC() return value: {}'.format(rc_qc)) + if rc_qc != 0: + raise WorkflowNotReady('Output fusion file {} failed RMS error QC check, output aborted.'.format(fused_l1b_file)) + # The staging routine assumes that the output file is located in the work directory # "tmp******", and that the output path is to be prepended, so return the basename. diff --git a/source/flo/detect_bad_fusion.py b/source/flo/detect_bad_fusion.py new file mode 100644 index 0000000000000000000000000000000000000000..139cb76b22734817e1a799d2490dbca9e005e953 --- /dev/null +++ b/source/flo/detect_bad_fusion.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Detect obviously-bad fusion data +""" +import os +import sys +import numpy as np +import netCDF4 as nc4 +#from scipy.io import loadmat +import logging +from itertools import chain + +LOG = logging.getLogger(__name__) + +DEFAULT_BAND = 32 +DEFAULT_THRESHOLD = 0.2 + + +def detect(threshold, tru, tst): + """Compute RMS error and return pass/fail boolean""" + yisq = (tst.ravel() - tru.ravel()) ** 2.0 + rms = np.sqrt(np.sum(yisq) / float(len(yisq))) + LOG.info("RMS error is {} (threshold {})".format(rms, threshold)) + return bool(rms <= threshold), rms + + +#def mat_band(filename, band_number=DEFAULT_BAND): + #"""Yield a single F from a MAT file output from fusion science code""" + #LOG.info("reading band {} from {} Fusion MAT".format(band_number, filename)) + #mat = loadmat(filename) + #bandex = np.argwhere(mat['imager_bands'].squeeze() == band_number)[0][0] + #rad = mat['fusion_rad'][bandex].squeeze() + #yield rad + + +def modis_band(filename, band_number=DEFAULT_BAND): + """Yield M or F from a single MODIS or MODIS-AIRS fusion file""" + LOG.info("reading band {} from {} MODIS HDF".format(band_number, filename)) + nc = nc4.Dataset(filename) + em = nc.variables['EV_1KM_Emissive'] + bn,sc,of = dict(((b,i) for (i,b) in enumerate(map(int, em.band_names.split(','))))), em.radiance_scales, em.radiance_offsets + band = lambda v, b: sc[bn[b]] * (v[bn[b]] - of[bn[b]]) + yield band(em, band_number) + + +def viirs_band(filename, band_number=DEFAULT_BAND): + """Yield both Mband and Fband from a single file""" + LOG.info("reading band {} (M and Fusion) from {} VIIRS NetCDF".format(band_number, filename)) + nc = nc4.Dataset(filename) + ncob = nc['observation_data'] + m = ncob['M%02d' % band_number][:] + yield m + f = ncob['Fusion%02d' % band_number][:] + yield f + + +def _debug(type_, value, tb): + """enable with sys.excepthook = debug""" + if not sys.stdin.isatty(): + sys.__excepthook__(type_, value, tb) + else: + import traceback, pdb + traceback.print_exception(type_, value, tb) + # …then start the debugger in post-mortem mode. + pdb.post_mortem(tb) # more “modern” + + +#SFX = {'.mat': mat_band, '.hdf': modis_band, '.nc': viirs_band} +SFX = {'.hdf': modis_band, '.nc': viirs_band} + +def file_QC(input_rms, band, input_files): + generators = list(SFX[os.path.splitext(p)[-1].lower()](p, band) for p in input_files) + stuff = list(chain(*generators)) + if len(stuff) != 2: + raise AssertionError("requires original + fusion input") + passfail, rms = detect(input_rms, *stuff) + print("> pass (rms {} < threshold {})".format(rms, input_rms) if passfail else "> FAIL (rms {} > threshold {})".format( + rms, input_rms)) + return 0 if passfail else 1 + + +def main(): + import argparse + parser = argparse.ArgumentParser( + description="", + epilog="", + fromfile_prefix_chars='@') + parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0, + help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG') + parser.add_argument('-d', '--debug', dest='debug', action='store_true', + help="enable interactive PDB debugger on exception") + parser.add_argument('-B', '--band', dest='band', type=int, default=DEFAULT_BAND, + help="choose band number") + parser.add_argument('-R', '--rms', dest='rms', type=float, default=DEFAULT_THRESHOLD, + help="maximum tolerable RMS error") + parser.add_argument('inputs', nargs='*', + help="input files to process (MAT or HDF), requires 2 files") + args = parser.parse_args() + levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] + logging.basicConfig(level=levels[min(3, args.verbosity)]) + if args.debug: + sys.excepthook = _debug + # if not args.inputs: + # unittest.main() + # return 0 + + # Creates file reading generators for each file type in the inputs. + generators = list(SFX[os.path.splitext(p)[-1].lower()](p, args.band) for p in args.inputs) + stuff = list(chain(*generators)) + if len(stuff) != 2: + raise AssertionError("requires original + fusion input") + passfail, rms = detect(args.rms, *stuff) + print("> pass (rms {} < threshold {})".format(rms, args.rms) if passfail else "> FAIL (rms {} > threshold {})".format( + rms, args.rms)) + return 0 if passfail else 1 + + +if __name__ == '__main__': + sys.exit(main())