Skip to content
Snippets Groups Projects
Commit 4ebb8861 authored by Geoff Cureton's avatar Geoff Cureton
Browse files

Added QC for AIRS/MODIS and CrIS/VIIRS Fusion product.

parent a82d3921
No related branches found
No related tags found
No related merge requests found
......@@ -44,13 +44,14 @@ Licensed under GNU GPLv3.
"""
import os
from os.path import basename, dirname, curdir, abspath, isdir, isfile, exists, join as pjoin
from os.path import basename, dirname, curdir, abspath, isdir, isfile, exists, splitext, join as pjoin
import sys
import string
import shutil
import logging
import traceback
from glob import glob
from itertools import chain
from subprocess import check_output, check_call, CalledProcessError
from netCDF4 import Dataset
from pyhdf.SD import SD, SDC
......@@ -64,6 +65,7 @@ from glutil.software import delivered_software, support_software, runscript
from glutil.catalogs import dawg_catalog
from utils import create_dir
from detect_bad_fusion import SFX, detect
# every module should have a LOG object
LOG = logging.getLogger(__file__)
......@@ -324,6 +326,7 @@ class FUSION_MATLAB(Computation):
matlab_file_dt_filespec = kwargs['matlab_file_dt_filespec']
env = kwargs['env']
granule = kwargs['granule']
satellite = kwargs['satellite']
rc_fusion = 0
......@@ -349,8 +352,6 @@ class FUSION_MATLAB(Computation):
LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t')))
rc_fusion = 0
runscript(cmd, requirements=[], env=env)
#shutil.copy('/mnt/sdata/geoffc/fusion_matlab/work/test_for_gala/CrIS_VIIRS-VNP02MOD/outputs/tmpeajDcu/fusion_viirs_15107.1754.001.2.mat',current_dir) # DEBUG
#shutil.copy('/mnt/sdata/geoffc/fusion_matlab/work/test_for_gala/AIRS_MODIS/fusion_modis_2015107.1755.mat',current_dir) # DEBUG
except CalledProcessError as err:
rc_fusion = err.returncode
LOG.error("Matlab binary {} returned a value of {}".format(fusion_binary, rc_fusion))
......@@ -451,18 +452,18 @@ class FUSION_MATLAB(Computation):
LOG.debug('Current dir is {}'.format(os.getcwd()))
# Move the HDF4/NetCDF4 file to it's new filename
# Move the HDF4/NetCDF4 file to its new filename
fused_l1b_file_new = fused_l1b_file_new.replace('CTIME', dt_create.strftime('%Y%j%H%M%S'))
LOG.debug('Moving "{}" to "{}" ...'.format(fused_l1b_file, pjoin(current_dir, fused_l1b_file_new)))
shutil.move(fused_l1b_file, pjoin(current_dir, fused_l1b_file_new))
fused_l1b_file = glob(pjoin(current_dir, fused_l1b_file_new))[0]
# Move the matlab file to it's new filename
# Move the matlab file to its new filename
matlab_file_new = basename(matlab_file).replace(
'.mat', '{}.mat'.format(dt_create.strftime('.%Y%j%H%M%S')))
LOG.debug('Moving "{}" to {}...'.format(matlab_file, pjoin(dirname(current_dir), matlab_file_new)))
shutil.move(matlab_file, pjoin(dirname(current_dir), matlab_file_new))
matlab_file = glob(pjoin(dirname(current_dir), matlab_file_new))[0]
LOG.debug('Moving "{}" to {}...'.format(matlab_file, pjoin(current_dir, matlab_file_new)))
shutil.move(matlab_file, pjoin(current_dir, matlab_file_new))
matlab_file = glob(pjoin(current_dir, matlab_file_new))[0]
# Remove the fused_outputs directory
LOG.debug('Removing the fused_outputs dir {} ...'.format(out_dir))
......@@ -470,6 +471,31 @@ class FUSION_MATLAB(Computation):
return rc_fusion, fused_l1b_file
def output_QC(self, l1b_file, fused_l1b_file, band=None, input_rms=0.2, **kwargs):
satellite = kwargs['satellite']
LOG.debug('satellite = {}'.format(satellite))
band_default = {'aqua':32, 'snpp':15}
if band is None:
band = band_default[satellite]
input_files = [fused_l1b_file] if satellite=='snpp' else [l1b_file, fused_l1b_file]
generators = list(SFX[splitext(p)[-1].lower()](p, band) for p in input_files)
stuff = list(chain(*generators))
if len(stuff) != 2:
raise AssertionError("requires original + fusion input")
passfail, rms = detect(input_rms, *stuff)
if passfail:
LOG.info("Output QC PASS (rms {} < threshold {})".format(rms, input_rms))
else:
LOG.warn("Output QC FAIL (rms {} > threshold {})".format(rms, input_rms))
return 0 if passfail else 1
def update_global_attrs(self, netcdf_file, readme_file, **kwargs):
satellite = kwargs['satellite']
......@@ -654,6 +680,12 @@ class FUSION_MATLAB(Computation):
readme_file = pjoin(delivery.path, 'README.txt')
self.update_global_attrs(basename(fused_l1b_file), readme_file, **kwargs)
# Run a QC check on the output file
rc_qc = self.output_QC(l1b, fused_l1b_file, **kwargs)
LOG.debug('output_QC() return value: {}'.format(rc_qc))
if rc_qc != 0:
raise WorkflowNotReady('Output fusion file {} failed RMS error QC check, output aborted.'.format(fused_l1b_file))
# The staging routine assumes that the output file is located in the work directory
# "tmp******", and that the output path is to be prepended, so return the basename.
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Detect obviously-bad fusion data
"""
import os
import sys
import numpy as np
import netCDF4 as nc4
#from scipy.io import loadmat
import logging
from itertools import chain
LOG = logging.getLogger(__name__)
DEFAULT_BAND = 32
DEFAULT_THRESHOLD = 0.2
def detect(threshold, tru, tst):
"""Compute RMS error and return pass/fail boolean"""
yisq = (tst.ravel() - tru.ravel()) ** 2.0
rms = np.sqrt(np.sum(yisq) / float(len(yisq)))
LOG.info("RMS error is {} (threshold {})".format(rms, threshold))
return bool(rms <= threshold), rms
#def mat_band(filename, band_number=DEFAULT_BAND):
#"""Yield a single F from a MAT file output from fusion science code"""
#LOG.info("reading band {} from {} Fusion MAT".format(band_number, filename))
#mat = loadmat(filename)
#bandex = np.argwhere(mat['imager_bands'].squeeze() == band_number)[0][0]
#rad = mat['fusion_rad'][bandex].squeeze()
#yield rad
def modis_band(filename, band_number=DEFAULT_BAND):
"""Yield M or F from a single MODIS or MODIS-AIRS fusion file"""
LOG.info("reading band {} from {} MODIS HDF".format(band_number, filename))
nc = nc4.Dataset(filename)
em = nc.variables['EV_1KM_Emissive']
bn,sc,of = dict(((b,i) for (i,b) in enumerate(map(int, em.band_names.split(','))))), em.radiance_scales, em.radiance_offsets
band = lambda v, b: sc[bn[b]] * (v[bn[b]] - of[bn[b]])
yield band(em, band_number)
def viirs_band(filename, band_number=DEFAULT_BAND):
"""Yield both Mband and Fband from a single file"""
LOG.info("reading band {} (M and Fusion) from {} VIIRS NetCDF".format(band_number, filename))
nc = nc4.Dataset(filename)
ncob = nc['observation_data']
m = ncob['M%02d' % band_number][:]
yield m
f = ncob['Fusion%02d' % band_number][:]
yield f
def _debug(type_, value, tb):
"""enable with sys.excepthook = debug"""
if not sys.stdin.isatty():
sys.__excepthook__(type_, value, tb)
else:
import traceback, pdb
traceback.print_exception(type_, value, tb)
# …then start the debugger in post-mortem mode.
pdb.post_mortem(tb) # more “modern”
#SFX = {'.mat': mat_band, '.hdf': modis_band, '.nc': viirs_band}
SFX = {'.hdf': modis_band, '.nc': viirs_band}
def file_QC(input_rms, band, input_files):
generators = list(SFX[os.path.splitext(p)[-1].lower()](p, band) for p in input_files)
stuff = list(chain(*generators))
if len(stuff) != 2:
raise AssertionError("requires original + fusion input")
passfail, rms = detect(input_rms, *stuff)
print("> pass (rms {} < threshold {})".format(rms, input_rms) if passfail else "> FAIL (rms {} > threshold {})".format(
rms, input_rms))
return 0 if passfail else 1
def main():
import argparse
parser = argparse.ArgumentParser(
description="",
epilog="",
fromfile_prefix_chars='@')
parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0,
help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
parser.add_argument('-d', '--debug', dest='debug', action='store_true',
help="enable interactive PDB debugger on exception")
parser.add_argument('-B', '--band', dest='band', type=int, default=DEFAULT_BAND,
help="choose band number")
parser.add_argument('-R', '--rms', dest='rms', type=float, default=DEFAULT_THRESHOLD,
help="maximum tolerable RMS error")
parser.add_argument('inputs', nargs='*',
help="input files to process (MAT or HDF), requires 2 files")
args = parser.parse_args()
levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
logging.basicConfig(level=levels[min(3, args.verbosity)])
if args.debug:
sys.excepthook = _debug
# if not args.inputs:
# unittest.main()
# return 0
# Creates file reading generators for each file type in the inputs.
generators = list(SFX[os.path.splitext(p)[-1].lower()](p, args.band) for p in args.inputs)
stuff = list(chain(*generators))
if len(stuff) != 2:
raise AssertionError("requires original + fusion input")
passfail, rms = detect(args.rms, *stuff)
print("> pass (rms {} < threshold {})".format(rms, args.rms) if passfail else "> FAIL (rms {} > threshold {})".format(
rms, args.rms))
return 0 if passfail else 1
if __name__ == '__main__':
sys.exit(main())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment