Skip to content
Snippets Groups Projects
__init__.py 27.21 KiB
#!/usr/bin/env python
# encoding: utf-8
"""

Purpose: Run the fusion_matlab package found at:

    https://gitlab.ssec.wisc.edu/eweisz/modis-airs-fusion
    https://gitlab.ssec.wisc.edu/eweisz/viirs-cris-fusion

and the CF-compliance converter found at:

    https://gitlab.ssec.wisc.edu/sounder-imager-fusion/fusion_glue/tree/develop/py/fusion_l1b

For Aqua AIRS/MODIS fusion, example inputs are...

    * NASA MODISS L1B geolocation and radiometric files:
        MYD03.A2015107.1435.006.2015108161923.hdf
        MYD021KM.A2015107.1435.006.2015108162652.hdf

    * NASA AIRS L1B files:
        AIRS.2015.04.17.145.L1B.AIRS_Rad.v5.0.23.0.G15173170714.hdf
        AIRS.2015.04.17.146.L1B.AIRS_Rad.v5.0.23.0.G15173170305.hdf

    * AIRS/MODIS collocation files
        colloc.airs_20150417T1429.modis_20150417T1435.nc
        colloc.airs_20150417T1435.modis_20150417T1435.nc

For Suomi-NPP CrIS/VIIRS fusion, example inputs are...

    * NASA VIIRS L1B geolocation and radiometric files:
        VNP03MOD.A2015107.1436.001.2017314224025.uwssec.nc
        VNP02MOD.A2015107.1436.001.2017314223846.uwssec.nc

    * NASA CrIS L1B files
        SNDR.SNPP.CRIS.20150417T1436.m06.g147.L1B_NSR.std.v01_00_00.T.161217000158.nc

    * CrIS/VIIRS collocation files
        colloc.cris_20150417T1436.viirs_20150417T1436.nc



Copyright (c) 2017 University of Wisconsin Regents.
Licensed under GNU GPLv3.
"""

import os
from os.path import basename, dirname, curdir, abspath, isdir, isfile, exists, splitext, join as pjoin
import sys
import string
import shutil
import logging
import traceback
from glob import glob
from itertools import chain
from subprocess import check_output, check_call, CalledProcessError
from netCDF4 import Dataset
from pyhdf.SD import SD, SDC

from flo.computation import Computation
from flo.builder import WorkflowNotReady
from timeutil import TimeInterval, datetime, timedelta
from flo.util import symlink_inputs_to_working_dir

from glutil.software import delivered_software, support_software, runscript
from glutil.catalogs import dawg_catalog

from utils import create_dir
from detect_bad_fusion import SFX, detect

# every module should have a LOG object
LOG = logging.getLogger(__file__)

class FUSION_MATLAB(Computation):

    parameters = ['granule', 'satellite', 'version']

    outputs = ['fused_l1b']

    def find_contexts(self, time_interval, satellite, delivery_id):
        '''
        Here we assume that the granule boundaries fall along 6-minute (snpp) or 5-minute (aqua)
        increments, starting at the top of the hour:

        SNPP: [0.,   6.,  12.,  18.,  24.,  30.,  36.,  42.,  48.,  54.]
        AQUA: [0.,   5.,  10.,  15.,  20.,  25.,  30.,  35.,  40.,  45.,  50., 55.]
        '''

        if satellite=='snpp':
            granule_length = timedelta(minutes=6)
        elif satellite=='aqua':
            granule_length = timedelta(minutes=5)
        else:
            return []

        return [{'satellite': satellite, 'version': delivery_id, 'granule': g}
                    for g in time_interval.contained_series(granule_length)]

    def _add_modis_l1b_geo_input(self, context, task):
        satellite = context['satellite']
        granule = context['granule']
        granule_length = timedelta(minutes=5)

        myd03_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
        myd03 = dawg_catalog.files('aqua','MOD03', myd03_interval, version='c6')
        if myd03 == []:
            raise WorkflowNotReady('Unable to find matching MYD03 granule for interval {}'.format(myd03_interval))
        myd03_file = myd03[0]
        LOG.debug('MYD03 granule path: {}'.format(myd03_file.path))
        #if not exists(myd03_file.path):
            #raise WorkflowNotReady('Nominally valid MYD03 path {} for granule {} or interval {} does not exist, possible DB corruption'.format(myd03_file.path, granule, myd03_interval))

        task.input('geo', myd03_file)

    def _add_modis_l1b_m_input(self, context, task):
        satellite = context['satellite']
        granule = context['granule']
        granule_length = timedelta(minutes=5)

        myd021km_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
        myd021km = dawg_catalog.files('aqua','MOD021KM', myd021km_interval, version='c6')
        if myd021km == []:
            raise WorkflowNotReady('Unable to find matching MYD021KM granule for interval {}'.format(myd021km_interval))
        myd021km_file = myd021km[0]
        LOG.debug('MYD021KM granule path: {}'.format(myd021km_file.path))
        #if not exists(myd021km_file.path):
            #raise WorkflowNotReady('Nominally valid MYD021KM path {} for granule {} or interval {} does not exist, possible DB corruption'.format(myd021km_file.path, granule, myd021km_interval))

        task.input('l1b', myd021km_file)

    def _add_airs_l1b_input(self, context, task):
        satellite = context['satellite']
        granule = context['granule']
        granule_length = timedelta(minutes=5)

        myd03_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
        myd03 = dawg_catalog.files('aqua','MOD03', myd03_interval, version='c6')
        if myd03 == []:
            raise WorkflowNotReady('Unable to find matching MYD03 granule for interval {}'.format(myd03_interval))
        myd03_file = myd03[0]
        LOG.debug('MYD03 granule path: {}'.format(myd03_file.path))
        #if not exists(myd03_file.path):
            #raise WorkflowNotReady('Nominally valid MYD03 path {} for granule {} or interval {} does not exist, possible DB corruption'.format(myd03_file.path, granule, myd03_interval))

        buf = 0 # seconds
        airs_begin = myd03_file.begin_time - timedelta(seconds=buf)
        airs_end = myd03_file.end_time + timedelta(seconds=buf)
        airs_interval = TimeInterval(airs_begin, airs_end)
        airs = dawg_catalog.files('aqua', 'AIRIBRAD', airs_interval)
        if airs == []:
            raise WorkflowNotReady('Unable to find matching AIRS granules for interval {}'.format(airs_interval))

        for idx, airs_file in enumerate(airs):
            LOG.debug('AIRS granule {}: {} -> {}'.format(idx, airs_file.begin_time, airs_file.end_time))
            task.input('sounder_{}'.format(idx),  airs_file)

    def build_task_aqua(self, context, task):
        '''
        Build up a set of inputs for a single context
        '''

        LOG.debug("context = {}".format(context))

        self._add_modis_l1b_geo_input(context, task)
        self._add_modis_l1b_m_input(context, task)
        self._add_airs_l1b_input(context, task)
        #self._add_v2c_inputs(viirstpw, context, task)

    def _add_viirs_l1b_geo_input(self, context, task):
        satellite = context['satellite']
        granule = context['granule']
        vgeom = dawg_catalog.file(satellite, 'VNP03MOD', granule, version='2.0.2')
        task.input('geo', vgeom)

    def _add_viirs_l1b_m_input(self, context, task):
        satellite = context['satellite']
        granule = context['granule']
        vl1b = dawg_catalog.file(satellite, 'VNP02MOD', granule, version='2.0.2')
        task.input('l1b', vl1b)

    def _add_cris_l1b_input(self, context, task):
        satellite = context['satellite']
        granule = context['granule']
        granule_length = timedelta(minutes=6)
        cris_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
        cris = dawg_catalog.files(satellite, 'CL1B', cris_interval, version='v1.0rc8')
        if cris == []:
            raise WorkflowNotReady('Unable to find matching CrIS granules for interval {}'.format(cris_interval))

        for idx, cris_file in enumerate(cris):
            LOG.debug('CrIS granule {}: {} -> {}'.format(idx, cris_file.begin_time, cris_file.end_time))
            task.input('sounder_{}'.format(idx),  cris_file)

    #def _add_cris_viirs_collocation_input(self, context, task):
        #satellite = context['satellite']
        #granule = context['granule']
        #granule_length = timedelta(minutes=6)
        #cris_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
        #cris = dawg_catalog.files(satellite, 'CL1B', cris_interval, version='v1.0rc8')
        #if cris == []:
            #raise WorkflowNotReady('Unable to find matching CrIS granules for interval {}'.format(cris_interval))

        #for idx, cris_file in enumerate(cris):
            #LOG.debug('CrIS granule {}: {} -> {}'.format(idx, cris_file.begin_time, cris_file.end_time))
            #task.input('sounder_{}'.format(idx),  cris_file)

    #def _add_v2c_inputs(self, product, context, task):
        #"""
        #Create and add inputs for c2v transposed inputs overlapping granule
        #6-min interval.
        #"""
        #collo = product.input('collopak')
        #nucaps = product.input('cspp-nucaps')
        ## XXX: SDR types are based on other inputs
        #cris_sdr_type = '{}-{}'.format(nucaps.options['sdr_type'],
                                       #nucaps.options['cris_version'])
        ## Here we use the viirs version from our other input
        #viirs = product.input('viirs_l1')
        #viirs_sdr_type = 'l1-{}'.format(viirs.version)

        #v2c = ViirsCrisCollocation()
        #ctx = v2c.find_contexts(l1b_interval(context['granule']),
                                #cris_sdr_type=cris_sdr_type,
                                #viirs_sdr_type=viirs_sdr_type,
                                #version=collo.version)[0]
        #if ctx['viirs_granule'] != ctx['cris_granule']:
            #raise WorkflowNotReady('TPW: Granule times must be the same', ctx)

        #task.input('v2c', v2c.dataset('out').product(ctx))

    def build_task_snpp(self, context, task):
        '''
        Build up a set of inputs for a single context
        '''

        LOG.debug("context = {}".format(context))

        self._add_viirs_l1b_geo_input(context, task)
        self._add_viirs_l1b_m_input(context, task)
        self._add_cris_l1b_input(context, task)

    def build_task(self, context, task):

        satellite = context['satellite']

        if satellite == 'snpp':
            self.build_task_snpp(context, task)
        elif satellite == 'aqua':
            self.build_task_aqua(context, task)
        else:
            raise ValueError('Invalid satellite: {}'.format(satellite))

    def mend_viirs_l1b(self, geo, l1b):

        out_fn = basename(l1b).replace('.nc', '.bowtie_restored.nc')
        LOG.info('out_fn = {}'.format(out_fn))
        shutil.copy(l1b, out_fn)

        viirsmend = support_software.lookup('viirsmend', version='1.2.12')
        py_exe = pjoin(viirsmend.path,'bin/python')
        viirsmend_exe = pjoin(viirsmend.path,'bin/viirsl1mend')
        cmd = '{} {} {} {}'.format(py_exe, viirsmend_exe, out_fn, geo)

        LOG.info('cmd = {}'.format(cmd))
        runscript(cmd, requirements=[viirsmend])

        return out_fn

    def cris_viirs_collocation(self, inputs):

        LOG.info('inputs = {}'.format(inputs))
        input_keys = inputs.keys()
        input_keys.sort()

        sounder_keys = [key for key in input_keys if 'sounder' in key]
        cris_files = [inputs[key] for key in sounder_keys]

        vgeom_file = inputs['geo']

        crisviirs = support_software.lookup('collopak')
        crisviirs_exe = pjoin(crisviirs.path,'bin/crisviirs')
        for cris_file in cris_files:
            cmd = '{} {} {} > /dev/null'.format(crisviirs_exe, cris_file, vgeom_file)

            LOG.info('cmd = {}'.format(cmd))
            runscript(cmd, requirements=[])

        collo_files = glob('colloc.*.nc')
        collo_files.sort()

        return collo_files

    def airs_modis_collocation(self, inputs):

        LOG.info('inputs = {}'.format(inputs))
        input_keys = inputs.keys()
        input_keys.sort()

        sounder_keys = [key for key in input_keys if 'sounder' in key]
        airs_files = [inputs[key] for key in sounder_keys]

        modis_file = inputs['geo']

        airsmodis = support_software.lookup('collopak')
        airsmodis_exe = pjoin(airsmodis.path,'bin/airsmod')
        for airs_file in airs_files:
            cmd = '{} {} {} > /dev/null'.format(airsmodis_exe, airs_file, modis_file)

            LOG.info('cmd = {}'.format(cmd))
            runscript(cmd, requirements=[])

        collo_files = glob('colloc.*.nc')
        collo_files.sort()

        return collo_files

    def run_fusion_matlab(self, geo_file, l1b_file, sounder_files, collo_files, **kwargs):
        '''
        Run the Matlab fusion binary on the input level-1b files to generate the *.mat output file.
        '''

        bin_dir = kwargs['bin_dir']
        anc_dir = kwargs['anc_dir']
        fusion_binary = kwargs['fusion_binary']
        out_dir = kwargs['out_dir']
        matlab_file_glob = kwargs['matlab_file_glob']
        matlab_file_dt_filespec = kwargs['matlab_file_dt_filespec']
        env = kwargs['env']
        granule = kwargs['granule']
        satellite = kwargs['satellite']

        rc_fusion = 0

        #run matlab
        cmd = '{}/{} {} {} {} {} {} {}'.format(
            bin_dir,
            fusion_binary,
            support_software.lookup('matlab', '2015b').path,
            geo_file,
            l1b_file,
            ' '.join(sounder_files),
            ' '.join(collo_files),
            anc_dir
            )

        # Create the output directory
        current_dir = os.getcwd()
        LOG.debug('The current directory is {}'.format(current_dir))
        create_dir(out_dir)

        # Run the Matlab Fusion code
        try:
            LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t')))
            rc_fusion = 0
            runscript(cmd, requirements=[], env=env)
        except CalledProcessError as err:
            rc_fusion = err.returncode
            LOG.error("Matlab binary {} returned a value of {}".format(fusion_binary, rc_fusion))
            return rc_fusion, []

        # Move matlab file to the output directory
        matlab_file = glob(matlab_file_glob)
        if len(matlab_file) != 0:
            matlab_file = matlab_file[0]
            LOG.debug('Found Matlab file "{}", moving to {}...'.format(matlab_file, out_dir))
            new_matlab_name = datetime.strftime(granule, matlab_file_dt_filespec)
            shutil.move(matlab_file, pjoin(out_dir, new_matlab_name))
            matlab_file = glob(pjoin(out_dir, new_matlab_name))[0]
        else:
            LOG.error('There are no Matlab files "{}" to convert, aborting'.format(matlab_file_glob))
            rc_fusion = 1
            return rc_fusion, []

        return rc_fusion, matlab_file


    def convert_matlab_to_netcdf(self, matlab_file, l1b_file, **kwargs):
        '''
        Transcode the Matlab *.mat file into a CF-compliant HDF4 (aqua) or NetCDF4 (snpp) file.
        '''

        py_interp = kwargs['py_interp']
        bin_dir = kwargs['bin_dir']
        anc_dir = kwargs['anc_dir']
        out_dir = kwargs['out_dir']
        conversion_bin = kwargs['conversion_bin']
        env = kwargs['env']
        satellite = kwargs['satellite']

        rc_fusion = 0

        # Create the output directory
        current_dir = os.getcwd()

        # Copy the un-fused level-1b file to the work directory as a template...
        unfused_l1b_file = pjoin(out_dir, 'unfused', basename(l1b_file))
        unfused_l1b_dir = dirname(unfused_l1b_file)
        create_dir(unfused_l1b_dir)

        if exists(unfused_l1b_file):
            LOG.debug('{} exists, removing...'.format(unfused_l1b_file))
            os.remove(unfused_l1b_file)

        LOG.debug('Copying {} to {}'.format(l1b_file, unfused_l1b_file))
        shutil.copy(l1b_file, unfused_l1b_file)

        # Removing the fused file if it exists
        fused_l1b_file = pjoin(out_dir, basename(unfused_l1b_file))
        if exists(fused_l1b_file):
            LOG.debug('{} exists, removing...'.format(fused_l1b_file))
            os.remove(fused_l1b_file)

        cmd = '{} {}  {} {} {}'.format(
                py_interp,
                conversion_bin,
                unfused_l1b_file,
                matlab_file,
                out_dir
                )

        # Convert the Matlab file to the desired format...
        try:
            LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t')))
            rc_fusion = 0
            runscript(cmd, requirements=[], env=env)
        except CalledProcessError as err:
            rc_fusion = err.returncode
            LOG.error("CF converter {} returned a value of {}".format(conversion_bin, rc_fusion))
            return rc_fusion, []

        # Determine success...
        fused_l1b_file = glob(fused_l1b_file)
        if len(fused_l1b_file) != 0:
            fused_l1b_file = fused_l1b_file[0]
            LOG.debug('Found final fused output file "{}"'.format(fused_l1b_file))
        else:
            LOG.error('There is no fused file {}, aborting'.format(fused_l1b_file))
            rc_fusion = 1
            return rc_fusion, []

        # Remove the unfused dir...
        LOG.debug('Removing the unfused level-1b dir {} ...'.format(unfused_l1b_dir))
        shutil.rmtree(unfused_l1b_dir)

        # Move the final fused file to the work directory
        if satellite=='snpp':
            fused_l1b_file_new = 'VNP02FSN.{}.CTIME.nc'.format('.'.join(l1b_file.split('.')[1:4]))
        if satellite=='aqua':
            fused_l1b_file_new = 'MYD02FSN.{}.CTIME.hdf'.format('.'.join(l1b_file.split('.')[1:4]))

        # Create a common creation timestamp for the matlab and HDF4/NetCDF4 files
        dt_create = datetime.utcnow()

        LOG.debug('Current dir is {}'.format(os.getcwd()))

        # Move the HDF4/NetCDF4 file to its new filename
        fused_l1b_file_new = fused_l1b_file_new.replace('CTIME', dt_create.strftime('%Y%j%H%M%S'))
        LOG.debug('Moving "{}" to "{}" ...'.format(fused_l1b_file, pjoin(current_dir, fused_l1b_file_new)))
        shutil.move(fused_l1b_file, pjoin(current_dir, fused_l1b_file_new))
        fused_l1b_file = glob(pjoin(current_dir, fused_l1b_file_new))[0]

        # Move the matlab file to its new filename
        matlab_file_new = basename(matlab_file).replace(
                '.mat', '{}.mat'.format(dt_create.strftime('.%Y%j%H%M%S')))
        LOG.debug('Moving "{}" to {}...'.format(matlab_file, pjoin(current_dir, matlab_file_new)))
        shutil.move(matlab_file, pjoin(current_dir, matlab_file_new))
        matlab_file = glob(pjoin(current_dir, matlab_file_new))[0]

        # Remove the fused_outputs directory
        LOG.debug('Removing the fused_outputs dir {} ...'.format(out_dir))
        shutil.rmtree(out_dir)

        return rc_fusion, fused_l1b_file

    def output_QC(self, l1b_file, fused_l1b_file, band=None, input_rms=0.2, **kwargs):

        satellite = kwargs['satellite']
        LOG.debug('satellite = {}'.format(satellite))

        band_default = {'aqua':32, 'snpp':15}
        if band is None:
            band = band_default[satellite]

        input_files = [fused_l1b_file] if satellite=='snpp' else [l1b_file, fused_l1b_file]

        generators = list(SFX[splitext(p)[-1].lower()](p, band) for p in input_files)
        stuff = list(chain(*generators))
        if len(stuff) != 2:
            raise AssertionError("requires original + fusion input")
        passfail, rms = detect(input_rms, *stuff)

        if passfail:
            LOG.info("Output QC PASS (rms {} < threshold {})".format(rms, input_rms))
        else:
            LOG.warn("Output QC FAIL (rms {} > threshold {})".format(rms, input_rms))

        return 0 if passfail else 1


    def update_global_attrs(self, netcdf_file, readme_file, **kwargs):

        satellite = kwargs['satellite']

        # Get the git repo information
        repo_attrs = []
        try:
            LOG.debug('Opening {}...'.format(readme_file))
            readme_obj = open(readme_file, 'ro')
            line_obj = readme_obj.readlines()
            for idx, line in enumerate(line_obj):
                if '.git' in line:
                    repo_line = line.lstrip(' -*,*').rstrip(' -*,;')
                    commit_line = line_obj[idx+1].lstrip(' -*,;').rstrip(' -*,;')
                    git_line = '{}; {}'.format(repo_line, commit_line).replace('\n', '')
                    LOG.debug('{}'.format(git_line))
                    repo_attrs.append(git_line)
        except Exception:
            LOG.debug(traceback.format_exc())

        readme_obj.close()

        # Update the various file global attributes
        LOG.debug('Adding attributes to {} ...'.format(netcdf_file))
        if netcdf_file.split('.')[-1] == 'nc':
            args = (netcdf_file, "a")
            kwargs = {'format': "NETCDF4"}
            file_open = Dataset
        elif netcdf_file.split('.')[-1] == 'hdf':
            args = (netcdf_file, SDC.WRITE)
            kwargs = {}
            file_open = SD

        try:

            file_obj = file_open(*args, **kwargs)

            # Update the attributes, moving to the end
            for idx, attr in enumerate(repo_attrs):
                LOG.debug('{}'.format(attr))
                setattr(file_obj, 'source_git_repo_{}'.format(idx), attr)

            setattr(file_obj, 'SIPS_version', basename(dirname(readme_file)))

        except Exception:
            LOG.warning("\tProblem setting attributes in output file {}".format(netcdf_file))
            LOG.debug(traceback.format_exc())

        if netcdf_file.split('.')[-1] == 'nc':
            file_obj.close()
        elif netcdf_file.split('.')[-1] == 'hdf':
            file_obj.end()

        return

    def prepare_env(self, dist_root, inputs, context):
        LOG.debug("Running prepare_env()...")

        LOG.debug("package_root = {}".format(self.package_root))
        LOG.debug("dist_root = {}".format(dist_root))

        env = dict(os.environ)
        envroot = pjoin(dist_root, 'env')

        LOG.debug("envroot = {}".format(envroot))

        env['LD_LIBRARY_PATH'] = ':'.join([pjoin(envroot, 'lib'),
                                           pjoin(dist_root, 'lib')])
        env['PATH'] = ':'.join([pjoin(envroot, 'bin'),
                                pjoin(dist_root, 'bin'),
                                '/usr/bin:/bin'])

        LOG.debug("env['PATH'] = \n\t{}".format(env['PATH'].replace(':','\n\t')))
        LOG.debug("env['LD_LIBRARY_PATH'] = \n\t{}".format(env['LD_LIBRARY_PATH'].replace(':','\n\t')))

        return env

    def run_task(self, inputs, context):

        LOG.debug("Running run_task()...")

        for key in context.keys():
            LOG.debug("run_task() context['{}'] = {}".format(key, context[key]))

        granule = context['granule']
        satellite = context['satellite']
        delivery_id = context['version']

        # Get the location of the binary package
        delivery = delivered_software.lookup('fusion_matlab', delivery_id=delivery_id)
        dist_root = pjoin(delivery.path, 'dist')
        envroot = pjoin(dist_root, 'env')

        # Get the required  environment variables
        env = self.prepare_env(dist_root, inputs, context)

        # What is the path of the python interpreter
        py_interp = "{}/bin/python".format(envroot)
        LOG.debug("py_interp = '{}'".format(py_interp))

        # Where are we running the package
        work_dir = abspath(curdir)
        LOG.debug("working dir = {}".format(work_dir))

        # What are out inputs?
        for input in inputs.keys():
            inputs_dir = dirname(inputs[input])
            LOG.debug("inputs['{}'] = {}".format(input,inputs[input]))
        LOG.debug("Inputs dir = {}".format(inputs_dir))

        # Run viirsmend on the viirs level-1b, and generate the CrIS/VIIRS collocation
        if satellite == 'snpp':
            geo = inputs['geo']
            l1b = self.mend_viirs_l1b(inputs['geo'], inputs['l1b'])
            sounder_keys = [key for key in inputs.keys() if 'sounder' in key]
            sounder = [inputs[key] for key in sounder_keys]
            collo = self.cris_viirs_collocation(inputs)

        # Generate the AIRS/MODIS collocation
        if satellite == 'aqua':
            geo = inputs['geo']
            l1b = inputs['l1b']
            sounder_keys = [key for key in inputs.keys() if 'sounder' in key]
            sounder = [inputs[key] for key in sounder_keys]
            collo = self.airs_modis_collocation(inputs)

        LOG.info('geo = {}'.format(geo))
        LOG.info('l1b = {}'.format(l1b))
        LOG.info('sounder = {}'.format(sounder))
        LOG.info('collo = {}'.format(collo))

        bin_dir = pjoin(dist_root, 'bin')
        anc_dir = pjoin(dist_root, 'luts')
        out_dir = pjoin(work_dir, 'fused_outputs')

        # Setup the require keyword arguments for the fusion_matlab package
        kwargs = {}
        kwargs['py_interp'] = py_interp
        kwargs['bin_dir'] = bin_dir
        kwargs['anc_dir'] = anc_dir
        kwargs['env'] = env
        kwargs['out_dir'] = out_dir
        kwargs['satellite'] = satellite
        kwargs['granule'] = granule

        if satellite=='snpp':
            kwargs['fusion_binary'] = 'run_imagersounderfusion_V.sh'
            kwargs['matlab_file_glob'] = 'fusion_viirs_*.mat'
            kwargs['matlab_file_dt_str'] = '%Y%m%d_t%H%M%S'
            kwargs['matlab_file_dt_filespec'] = 'fusion_viirs_{}.mat'.format(kwargs['matlab_file_dt_str'])
            kwargs['conversion_bin'] = pjoin(envroot, 'bin', 'l1b-fusion-viirs-cris')
        elif satellite=='aqua':
            kwargs['fusion_binary'] = 'run_imagersounderfusion_M.sh'
            kwargs['matlab_file_glob'] = 'fusion_modis_*.mat'
            kwargs['matlab_file_dt_str'] = '%Y%j.%H%M'
            kwargs['matlab_file_dt_filespec'] = 'fusion_modis_{}.mat'.format(kwargs['matlab_file_dt_str'])
            kwargs['conversion_bin'] = pjoin(envroot, 'bin', 'l1b-fusion-modis-airs')
        else:
            return {}

        # Run the fusion_matlab package
        rc_fusion, matlab_file = self.run_fusion_matlab(
                                                   geo,
                                                   l1b,
                                                   sounder,
                                                   collo,
                                                   **kwargs
                                                  )

        LOG.debug('run_fusion_matlab() return value: {}'.format(rc_fusion))
        LOG.info('run_fusion_matlab() generated {}'.format(matlab_file))

        # Now that we've computed the Matlab file, convert to a NetCDF file...
        rc_fusion, fused_l1b_file = self.convert_matlab_to_netcdf(matlab_file,
                                                                  l1b,
                                                                  **kwargs)

        LOG.debug('convert_matlab_to_netcdf() return value: {}'.format(rc_fusion))
        LOG.info('convert_matlab_to_netcdf() generated {}'.format(fused_l1b_file))

        # Update some global attributes in the output file
        readme_file =  pjoin(delivery.path, 'README.txt')
        self.update_global_attrs(basename(fused_l1b_file), readme_file, **kwargs)

        # Run a QC check on the output file
        rc_qc = self.output_QC(l1b, fused_l1b_file, **kwargs)
        LOG.debug('output_QC() return value: {}'.format(rc_qc))
        if rc_qc != 0:
            raise WorkflowNotReady('Output fusion file {} failed RMS error QC check, output aborted.'.format(fused_l1b_file))

        # The staging routine assumes that the output file is located in the work directory
        # "tmp******", and that the output path is to be prepended, so return the basename.

        out_fn = basename(fused_l1b_file)

        return {'fused_l1b': out_fn}