Skip to content
Snippets Groups Projects
Commit 10c2a779 authored by Geoff Cureton's avatar Geoff Cureton
Browse files

Now can handle different product versions on a per-satellite basis. Uses...

Now can handle different product versions on a per-satellite basis. Uses updated CF-converter package which now generates a candidate fusion nc4 files from a CDL file.
parent 1a8c7e62
Branches
No related tags found
No related merge requests found
...@@ -21,10 +21,8 @@ comp = FUSION_MATLAB() ...@@ -21,10 +21,8 @@ comp = FUSION_MATLAB()
#granule = datetime(2015, 4, 17, 14) #granule = datetime(2015, 4, 17, 14)
#interval = TimeInterval(granule, granule+timedelta(minutes=0)) #interval = TimeInterval(granule, granule+timedelta(minutes=0))
satellite = 'snpp' satellite, version = 'snpp', '1.0.0dev1'
#satellite = 'noaa20' satellite, version = 'noaa20', '1.0.1dev1'
#satellite = 'aqua'
version = '1.0dev10' # base VIIRS level-1b
def local_execute_example(interval, satellite, version, skip_prepare=False, skip_execute=False, verbosity=2): def local_execute_example(interval, satellite, version, skip_prepare=False, skip_execute=False, verbosity=2):
......
...@@ -53,7 +53,7 @@ import traceback ...@@ -53,7 +53,7 @@ import traceback
from glob import glob from glob import glob
from itertools import chain from itertools import chain
from subprocess import check_output, check_call, CalledProcessError from subprocess import check_output, check_call, CalledProcessError
from netCDF4 import Dataset import netCDF4
from pyhdf.SD import SD, SDC from pyhdf.SD import SD, SDC
from flo.computation import Computation from flo.computation import Computation
...@@ -70,13 +70,12 @@ from glutil import ( ...@@ -70,13 +70,12 @@ from glutil import (
runscript, runscript,
get_viirs_l1_luts, get_viirs_l1_luts,
#prepare_env, #prepare_env,
#nc_gen,
nc_compress,
hdf_compress, hdf_compress,
reraise_as, reraise_as,
set_official_product_metadata, set_official_product_metadata,
FileNotFound FileNotFound
) )
from glutil.nc import (nc_gen, nc_compress, nc_remove_unlimited_dims, nc_setattrs, nc_getattrs)
from utils import create_dir from utils import create_dir
from detect_bad_fusion import SFX, detect from detect_bad_fusion import SFX, detect
...@@ -225,7 +224,7 @@ class FUSION_MATLAB(Computation): ...@@ -225,7 +224,7 @@ class FUSION_MATLAB(Computation):
input_name = sipsprod.satellite_esdt('V03MOD', satellite) input_name = sipsprod.satellite_esdt('V03MOD', satellite)
LOG.debug("Ingesting input {} ({}) for V02FSN version {}".format(input_name, version, product.version)) LOG.debug("Ingesting input {} ({}) for FSNRAD_L2_VIIRS_CRIS version {}".format(input_name, version, product.version))
vgeom = dawg_catalog.files(satellite, input_name, interval, version=version) vgeom = dawg_catalog.files(satellite, input_name, interval, version=version)
if vgeom == []: if vgeom == []:
...@@ -256,7 +255,7 @@ class FUSION_MATLAB(Computation): ...@@ -256,7 +255,7 @@ class FUSION_MATLAB(Computation):
if viirs_l1: if viirs_l1:
version = viirs_l1.version version = viirs_l1.version
input_name = sipsprod.satellite_esdt('V02MOD', satellite) input_name = sipsprod.satellite_esdt('V02MOD', satellite)
LOG.debug("Ingesting input {} ({}) for V02FSN version {}".format(input_name, version, product.version)) LOG.debug("Ingesting input {} ({}) for FSNRAD_L2_VIIRS_CRIS version {}".format(input_name, version, product.version))
vl1b = dawg_catalog.files(satellite, input_name, interval, version=version) vl1b = dawg_catalog.files(satellite, input_name, interval, version=version)
if vl1b == []: if vl1b == []:
raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format(
...@@ -266,7 +265,7 @@ class FUSION_MATLAB(Computation): ...@@ -266,7 +265,7 @@ class FUSION_MATLAB(Computation):
elif v02mod_bt_sc: elif v02mod_bt_sc:
version = v02mod_bt_sc.version version = v02mod_bt_sc.version
input_name = sipsprod.satellite_esdt('V02MOD-bt-sc', satellite) input_name = sipsprod.satellite_esdt('V02MOD-bt-sc', satellite)
LOG.debug("Ingesting input {} ({}) for V02FSN version {}".format(input_name, version, product.version)) LOG.debug("Ingesting input {} ({}) for FSNRAD_L2_VIIRS_CRIS version {}".format(input_name, version, product.version))
from flo.sw.v02mod_bt_sc import v02mod_bt_sc from flo.sw.v02mod_bt_sc import v02mod_bt_sc
vl1b = v02mod_bt_sc().dataset('out').product( vl1b = v02mod_bt_sc().dataset('out').product(
{'granule': context['granule'], 'satellite': context['satellite'], {'granule': context['granule'], 'satellite': context['satellite'],
...@@ -298,8 +297,15 @@ class FUSION_MATLAB(Computation): ...@@ -298,8 +297,15 @@ class FUSION_MATLAB(Computation):
interval = TimeInterval(granule-granule_length, granule+granule_length) interval = TimeInterval(granule-granule_length, granule+granule_length)
version = cris_l1.version version = cris_l1.version
# SNPP CrIS hardware switch-over requires two different cris_l1 versions
if satellite == 'snpp':
dt_switch = datetime(2019, 6, 24)
if (granule - dt_switch).total_seconds() <= 0.:
version = '2.0.15'
input_name = 'CL1B' input_name = 'CL1B'
LOG.debug("Ingesting input {} ({}) for V02FSN version {}".format(input_name, version, product.version)) LOG.debug("Ingesting input {} ({}) for FSNRAD_L2_VIIRS_CRIS version {}".format(input_name, version, product.version))
cris = dawg_catalog.files(satellite, input_name, interval, version=version) cris = dawg_catalog.files(satellite, input_name, interval, version=version)
if cris == []: if cris == []:
...@@ -320,11 +326,11 @@ class FUSION_MATLAB(Computation): ...@@ -320,11 +326,11 @@ class FUSION_MATLAB(Computation):
''' '''
Build up a set of inputs for a single context Build up a set of inputs for a single context
''' '''
LOG.debug("Ingesting inputs for V02FSN version {} ...".format(context['version'])) LOG.debug("Ingesting inputs for FSNRAD_L2_VIIRS_CRIS version {} ...".format(context['version']))
# Get the product definition for 'V02FSN'. Different versions may use regular or bias # Get the product definition for 'FSNRAD_L2_VIIRS_CRIS'. Different versions may use regular or bias
# corrected VIIRS level-1b files. # corrected VIIRS level-1b files.
product_name = 'V02FSN' product_name = 'FSNRAD_L2_VIIRS_CRIS'
product = sipsprod.lookup_product_recurse(product_name, version=context['version']) product = sipsprod.lookup_product_recurse(product_name, version=context['version'])
# Ingest the required inputs, defined in the VNP02 product definition for context['version'] # Ingest the required inputs, defined in the VNP02 product definition for context['version']
...@@ -335,7 +341,7 @@ class FUSION_MATLAB(Computation): ...@@ -335,7 +341,7 @@ class FUSION_MATLAB(Computation):
# Make the product definition available to build_task() # Make the product definition available to build_task()
task.option('product', product) task.option('product', product)
@reraise_as(WorkflowNotReady, FileNotFound, prefix='V02FSN') @reraise_as(WorkflowNotReady, FileNotFound, prefix='FSNRAD_L2_VIIRS_CRIS')
def build_task(self, context, task): def build_task(self, context, task):
satellite = context['satellite'] satellite = context['satellite']
...@@ -389,6 +395,7 @@ class FUSION_MATLAB(Computation): ...@@ -389,6 +395,7 @@ class FUSION_MATLAB(Computation):
if not dummy: if not dummy:
for cris, vgeo in zip(sounder, geo): for cris, vgeo in zip(sounder, geo):
cmd = '{} {} {} > /dev/null'.format(crisviirs_exe, cris, vgeo) cmd = '{} {} {} > /dev/null'.format(crisviirs_exe, cris, vgeo)
#cmd = '{} {} {} > {}.log'.format(crisviirs_exe, cris, vgeo, basename(cris)) # DEBUG
runscript(cmd, requirements=[]) runscript(cmd, requirements=[])
else: else:
...@@ -450,17 +457,18 @@ class FUSION_MATLAB(Computation): ...@@ -450,17 +457,18 @@ class FUSION_MATLAB(Computation):
dummy = kwargs['dummy'] dummy = kwargs['dummy']
if dummy: if dummy:
dummy_matlab_file = pjoin('/data/geoffc/fusion_matlab/work/local_processing', dummy_matlab_file = pjoin('/data/geoffc/fusion_matlab/work/local_processing',
'snpp_test_case/outputs/tmptkFpCs', 'snpp_fusion_output',
'fused_outputs/fusion_output.mat') 'fusion_output.mat')
rc_fusion = 0 rc_fusion = 0
# Get the matlab runtim version that we require # Get the matlab runtim version that we require
matlab_version = '2015b' #matlab_version = '2015b'
matlab_version = '2018b'
#matlab_version = product.input('fusion_matlab').options['matlab_version'] #matlab_version = product.input('fusion_matlab').options['matlab_version']
#run matlab #run matlab
cmd = '{}/{} {} {} {} {} {} {}'.format( cmd = '{}/{} {}/ {} {} {} {} {}'.format(
bin_dir, bin_dir,
fusion_binary, fusion_binary,
support_software.lookup('matlab', matlab_version).path, support_software.lookup('matlab', matlab_version).path,
...@@ -507,14 +515,14 @@ class FUSION_MATLAB(Computation): ...@@ -507,14 +515,14 @@ class FUSION_MATLAB(Computation):
return rc_fusion, matlab_file return rc_fusion, matlab_file
def convert_matlab_to_viirs_netcdf(self, product, viirs_l1b_file, cris_l1b_file, matlab_file, **kwargs):
def convert_matlab_to_netcdf(self, product, matlab_file, l1b_file, **kwargs):
''' '''
Transcode the Matlab *.mat file into a CF-compliant HDF4 (aqua) or NetCDF4 (snpp) file. Transcode the Matlab *.mat file into a CF-compliant NetCDF4 file.
''' '''
py_interp = kwargs['py_interp'] py_interp = kwargs['py_interp']
bin_dir = kwargs['bin_dir'] bin_dir = kwargs['bin_dir']
cdl_dir = kwargs['cdl_dir']
fused_output_dir = kwargs['fused_output_dir'] fused_output_dir = kwargs['fused_output_dir']
conversion_bin = kwargs['conversion_bin'] conversion_bin = kwargs['conversion_bin']
env = kwargs['env'] env = kwargs['env']
...@@ -523,14 +531,6 @@ class FUSION_MATLAB(Computation): ...@@ -523,14 +531,6 @@ class FUSION_MATLAB(Computation):
dummy = kwargs['dummy'] dummy = kwargs['dummy']
if dummy:
if satellite=='snpp':
dummy_fusion_file = '/mnt/sdata/geoffc/fusion_matlab/work/snpp_temp_outputs/outputs/tmp5PwThb/VNP02FSN.A2018033.1836.001.2018058173216.nc'
if satellite=='noaa20':
dummy_fusion_file = '/mnt/sdata/geoffc/fusion_matlab/work/snpp_temp_outputs/outputs/tmp5PwThb/VNP02FSN.A2018033.1836.001.2018058173216.nc'
if satellite=='aqua':
dummy_fusion_file = '/mnt/sdata/geoffc/fusion_matlab/work/aqua_temp_outputs/outputs/tmpMUoHF3/MYD02FSN.A2015107.1755.006.2018058170733.hdf'
rc_fusion = 0 rc_fusion = 0
dt_create = datetime.utcnow() dt_create = datetime.utcnow()
...@@ -539,98 +539,106 @@ class FUSION_MATLAB(Computation): ...@@ -539,98 +539,106 @@ class FUSION_MATLAB(Computation):
LOG.debug('tmp_work_dir (CWD): "{}"'.format(tmp_work_dir)) LOG.debug('tmp_work_dir (CWD): "{}"'.format(tmp_work_dir))
LOG.debug('fused_output_dir: "{}"'.format(fused_output_dir)) LOG.debug('fused_output_dir: "{}"'.format(fused_output_dir))
# Copy the un-fused level-1b file to the work directory as a template... # Construct the candidate filename for the new level-2 file
unfused_l1b_file = pjoin(tmp_work_dir, 'unfused', basename(l1b_file)) esdt = 'FSNRAD_L2_VIIRS_CRIS' + ('_SNPP' if satellite=='snpp' else '_NOAA20')
unfused_l1b_dir = dirname(unfused_l1b_file) collection = product.options['collection']
create_dir(unfused_l1b_dir) viirs_fused_l1b_file = sipsprod.product_filename(esdt, collection, granule, created=dt_create)
viirs_fused_l1b_file = pjoin(fused_output_dir, basename(viirs_fused_l1b_file))
if exists(unfused_l1b_file):
LOG.debug('{} exists, removing...'.format(unfused_l1b_file)) # Remove the output level-2 template file if it exists
os.remove(unfused_l1b_file) LOG.debug('Checking for existing fused output file "{}"'.format(viirs_fused_l1b_file))
if exists(viirs_fused_l1b_file):
LOG.debug('Copying {} to {}'.format(l1b_file, unfused_l1b_file)) LOG.debug('{} exists, removing...'.format(viirs_fused_l1b_file))
shutil.copy(l1b_file, unfused_l1b_file) os.remove(viirs_fused_l1b_file)
# Removing the fused file if it exists if not dummy:
fused_l1b_file = pjoin(fused_output_dir, basename(unfused_l1b_file)) # Create the new level2 template file and add a dimension from the level1 file
LOG.debug('Checking for existing fused output file "{}"'.format(fused_l1b_file)) cdl_file = pjoin(cdl_dir, 'fusion_cris_viirs.cdl')
if exists(fused_l1b_file): LOG.info('Creating template file {} from CDL file {}'.format(viirs_fused_l1b_file, cdl_file))
LOG.debug('{} exists, removing...'.format(fused_l1b_file))
os.remove(fused_l1b_file) try:
nc_gen(cdl_file, viirs_fused_l1b_file)
# Convert the Matlab file to the desired format... except CalledProcessError as err:
cmd = '{} {} {} {} {}'.format( rc = err.returncode
py_interp, LOG.error("ncgen returned a value of {}".format(rc))
conversion_bin, return rc, []
unfused_l1b_file,
matlab_file, # Open the l1b file and get the 'number_of_LUT_values' dimension
fused_output_dir nc_l1b = netCDF4.Dataset(viirs_l1b_file, 'r')
) dim = nc_l1b.dimensions['number_of_LUT_values']
try:
LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t'))) # Copy the 'number_of_LUT_values' dim to the level-2 template file
rc_fusion = 0 nc_l2 = netCDF4.Dataset(viirs_fused_l1b_file, 'r+')
if not dummy: nc_l2.createDimension(dim.name, size=dim.size)
# Close the level1 and level2 files
nc_l2.close()
nc_l1b.close()
# Copy the Matlab file data to the level-2 tamplate file...
cmd = '{} {} {} {} {} {}'.format(
py_interp,
conversion_bin,
viirs_l1b_file,
cris_l1b_file,
matlab_file,
viirs_fused_l1b_file)
try:
LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t')))
rc_fusion = 0
runscript(cmd, requirements=[], env=env) runscript(cmd, requirements=[], env=env)
else: except CalledProcessError as err:
LOG.debug('dummy cmd = "cp {} {}"'.format(dummy_fusion_file, rc_fusion = err.returncode
pjoin(fused_output_dir, basename(l1b_file)))) LOG.error("CF converter {} returned a value of {}".format(conversion_bin, rc_fusion))
shutil.copy(dummy_fusion_file, pjoin(fused_output_dir, basename(l1b_file))) return rc_fusion, None, {}
except CalledProcessError as err:
rc_fusion = err.returncode else:
LOG.error("CF converter {} returned a value of {}".format(conversion_bin, rc_fusion))
return rc_fusion, None, {} if satellite=='snpp':
dummy_fusion_file = '/data/geoffc/fusion_matlab/work/local_processing/snpp_fusion_output/FSNRAD_L2_VIIRS_CRIS_SNPP.A2018283.1206.001.2019218152028.nc'
if satellite=='noaa20':
dummy_fusion_file = '/data/geoffc/fusion_matlab/work/local_processing/snpp_fusion_output/VNP02FSN.A2018033.1836.001.2018058173216.nc'
LOG.debug('dummy cmd = "cp {} {}"'.format(dummy_fusion_file, fused_output_dir))
shutil.copy(dummy_fusion_file, fused_output_dir)
viirs_fused_l1b_file = pjoin(fused_output_dir, basename(dummy_fusion_file))
# Determine success... # Determine success...
LOG.debug('Looking for fused output file "{}"'.format(fused_l1b_file)) LOG.debug('Looking for fused output file "{}"'.format(viirs_fused_l1b_file))
fused_l1b_file = glob(fused_l1b_file) viirs_fused_l1b_file = glob(viirs_fused_l1b_file)
if len(fused_l1b_file) != 0: if len(viirs_fused_l1b_file) != 0:
fused_l1b_file = fused_l1b_file[0] viirs_fused_l1b_file = viirs_fused_l1b_file[0]
LOG.debug('Found final fused output file "{}"'.format(fused_l1b_file)) LOG.debug('Found final fused output file "{}"'.format(viirs_fused_l1b_file))
else: else:
LOG.error('There is no fused file {}, aborting'.format(fused_l1b_file)) LOG.error('There is no fused file {}, aborting'.format(viirs_fused_l1b_file))
rc_fusion = 1 rc_fusion = 1
return rc_fusion, None, {} return rc_fusion, None, {}
# Remove the unfused dir...
#LOG.debug('Removing the unfused level-1b dir {} ...'.format(unfused_l1b_dir))
#shutil.rmtree(unfused_l1b_dir)
# Determine the name of the output fused file # Move the NetCDF4 file to its new filename
if satellite=='snpp' or satellite=='noaa20': LOG.debug('Moving "{}" to "{}" ...'.format(viirs_fused_l1b_file, tmp_work_dir))
esdt = sipsprod.satellite_esdt('V02FSN', satellite) shutil.move(viirs_fused_l1b_file, tmp_work_dir)
product.options['collection'] = int(basename(l1b_file).split('.')[3]) viirs_fused_l1b_file = glob(pjoin(tmp_work_dir, basename(viirs_fused_l1b_file)))[0]
fused_l1b_file_new = sipsprod.product_filename(esdt, product.options['collection'], LOG.debug('Final fused output file "{}"'.format(viirs_fused_l1b_file))
granule, dt_create)
if satellite=='aqua':
esdt = sipsprod.satellite_esdt('M02FSN', satellite)
product.options['collection'] = int(basename(l1b_file).split('.')[3])
fused_l1b_file_new = sipsprod.product_filename(esdt, product.options['collection'],
granule, dt_create)
fused_l1b_file_new = fused_l1b_file_new.replace('.nc', '.hdf')
# Move the HDF4/NetCDF4 file to its new filename
LOG.debug('Moving "{}" to "{}" ...'.format(fused_l1b_file, pjoin(tmp_work_dir, fused_l1b_file_new)))
shutil.move(fused_l1b_file, pjoin(tmp_work_dir, fused_l1b_file_new))
fused_l1b_file = glob(pjoin(tmp_work_dir, fused_l1b_file_new))[0]
# Move the matlab file to its new filename # Move the matlab file to its new filename
if satellite=='snpp' or satellite=='noaa20': matlab_file_new = viirs_fused_l1b_file.replace('.nc','.mat')
matlab_file_new = basename(fused_l1b_file_new).replace('.nc','.mat')
if satellite=='aqua':
matlab_file_new = basename(fused_l1b_file_new).replace('.hdf','.mat')
LOG.debug('Moving "{}" to {}...'.format(matlab_file, pjoin(fused_output_dir, matlab_file_new))) LOG.debug('Moving "{}" to {}...'.format(matlab_file, matlab_file_new))
shutil.move(matlab_file, pjoin(fused_output_dir, matlab_file_new)) shutil.move(matlab_file, matlab_file_new)
matlab_file = glob(pjoin(fused_output_dir, matlab_file_new))[0] matlab_file = glob(matlab_file_new)[0]
LOG.debug('Final fused output matlab file "{}"'.format(matlab_file))
# Remove the fused_outputs directory # Remove the fused_outputs directory
#LOG.debug('Removing the fused_outputs dir {} ...'.format(fused_output_dir)) LOG.debug('Removing the fused_outputs dir {} ...'.format(fused_output_dir))
#shutil.rmtree(fused_output_dir) shutil.rmtree(fused_output_dir)
output_attrs = {'esdt': esdt, 'collection': product.options['collection'], output_attrs = {'esdt': esdt, 'collection': product.options['collection'],
'created': dt_create} 'created': dt_create}
return rc_fusion, fused_l1b_file, output_attrs return rc_fusion, viirs_fused_l1b_file, output_attrs
def output_QC(self, l1b_file, fused_l1b_file, band=None, input_rms=0.2, **kwargs): def output_QC(self, l1b_file, fused_l1b_file, band=None, input_rms=0.2, **kwargs):
...@@ -656,59 +664,95 @@ class FUSION_MATLAB(Computation): ...@@ -656,59 +664,95 @@ class FUSION_MATLAB(Computation):
return 0 if passfail else 1 return 0 if passfail else 1
def set_l2_metadata(self, l2_file, viirs_l1b_file, viirs_geo_file, cris_l1b_file, product, context):
'''
Construct various metadata strings and write them to the level2 file
'''
# def update_global_attrs(self, netcdf_file, readme_file, **kwargs): #context = {}
#context['satellite'] = satellite
# satellite = kwargs['satellite'] context['nrt'] = True
#context['collection'] = 0
# # Get the git repo information
# repo_attrs = [] #product = sipsprod.lookup_product_recurse('FSNRAD_L2_VIIRS_CRIS', version=version)
# try: #product.options['collection'] = context['collection']
# LOG.debug('Opening {}...'.format(readme_file))
# readme_obj = open(readme_file, 'ro') satname = 'SNPP' if context['satellite']=='snpp' else 'NOAA20'
# line_obj = readme_obj.readlines() esdt = product.name + ('_SNPP' if context['satellite']=='snpp' else '_NOAA20')
# for idx, line in enumerate(line_obj): dt_create = datetime.strptime(splitext(basename(l2_file))[0].split('.')[-1], '%Y%j%H%M%S')
# if '.git' in line: print("Creation date is {}".format(dt_create))
# repo_line = line.lstrip(' -*,*').rstrip(' -*,;')
# commit_line = line_obj[idx+1].lstrip(' -*,;').rstrip(' -*,;') viirs_input_fns, viirs_lut_version, viirs_lut_created = get_viirs_l1_luts(viirs_l1b_file, geo_fn=viirs_geo_file)
# git_line = '{}; {}'.format(repo_line, commit_line).replace('\n', '') ancillary_fns = []
# LOG.debug('{}'.format(git_line)) viirs_l1_version = product.input('viirs_l1').version
# repo_attrs.append(git_line) cris_l1_version = product.input('cris_l1').version
# except Exception:
# LOG.debug(traceback.format_exc()) set_official_product_metadata(
esdt,
# readme_obj.close() product.version,
product.options['collection'],
# # Update the various file global attributes product.input('fusion_matlab').version,
# LOG.debug('Adding attributes to {} ...'.format(netcdf_file)) context['satellite'],
# if splitext(netcdf_file)[-1] == '.nc': l2_file,
# args = (netcdf_file, "a") viirs_geo_file,
# kwargs = {'format': "NETCDF4"} viirs_input_fns,
# file_open = Dataset ancillary_fns,
# if splitext(netcdf_file)[-1] == '.hdf': viirs_l1_version,
# args = (netcdf_file, SDC.WRITE) viirs_lut_version,
# kwargs = {} viirs_lut_created,
# file_open = SD product.inputstr(),
dt_create,
# try: context['nrt'])
# file_obj = file_open(*args, **kwargs) # Remove any troublesome global attributes
try:
# # Update the attributes, moving to the end nc_l2 = netCDF4.Dataset(l2_file, 'a')
# for idx, attr in enumerate(repo_attrs): xmlmetadata = nc_l2.getncattr('xmlmetadata')
# LOG.debug('{}'.format(attr)) nc_l2.delncattr('xmlmetadata')
# setattr(file_obj, 'source_git_repo_{}'.format(idx), attr) nc_l2.delncattr('l1_version')
nc_l2.delncattr('l1_lut_version')
# except Exception: nc_l2.delncattr('l1_lut_created')
# LOG.warning("\tProblem setting attributes in output file {}".format(netcdf_file)) nc_l2.delncattr('input_files')
# LOG.debug(traceback.format_exc()) except:
pass
nc_l2.close()
nc_remove_unlimited_dims(basename(l2_file))
FIX_ATTRS = [
('title', '{0:} VIIRS+CrIS Fusion ({1:})'.format(satname, esdt)),
('platform', {'snpp':'Suomi NPP', 'noaa20':'NOAA-20'}[context['satellite']]),
('instrument', 'VIIRS+CrIS'),
('conventions', 'CF-1.6, ACDD-1.3'),
('AlgorithmType', 'OPS'),
('long_name', '{} VIIRS+CrIS Fusion 6-Min L2 Swath 750m'.format(satname)),
('project', 'NASA Atmosphere Discipline'),
('creator_name', 'NASA Atmosphere SIPS'),
('processing_version', product.input('fusion_matlab').version),
('viirs_l1_version', viirs_l1_version),
('viirs_lut_version', viirs_lut_version),
('viirs_lut_created', str(viirs_lut_created.isoformat())),
('cris_l1_version', cris_l1_version),
('input_files', ', '.join([basename(x) for x in [viirs_geo_file, viirs_l1b_file, cris_l1b_file]])),
('xmlmetadata', xmlmetadata),
]
nc_setattrs(l2_file, FIX_ATTRS)
extra_attrs = {'esdt': esdt,
'collection': product.options['collection'],
'ecstype': 'SCIENCE',
'viirs_lut_version': viirs_lut_version,
'viirs_lut_created': viirs_lut_created,
'created': dt_create}
# if netcdf_file.split('.')[-1] == 'nc':
# file_obj.close()
# elif netcdf_file.split('.')[-1] == 'hdf':
# file_obj.end()
# return return {
'fused_l1b': {
'file': basename(l2_file),
'extra_attrs': extra_attrs,
}
}
def prepare_env(self, dist_root, inputs, context): def prepare_env(self, dist_root, inputs, context):
LOG.debug("Running prepare_env()...") LOG.debug("Running prepare_env()...")
...@@ -732,7 +776,7 @@ class FUSION_MATLAB(Computation): ...@@ -732,7 +776,7 @@ class FUSION_MATLAB(Computation):
return env return env
@reraise_as(WorkflowNotReady, FileNotFound, prefix='V02FSN') @reraise_as(WorkflowNotReady, FileNotFound, prefix='FSNRAD_L2_VIIRS_CRIS')
def run_task(self, inputs, context): def run_task(self, inputs, context):
LOG.debug("Running run_task()...") LOG.debug("Running run_task()...")
...@@ -770,7 +814,7 @@ class FUSION_MATLAB(Computation): ...@@ -770,7 +814,7 @@ class FUSION_MATLAB(Computation):
LOG.debug("Inputs dir = {}".format(inputs_dir)) LOG.debug("Inputs dir = {}".format(inputs_dir))
# Are we doing a dummy run? # Are we doing a dummy run?
# dummy = True #dummy = True
dummy = False dummy = False
geo_keys = sorted([key for key in inputs.keys() if 'geo' in key]) geo_keys = sorted([key for key in inputs.keys() if 'geo' in key])
...@@ -817,6 +861,7 @@ class FUSION_MATLAB(Computation): ...@@ -817,6 +861,7 @@ class FUSION_MATLAB(Computation):
LOG.debug("collo = \n\t{}".format('\n\t'.join(collo))) LOG.debug("collo = \n\t{}".format('\n\t'.join(collo)))
bin_dir = pjoin(dist_root, 'bin') bin_dir = pjoin(dist_root, 'bin')
cdl_dir = pjoin(dist_root, 'cdl')
anc_dir = pjoin(dist_root, 'luts') anc_dir = pjoin(dist_root, 'luts')
fused_output_dir = pjoin(work_dir, 'fused_outputs') fused_output_dir = pjoin(work_dir, 'fused_outputs')
...@@ -824,6 +869,7 @@ class FUSION_MATLAB(Computation): ...@@ -824,6 +869,7 @@ class FUSION_MATLAB(Computation):
kwargs = {} kwargs = {}
kwargs['py_interp'] = py_interp kwargs['py_interp'] = py_interp
kwargs['bin_dir'] = bin_dir kwargs['bin_dir'] = bin_dir
kwargs['cdl_dir'] = cdl_dir
kwargs['env'] = env kwargs['env'] = env
kwargs['fused_output_dir'] = fused_output_dir kwargs['fused_output_dir'] = fused_output_dir
kwargs['satellite'] = satellite kwargs['satellite'] = satellite
...@@ -833,6 +879,7 @@ class FUSION_MATLAB(Computation): ...@@ -833,6 +879,7 @@ class FUSION_MATLAB(Computation):
if satellite=='snpp' or satellite=='noaa20': if satellite=='snpp' or satellite=='noaa20':
geo_file = geo[1] geo_file = geo[1]
l1b_file = l1b[1] l1b_file = l1b[1]
sounder_file = sounder[1]
kwargs['anc_paths'] = [pjoin(anc_dir, 'modis_aqua.srf.nc'), kwargs['anc_paths'] = [pjoin(anc_dir, 'modis_aqua.srf.nc'),
pjoin(anc_dir, 'NG_VIIRS_NPP_RSR_filtered_Oct2011_BA/')] pjoin(anc_dir, 'NG_VIIRS_NPP_RSR_filtered_Oct2011_BA/')]
kwargs['fusion_binary'] = 'run_imagersounderfusion_V.sh' kwargs['fusion_binary'] = 'run_imagersounderfusion_V.sh'
...@@ -841,6 +888,7 @@ class FUSION_MATLAB(Computation): ...@@ -841,6 +888,7 @@ class FUSION_MATLAB(Computation):
elif satellite=='aqua': elif satellite=='aqua':
geo_file = geo[0] geo_file = geo[0]
l1b_file = l1b[0] l1b_file = l1b[0]
sounder_file = sounder[0]
kwargs['anc_paths'] = [pjoin(anc_dir, 'L2.chan_prop.2005.03.01.v9.5.1.txt'), kwargs['anc_paths'] = [pjoin(anc_dir, 'L2.chan_prop.2005.03.01.v9.5.1.txt'),
pjoin(anc_dir, 'modis_aqua.srf.nc'), pjoin(anc_dir, 'modis_aqua.srf.nc'),
pjoin(anc_dir, 'modis_conv_error_2005.mat')] pjoin(anc_dir, 'modis_conv_error_2005.mat')]
...@@ -866,12 +914,22 @@ class FUSION_MATLAB(Computation): ...@@ -866,12 +914,22 @@ class FUSION_MATLAB(Computation):
if matlab_file is None: if matlab_file is None:
raise RuntimeError('Output fusion file fusion_output.mat not created.') raise RuntimeError('Output fusion file fusion_output.mat not created.')
#kwargs['dummy'] = True # dummy
# Now that we've computed the Matlab file, convert to a NetCDF file... # Now that we've computed the Matlab file, convert to a NetCDF file...
rc_fusion, fused_l1b_file, output_attrs = self.convert_matlab_to_netcdf( if satellite=='snpp' or satellite=='noaa20':
product, rc_fusion, fused_l1b_file, output_attrs = self.convert_matlab_to_viirs_netcdf(
matlab_file, product,
l1b_file, l1b_file,
**kwargs) sounder_file,
matlab_file,
**kwargs)
#elif satellite=='aqua':
#rc_fusion, fused_l1b_file, output_attrs = self.convert_matlab_to_netcdf(
#product,
#matlab_file,
#l1b_file,
#**kwargs)
LOG.debug('convert_matlab_to_netcdf() return value: {}'.format(rc_fusion)) LOG.debug('convert_matlab_to_netcdf() return value: {}'.format(rc_fusion))
LOG.info('convert_matlab_to_netcdf() generated {}'.format(fused_l1b_file)) LOG.info('convert_matlab_to_netcdf() generated {}'.format(fused_l1b_file))
...@@ -879,49 +937,24 @@ class FUSION_MATLAB(Computation): ...@@ -879,49 +937,24 @@ class FUSION_MATLAB(Computation):
if fused_l1b_file is None: if fused_l1b_file is None:
raise RuntimeError('NetCDF output fusion file was not created.') raise RuntimeError('NetCDF output fusion file was not created.')
# Update some global attributes in the output file
#readme_file = pjoin(delivery.path, 'README.txt')
#self.update_global_attrs(basename(fused_l1b_file), readme_file, **kwargs)
# Run a QC check on the output file
#rc_qc = self.output_QC(l1b, fused_l1b_file, **kwargs)
#LOG.debug('output_QC() return value: {}'.format(rc_qc))
#if rc_qc != 0:
#raise RuntimeError('Output fusion file {} failed RMS error QC check, output aborted.'.format(fused_l1b_file))
# The staging routine assumes that the output file is located in the work directory # The staging routine assumes that the output file is located in the work directory
# "tmp******", and that the output path is to be prepended, so return the basename. # "tmp******", and that the output path is to be prepended, so return the basename.
out_fn = basename(fused_l1b_file) out_fn = basename(fused_l1b_file)
# Set metadata to be put in the output file. # Update the output file metadata to conform to LAADS requirements.
if satellite=='snpp' or satellite=='noaa20': out_dict = self.set_l2_metadata(fused_l1b_file, l1b_file, geo_file, sounder_file, product, context)
viirs_l1 = product.input('viirs_l1')
v02mod_bt_sc = product.input('V02MOD-bt-sc')
if viirs_l1: if satellite=='snpp' or satellite=='noaa20':
l1_version = product.input('viirs_l1').version
elif v02mod_bt_sc:
l1_version = v02mod_bt_sc.input('viirs_l1').version
input_fns, lut_version, lut_created = get_viirs_l1_luts(l1b_file, geo_fn=geo_file)
ancillary_fns = []
out_compress = nc_compress out_compress = nc_compress
set_official_product_metadata(
output_attrs['esdt'], product.version, output_attrs['collection'],
product.input('fusion_matlab').version, context['satellite'],
fused_l1b_file, geo_file,
input_fns, ancillary_fns,
l1_version, lut_version, lut_created,
product.inputstr(), output_attrs['created'])
if satellite == 'aqua': if satellite == 'aqua':
out_compress = hdf_compress out_compress = hdf_compress
LOG.debug('We are in {}'.format(os.getcwd())) LOG.debug('We are in {}'.format(os.getcwd()))
LOG.debug('Compressing {}'.format(out_fn))
return {'fused_l1b': out_compress(out_fn)} #LOG.debug('Compressing {}'.format(out_fn))
#return {'fused_l1b': out_compress(out_fn)}
return out_dict
class FUSION_MATLAB_QL(Computation): class FUSION_MATLAB_QL(Computation):
...@@ -973,8 +1006,8 @@ class FUSION_MATLAB_QL(Computation): ...@@ -973,8 +1006,8 @@ class FUSION_MATLAB_QL(Computation):
def _add_cris_viirs_fusion_l1b_input(self, product, context, task): def _add_cris_viirs_fusion_l1b_input(self, product, context, task):
satellite = context['satellite'] satellite = context['satellite']
granule = context['granule'] granule = context['granule']
version = product.input('V02FSN').version version = product.input('FSNRAD_L2_VIIRS_CRIS').version
input_name = sipsprod.satellite_esdt('V02FSN', satellite) input_name = sipsprod.satellite_esdt('FSNRAD_L2_VIIRS_CRIS', satellite)
interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1)) interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1))
LOG.debug("Ingesting input {} ({}) for V02FSN_DailyQL version {}".format(input_name, version, product.version)) LOG.debug("Ingesting input {} ({}) for V02FSN_DailyQL version {}".format(input_name, version, product.version))
vl1b = dawg_catalog.files(satellite, input_name, interval, version=version) vl1b = dawg_catalog.files(satellite, input_name, interval, version=version)
...@@ -987,7 +1020,7 @@ class FUSION_MATLAB_QL(Computation): ...@@ -987,7 +1020,7 @@ class FUSION_MATLAB_QL(Computation):
raise WorkflowNotReady('Number of available {} inputs is < 228, for version {} and interval {}, aborting...'.format( raise WorkflowNotReady('Number of available {} inputs is < 228, for version {} and interval {}, aborting...'.format(
input_name, version, interval)) input_name, version, interval))
for idx, l1b_file in enumerate(vl1b): for idx, l1b_file in enumerate(vl1b):
LOG.debug('V02FSN granule {}: {} -> {}'.format(idx, l1b_file.begin_time, l1b_file.end_time)) LOG.debug('FSNRAD_L2_VIIRS_CRIS granule {}: {} -> {}'.format(idx, l1b_file.begin_time, l1b_file.end_time))
task.input('l1b_{}'.format(idx), l1b_file) task.input('l1b_{}'.format(idx), l1b_file)
def build_task_snpp(self, context, task): def build_task_snpp(self, context, task):
...@@ -1096,7 +1129,7 @@ class FUSION_MATLAB_QL(Computation): ...@@ -1096,7 +1129,7 @@ class FUSION_MATLAB_QL(Computation):
filename_chunks = splitext(orig_fusion_ql_file)[0].split('_') filename_chunks = splitext(orig_fusion_ql_file)[0].split('_')
fusion_ql_files.append( fusion_ql_files.append(
'{}.A{}.{}.{}.png'.format( '{}.A{}.{}.{}.png'.format(
sipsprod.satellite_esdt('V02FSN', satellite), sipsprod.satellite_esdt('FSNRAD_L2_VIIRS_CRIS', satellite),
granule.strftime('%Y%j'), granule.strftime('%Y%j'),
filename_chunks[1], filename_chunks[1],
filename_chunks[3] filename_chunks[3]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment