Skip to content
Snippets Groups Projects
Commit 68647efc authored by Geoff Cureton's avatar Geoff Cureton
Browse files

Updated the fusion_matlab glue code to use the latest fusion_matlab delivery,...

Updated the fusion_matlab glue code to use the latest fusion_matlab delivery, which includes a computation to compute the daily fusion quicklooks.
parent 214997f3
No related branches found
No related tags found
No related merge requests found
fusion_matlab: Python code required to run the fusion_matlab package in the Atmosphere-SIPS.
Copyright (C) 2017 Geoff P. Cureton
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
...@@ -23,7 +23,8 @@ comp = FUSION_MATLAB() ...@@ -23,7 +23,8 @@ comp = FUSION_MATLAB()
satellite = 'snpp' satellite = 'snpp'
#satellite = 'aqua' #satellite = 'aqua'
delivery_id = '20180225-1' #delivery_id = '20180225-1'
#delivery_id = '20180620-1'
version = '1.0dev2' version = '1.0dev2'
......
import sys
import traceback
import logging
from timeutil import TimeInterval, datetime, timedelta
from flo.ui import local_prepare, local_execute
from flo.sw.fusion_matlab import FUSION_MATLAB_QL
from flo.sw.fusion_matlab.utils import setup_logging
# every module should have a LOG object
LOG = logging.getLogger(__name__)
comp = FUSION_MATLAB_QL()
#
# Local execution
#
# General information
#granule = datetime(2018, 2, 2, 0)
#interval = TimeInterval(granule, granule+timedelta(minutes=0))
satellite = 'snpp'
#satellite = 'aqua'
delivery_id = '20180620-1'
version = '1.0dev2'
def local_execute_example(interval, satellite, version, skip_prepare=False, skip_execute=False, verbosity=2):
setup_logging(verbosity)
if satellite == 'snpp':
LOG.info("We are doing NPP...")
elif satellite == 'aqua':
LOG.info("We are doing AQUA...")
else:
LOG.error("Invalid satellite.")
# Get the required context...
contexts = comp.find_contexts(interval, satellite, version)
if len(contexts) != 0:
LOG.info("Candidate contexts in interval...")
for context in contexts:
print("\t{}".format(context))
try:
if not skip_prepare:
LOG.info("Running fusion_matlab local_prepare()...")
LOG.info("Preparing context... {}".format(contexts[0]))
local_prepare(comp, contexts[0])
if not skip_execute:
LOG.info("Running local_execute()...")
LOG.info("Running context... {}".format(contexts[0]))
local_execute(comp, contexts[0])
except Exception, err:
LOG.error("{}".format(err))
LOG.debug(traceback.format_exc())
else:
LOG.error("There are no valid {} contexts for the interval {}.".format(satellite, interval))
def print_contexts(interval, satellite, version, verbosity=2):
setup_logging(verbosity)
contexts = comp.find_contexts(interval, satellite, version)
for context in contexts:
print context
...@@ -651,6 +651,8 @@ class FUSION_MATLAB(Computation): ...@@ -651,6 +651,8 @@ class FUSION_MATLAB(Computation):
dist_root = pjoin(delivery.path, 'dist') dist_root = pjoin(delivery.path, 'dist')
envroot = pjoin(dist_root, 'env') envroot = pjoin(dist_root, 'env')
LOG.info("dist_root = '{}'".format(dist_root))
# Get the required environment variables # Get the required environment variables
env = self.prepare_env(dist_root, inputs, context) env = self.prepare_env(dist_root, inputs, context)
...@@ -778,21 +780,275 @@ class FUSION_MATLAB(Computation): ...@@ -778,21 +780,275 @@ class FUSION_MATLAB(Computation):
if satellite == 'aqua': if satellite == 'aqua':
out_compress = hdf_compress out_compress = hdf_compress
#l1_version = 'ingest'
#input_fns = [l1b, geo]
#lut_version = ''
#lut_created = granule
#ancillary_fns = []
LOG.debug('We are in {}'.format(os.getcwd())) LOG.debug('We are in {}'.format(os.getcwd()))
LOG.debug('Compressing {}'.format(out_fn)) LOG.debug('Compressing {}'.format(out_fn))
return {'fused_l1b': out_compress(out_fn)} return {'fused_l1b': out_compress(out_fn)}
#return {'fused_l1b': out_fn}
#return { class FUSION_MATLAB_QL(Computation):
#'fused_l1b': {
#'file': out_fn, parameters = ['granule', 'satellite', 'version']
#'extra_attrs': extra_attrs,
#}, outputs = ['fused_l1b_ql_band27_asc', 'fused_l1b_ql_band27_desc', 'fused_l1b_ql_band33_asc', 'fused_l1b_ql_band33_desc']
#}
def find_contexts(self, time_interval, satellite, version):
'''
Takes the input time interval, and returns whatever 0Z datetimes fall within the interval.
'''
if satellite=='snpp':
granule_length = timedelta(minutes=6)
elif satellite=='aqua':
granule_length = timedelta(minutes=5)
else:
return []
LOG.debug('interval = {}'.format(time_interval))
LOG.debug('satellite = {}'.format(satellite))
LOG.debug('version = {}'.format(version))
days = [day.left for day in time_interval.overlapping_interval_series(timedelta(days=1))]
return [{'satellite': satellite, 'version': version, 'granule': day} for day in days]
def _add_viirs_l1b_geo_input(self, product, context, task):
satellite = context['satellite']
granule = context['granule']
version = product.input('viirs_l1').version
input_name = sipsprod.satellite_esdt('V03MOD', satellite)
interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1))
LOG.debug("Ingesting input {} ({}) for V02FSN_DailyQL version {}".format(input_name, version, product.version))
vgeom = dawg_catalog.files(satellite, input_name, interval, version=version)
if vgeom == []:
raise WorkflowNotReady('Missing {} inputs for version {} and interval {}'.format(
input_name, version, interval))
for idx, geo_file in enumerate(vgeom):
LOG.debug('V03MOD granule {}: {} -> {}'.format(idx, geo_file.begin_time, geo_file.end_time))
task.input('geo_{}'.format(idx), geo_file)
def _add_cris_viirs_fusion_l1b_input(self, product, context, task):
satellite = context['satellite']
granule = context['granule']
version = product.input('V02FSN').version
input_name = sipsprod.satellite_esdt('V02FSN', satellite)
interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1))
LOG.debug("Ingesting input {} ({}) for V02FSN_DailyQL version {}".format(input_name, version, product.version))
vl1b = dawg_catalog.files(satellite, input_name, interval, version=version)
if vl1b == []:
raise WorkflowNotReady('Missing {} inputs for version {} and interval {}'.format(
input_name, version, interval))
for idx, l1b_file in enumerate(vl1b):
LOG.debug('V02FSN granule {}: {} -> {}'.format(idx, l1b_file.begin_time, l1b_file.end_time))
task.input('l1b_{}'.format(idx), l1b_file)
def build_task_snpp(self, context, task):
'''
Build up a set of inputs for a single context
'''
LOG.debug("Ingesting inputs for V02FSN_DailyQL version {} ...".format(context['version']))
# Get the product definition for 'V02FSN'
product = sipsprod.lookup_product_recurse('V02FSN_DailyQL', version=context['version'])
# Ingest the required inputs, defined in the VNP02 product definition for context['version']
self._add_viirs_l1b_geo_input(product, context, task)
self._add_cris_viirs_fusion_l1b_input(product, context, task)
# Make the product definition available to build_task()
task.option('product', product)
@reraise_as(WorkflowNotReady, FileNotFound, prefix='FUSION_MATLAB_QL')
def build_task(self, context, task):
satellite = context['satellite']
if satellite == 'snpp':
self.build_task_snpp(context, task)
elif satellite == 'aqua':
raise ValueError('Satellite option "{}" not yet implemented.'.format(satellite))
else:
raise ValueError('Invalid satellite: {}'.format(satellite))
def prepare_env(self, dist_root, inputs, context):
LOG.debug("Running prepare_env()...")
LOG.debug("package_root = {}".format(self.package_root))
LOG.debug("dist_root = {}".format(dist_root))
env = dict(os.environ)
envroot = pjoin(dist_root, 'env')
LOG.debug("envroot = {}".format(envroot))
env['LD_LIBRARY_PATH'] = ':'.join([pjoin(envroot, 'lib'),
pjoin(dist_root, 'lib')])
env['PATH'] = ':'.join([pjoin(envroot, 'bin'),
pjoin(dist_root, 'bin'),
'/usr/bin:/bin'])
LOG.debug("env['PATH'] = \n\t{}".format(env['PATH'].replace(':','\n\t')))
LOG.debug("env['LD_LIBRARY_PATH'] = \n\t{}".format(env['LD_LIBRARY_PATH'].replace(':','\n\t')))
return env
def run_fusion_quicklooks(self, fsn_dir, geo_dir, **kwargs):
bin_dir = kwargs['bin_dir']
fusion_ql_binary = kwargs['fusion_ql_binary']
granule = kwargs['granule']
satellite = kwargs['satellite']
env = kwargs['env']
dt = granule
year = dt.utctimetuple().tm_year
jday = dt.utctimetuple().tm_yday
rc_fusion_ql = 0
# Get the matlab runtim version that we require
matlab_version = '2015b'
#matlab_version = product.input('fusion_matlab').options['matlab_version']
#run matlab
#cmd = '{}/{} {} {} {} {}/ {}/ >> fusion_quicklooks.log'.format(
cmd = '{}/{} {} {} {} {}/ {}/'.format(
bin_dir,
fusion_ql_binary,
support_software.lookup('matlab', matlab_version).path,
year,
jday,
fsn_dir,
geo_dir
)
# Run the Matlab Fusion Quicklook code
try:
LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t')))
rc_fusion_ql = 0
runscript(cmd, requirements=[], env=env)
except CalledProcessError as err:
rc_fusion_ql = err.returncode
LOG.error("Matlab binary {} returned a value of {}".format(fusion_ql_binary, rc_fusion_ql))
return rc_fusion_ql, []
# Move matlab file to the output directory
orig_fusion_ql_files = glob('*.png')
if len(orig_fusion_ql_files) != 0:
LOG.info('Found Fusion quicklook files {}.'.format(
', '.join([basename(x) for x in orig_fusion_ql_files])
))
else:
LOG.error('There are no Fusion quicklook files "*.png", aborting')
rc_fusion_ql = 1
return rc_fusion_ql
fusion_ql_files = []
for orig_fusion_ql_file in orig_fusion_ql_files:
filename_chunks = splitext(orig_fusion_ql_file)[0].split('_')
fusion_ql_files.append(
'{}.A{}.{}.{}.png'.format(
sipsprod.satellite_esdt('V02FSN', satellite),
granule.strftime('%Y%j'),
filename_chunks[1],
filename_chunks[3]
)
)
for oldfile, newfile in zip(orig_fusion_ql_files, fusion_ql_files):
shutil.move(oldfile, newfile)
return rc_fusion_ql, fusion_ql_files
@reraise_as(WorkflowNotReady, FileNotFound, prefix='FUSION_MATLAB_QL')
def run_task(self, inputs, context):
LOG.debug("Running run_task()...")
for key in context.keys():
LOG.debug("run_task() context['{}'] = {}".format(key, context[key]))
granule = context['granule']
satellite = context['satellite']
version = context['version']
# Get the location of the binary package
product = context['product']
delivery = delivered_software.lookup(
'fusion_matlab', delivery_id=product.input('fusion_matlab').version)
dist_root = pjoin(delivery.path, 'dist')
envroot = pjoin(dist_root, 'env')
LOG.info("dist_root = '{}'".format(dist_root))
# Get the required environment variables
env = self.prepare_env(dist_root, inputs, context)
# What is the path of the python interpreter
py_interp = "{}/bin/python".format(envroot)
LOG.debug("py_interp = '{}'".format(py_interp))
bin_dir = pjoin(dist_root, 'bin')
# Where are we running the package
work_dir = abspath(curdir)
LOG.debug("working dir = {}".format(work_dir))
# What are our inputs?
for input in inputs.keys():
inputs_dir = dirname(inputs[input])
LOG.debug("inputs['{}'] = {}".format(input,inputs[input]))
LOG.debug("Inputs dir = {}".format(inputs_dir))
current_dir = os.getcwd()
geo_dir = pjoin(current_dir,'GEO')
fsn_dir = pjoin(current_dir,'FSN')
create_dir(geo_dir)
create_dir(fsn_dir)
os.chdir(geo_dir)
geo_keys = [key for key in inputs.keys() if 'geo' in key]
geo_inputs = {key: inputs[key] for key in geo_keys}
LOG.info(geo_inputs)
symlink_inputs_to_working_dir(geo_inputs)
os.chdir(current_dir)
os.chdir(fsn_dir)
fsn_keys = [key for key in inputs.keys() if 'l1b' in key]
fsn_inputs = {key: inputs[key] for key in fsn_keys}
LOG.info(fsn_inputs)
symlink_inputs_to_working_dir(fsn_inputs)
os.chdir(current_dir)
# Setup the require keyword arguments for the fusion_matlab package
kwargs = {}
kwargs['py_interp'] = py_interp
kwargs['bin_dir'] = bin_dir
kwargs['env'] = env
kwargs['satellite'] = satellite
kwargs['granule'] = granule
kwargs['fusion_ql_binary'] = 'run_plot_globalVIIRSfusion_fct.sh'
# Run the fusion_matlab package
rc_fusion_ql, fusion_ql_files = self.run_fusion_quicklooks(
fsn_dir,
geo_dir,
**kwargs
)
extra_attrs = {
'begin_time': context['granule'],
'end_time': context['granule']+timedelta(days=1)-timedelta(seconds=1)
}
LOG.debug('extra_attrs = {}'.format(extra_attrs))
LOG.debug('run_fusion_quicklooks() return value: {}'.format(rc_fusion_ql))
LOG.info('run_fusion_quicklooks() generated {}'.format(fusion_ql_files))
return {
'fused_l1b_ql_band27_asc': {'file': [x for x in fusion_ql_files if 'Band27.asc' in x][0], 'extra_attrs': extra_attrs},
'fused_l1b_ql_band27_desc': {'file': [x for x in fusion_ql_files if 'Band27.desc' in x][0], 'extra_attrs': extra_attrs},
'fused_l1b_ql_band33_asc': {'file': [x for x in fusion_ql_files if 'Band33.asc' in x][0], 'extra_attrs': extra_attrs},
'fused_l1b_ql_band33_desc': {'file': [x for x in fusion_ql_files if 'Band33.desc' in x][0], 'extra_attrs': extra_attrs},
}
...@@ -14,11 +14,16 @@ Licensed under GNU GPLv3. ...@@ -14,11 +14,16 @@ Licensed under GNU GPLv3.
import os import os
import sys import sys
import string import string
from copy import copy
import shutil
import traceback import traceback
import logging import logging
import time
from glob import glob
from copy import copy
import shutil
from os.path import basename, dirname, abspath, isdir, isfile, exists, join as pjoin
import fileinput
from datetime import datetime
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
...@@ -44,6 +49,16 @@ def setup_logging(verbosity): ...@@ -44,6 +49,16 @@ def setup_logging(verbosity):
format=console_logFormat, format=console_logFormat,
datefmt=dateFormat) datefmt=dateFormat)
def _replaceAll(intputfile, searchExp, replaceExp):
'''
Replace all instances of 'searchExp' with 'replaceExp' in 'intputfile'
'''
for line in fileinput.input(intputfile, inplace=1):
if searchExp in line:
line = line.replace(searchExp, replaceExp)
sys.stdout.write(line)
fileinput.close()
def cleanup(work_dir, objs_to_remove): def cleanup(work_dir, objs_to_remove):
""" """
cleanup work directiory cleanup work directiory
...@@ -51,10 +66,10 @@ def cleanup(work_dir, objs_to_remove): ...@@ -51,10 +66,10 @@ def cleanup(work_dir, objs_to_remove):
""" """
for file_obj in objs_to_remove: for file_obj in objs_to_remove:
try: try:
if os.path.isdir(file_obj): if isdir(file_obj):
LOG.debug('Removing directory: {}'.format(file_obj)) LOG.debug('Removing directory: {}'.format(file_obj))
shutil.rmtree(file_obj) shutil.rmtree(file_obj)
elif os.path.isfile(file_obj): elif isfile(file_obj):
LOG.debug('Removing file: {}'.format(file_obj)) LOG.debug('Removing file: {}'.format(file_obj))
os.unlink(file_obj) os.unlink(file_obj)
except Exception: except Exception:
...@@ -66,9 +81,9 @@ def link_files(dest_path, files): ...@@ -66,9 +81,9 @@ def link_files(dest_path, files):
''' '''
files_linked = 0 files_linked = 0
for src_file in files: for src_file in files:
src = os.path.basename(src_file) src = basename(src_file)
dest_file = os.path.join(dest_path, src) dest_file = pjoin(dest_path, src)
if not os.path.exists(dest_file): if not exists(dest_file):
LOG.debug("Link {0} -> {1}".format(src_file, dest_file)) LOG.debug("Link {0} -> {1}".format(src_file, dest_file))
os.symlink(src_file, dest_file) os.symlink(src_file, dest_file)
files_linked += 1 files_linked += 1
...@@ -135,21 +150,23 @@ def create_dir(dir): ...@@ -135,21 +150,23 @@ def create_dir(dir):
try: try:
if returned_dir is not None: if returned_dir is not None:
returned_dir_path = os.path.dirname(returned_dir) returned_dir_path = dirname(returned_dir)
returned_dir_base = os.path.basename(returned_dir) returned_dir_base = basename(returned_dir)
LOG.debug("returned_dir_path = {}".format(returned_dir_path)) LOG.debug("returned_dir_path = {}".format(returned_dir_path))
LOG.debug("returned_dir_base = {}".format(returned_dir_base)) LOG.debug("returned_dir_base = {}".format(returned_dir_base))
returned_dir_path = '.' if returned_dir_path=="" else returned_dir_path
LOG.debug("returned_dir_path = {}".format(returned_dir_path))
# Check if a directory and has write permissions... # Check if a directory and has write permissions...
if not os.path.exists(returned_dir) and os.access(returned_dir_path, os.W_OK): if not exists(returned_dir) and os.access(returned_dir_path, os.W_OK):
LOG.debug("Creating directory {} ...".format(returned_dir)) LOG.debug("Creating directory {} ...".format(returned_dir))
os.makedirs(returned_dir) os.makedirs(returned_dir)
# Check if the created dir has write permissions # Check if the created dir has write permissions
if not os.access(returned_dir, os.W_OK): if not os.access(returned_dir, os.W_OK):
msg = "Created dir {} is not writable.".format(returned_dir) msg = "Created dir {} is not writable.".format(returned_dir)
raise SipsEnvironment(msg) raise SipsEnvironment(msg)
elif os.path.exists(returned_dir): elif exists(returned_dir):
LOG.debug("Directory {} exists...".format(returned_dir)) LOG.debug("Directory {} exists...".format(returned_dir))
if not(os.path.isdir(returned_dir) and os.access(returned_dir, os.W_OK)): if not(isdir(returned_dir) and os.access(returned_dir, os.W_OK)):
msg = "Existing dir {} is not writable.".format(returned_dir) msg = "Existing dir {} is not writable.".format(returned_dir)
raise SipsEnvironment(msg) raise SipsEnvironment(msg)
else: else:
......
#!/usr/bin/env python #!/usr/bin/env python
# encoding: utf-8 # encoding: utf-8
"""
Purpose: Run the fusion_matlab package
Copyright (c) 2015 University of Wisconsin Regents.
Licensed under GNU GPLv3.
"""
import sys import sys
import traceback import traceback
import calendar import calendar
...@@ -65,7 +71,7 @@ comp = FUSION_MATLAB() ...@@ -65,7 +71,7 @@ comp = FUSION_MATLAB()
LOG.info("Submitting intervals...") LOG.info("Submitting intervals...")
dt = datetime.utcnow() dt = datetime.utcnow()
log_name = '/home/flo/geoffc/fusion_matlab_logs/fusion_matlab_{}_s{}_e{}_c{}.log'.format( log_name = 'fusion_matlab_{}_s{}_e{}_c{}.log'.format(
satellite, satellite,
intervals[0].left.strftime('%Y%m%d%H%M'), intervals[0].left.strftime('%Y%m%d%H%M'),
intervals[-1].right.strftime('%Y%m%d%H%M'), intervals[-1].right.strftime('%Y%m%d%H%M'),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment