diff --git a/source/flo/__init__.py b/source/flo/__init__.py
index d623518f1944c872362c9d3c5521c062ddf1bd98..a2e66590b4d2779e391fbcfdcde2f8a8911d0d2e 100644
--- a/source/flo/__init__.py
+++ b/source/flo/__init__.py
@@ -138,7 +138,6 @@ class FUSION_MATLAB(Computation):
version = 'c6'
input_name = 'MOD03'
- LOG.debug("Ingesting input {} ({})...".format(input_name, version))
mgeo = dawg_catalog.files(satellite, input_name, interval, version=version)
if mgeo == []:
raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format(
@@ -159,7 +158,6 @@ class FUSION_MATLAB(Computation):
version = 'c6'
input_name = 'MOD021KM'
- LOG.debug("Ingesting input {} ({})...".format(input_name, version))
myd021km = dawg_catalog.files(satellite, input_name, interval, version=version)
if myd021km == []:
raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format(
@@ -195,7 +193,6 @@ class FUSION_MATLAB(Computation):
input_name = 'AIRIBRAD'
- LOG.debug("Ingesting input {}...".format(input_name))
airs = dawg_catalog.files(satellite, 'AIRIBRAD', airs_interval)
if airs == []:
raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for interval {}'.format(
@@ -209,7 +206,6 @@ class FUSION_MATLAB(Computation):
'''
Build up a set of inputs for a single context
'''
- LOG.debug("Ingesting inputs for M02FSN version {} ...".format(context['version']))
# Get the product definition for 'V02FSN'
product = sipsprod.lookup_product_recurse('V02FSN', version=context['version'])
@@ -323,6 +319,8 @@ class FUSION_MATLAB(Computation):
if satellite == 'snpp':
+ # Start of CrIS full spectral resolution for SNPP
+ snpp_fsr_start = datetime(2015, 12, 1, 0)
# Change to side 2 completed June 24, 2019 at 1848 UTC
snpp_side_2_start = datetime(2019, 6, 24, 18, 48)
# Change back to side 1 completed July 14, 2021 at 1749 UTC
@@ -331,17 +329,27 @@ class FUSION_MATLAB(Computation):
# Product version 1.0.0
if product_major_version == 1 and product_minor_version == 0:
- snpp_fsr_start = datetime(2015, 12, 1, 0)
-
while True:
- # Use CL1B_nsr (2.0.15) before snpp_fsr_start
- if (granule - snpp_fsr_start).total_seconds() <= 0.:
+ # Use CL1B_nsr (2.0.15) before snpp_fsr_start...
+ if (granule < snpp_fsr_start):
input_name = 'CL1B_nsr'
version = '2.0.15'
break
- # Use CL1B (2.0.15) before snpp_side_2_start
- if (granule - snpp_side_2_start).total_seconds() <= 0.:
+ # Use CL1B (2.0.15) for initial FSR side 1 run...
+ if (granule >= snpp_fsr_start) and (granule < snpp_side_2_start):
+ input_name = 'CL1B'
+ version = '2.0.15'
+ break
+
+ # Use CL1B (2.0.16) for initial FSR side 2 run...
+ if (granule >= snpp_side_2_start) and (granule < snpp_side_1_restart):
+ input_name = 'CL1B'
+ version = '2.0.16'
+ break
+
+ # Use CL1B (2.0.15) for second FSR side 1 run...
+ if (granule >= snpp_side_1_restart):
input_name = 'CL1B'
version = '2.0.15'
break
@@ -355,18 +363,23 @@ class FUSION_MATLAB(Computation):
snpp_fsr_start = datetime(2015, 11, 3)
- snpp_l1b_side_2_versions = {
- '2.0.15': '2.0.16',
- '2.1.3_pol_corr': '2.1.3_pol_corr_snpp_side2',
- '2.1.3': '2.1.3_pol_corr_snpp_side2'}
+ # Use CL1B_nsr (2.0.15) before snpp_fsr_start...
+ if (granule < snpp_fsr_start):
+ input_name = 'CL1B_nsr'
+ version = '2.0.15'
+
+ # snpp_l1b_side_2_versions = {
+ # '2.0.15': '2.0.16',
+ # '2.1.3_pol_corr': '2.1.3_pol_corr_snpp_side2',
+ # '2.1.3': '2.1.3_pol_corr_snpp_side2'}
- use_fsr = (granule >= snpp_fsr_start)
- input_name = 'CL1B' if use_fsr else 'CL1B_nsr'
+ # use_fsr = (granule >= snpp_fsr_start)
+ # input_name = 'CL1B' if use_fsr else 'CL1B_nsr'
- if granule >= snpp_side_2_start:
- cris_major_version = int(version.split('.')[0])
- if cris_major_version < 3:
- version = snpp_l1b_side_2_versions[version]
+ # if granule >= snpp_side_2_start:
+ # cris_major_version = int(version.split('.')[0])
+ # if cris_major_version < 3:
+ # version = snpp_l1b_side_2_versions[version]
else:
pass
@@ -467,9 +480,21 @@ class FUSION_MATLAB(Computation):
LOG.debug("geo = \n\t{}".format('\n\t'.join(geo)))
LOG.debug("sounder = \n\t{}".format('\n\t'.join(sounder)))
- version = product.input('julia_cris_viirs').version
- crisviirs = support_software.lookup('julia_cris_viirs', version=version)
- crisviirs_exe = pjoin(crisviirs.path, 'bin/cris_viirs')
+ product_major_version = int(product.version.split('.')[0])
+ product_minor_version = int(product.version.split('.')[1])
+ LOG.debug(f"Product major version {product_major_version}")
+ LOG.debug(f"Product minor version {product_minor_version}")
+
+ if 'collopak' in product.inputstr():
+ version = product.input('collopak').version
+ crisviirs = support_software.lookup('collopak', version=version)
+ crisviirs_exe = pjoin(crisviirs.path, 'bin/crisviirs')
+ elif 'julia_cris_viirs' in product.inputstr():
+ version = product.input('julia_cris_viirs').version
+ crisviirs = support_software.lookup('julia_cris_viirs', version=version)
+ crisviirs_exe = pjoin(crisviirs.path, 'bin/cris_viirs')
+ else:
+ raise ValueError(f"Correct VIIRS/CrIS collocation package cannot be found in {product.inputstr()}")
LOG.debug(f"Current directory (cris_viirs_collocation) is {abspath(curdir)}")
dummy_collo_file = sorted(glob(pjoin('../..', 'test_data', 'outputs', 'work',
@@ -1302,275 +1327,3 @@ class FUSION_subset(Computation):
'extra_attrs': extra_attrs,
}
}
-
-class FUSION_MATLAB_QL(Computation):
-
- parameters = ['granule', 'satellite', 'version']
-
- outputs = ['fused_l1b_ql_band27_asc', 'fused_l1b_ql_band27_desc', 'fused_l1b_ql_band33_asc', 'fused_l1b_ql_band33_desc']
-
- def find_contexts(self, time_interval, satellite, version):
- '''
- Takes the input time interval, and returns whatever 0Z datetimes fall within the interval.
- '''
-
- LOG.debug('interval = {}'.format(time_interval))
- LOG.debug('satellite = {}'.format(satellite))
- LOG.debug('version = {}'.format(version))
-
- days = [day.left for day in time_interval.overlapping_interval_series(timedelta(days=1))]
-
- return [{'satellite': satellite, 'version': version, 'granule': day} for day in days]
-
- def _add_viirs_l1b_geo_input(self, product, context, task):
- satellite = context['satellite']
- granule = context['granule']
- version = product.input('viirs_l1').version
- input_name = sipsprod.satellite_esdt('V03MOD', satellite)
- interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1))
- # interval = TimeInterval(granule, granule+timedelta(days=0.01)-timedelta(seconds=1)) # DEBUG
- LOG.debug("Ingesting input {} ({}) for V02FSN_DailyQL version {}".format(input_name, version, product.version))
- vgeom = dawg_catalog.files(satellite, input_name, interval, version=version)
- if vgeom == []:
- raise WorkflowNotReady('Missing {} inputs for version {} and interval {}'.format(
- input_name, version, interval))
- elapsed_time = ((datetime.utcnow()-context['granule']).total_seconds())/86400.
- LOG.info("After {:4.2f} days we have {} {} files ({}).".format(elapsed_time, len(vgeom), input_name, version))
- if len(vgeom) < 228 and elapsed_time < 2.:
- raise WorkflowNotReady('Number of available {} inputs is < 228, for version {} and interval {}, aborting...'.format(
- input_name, version, interval))
- for idx, geo_file in enumerate(vgeom):
- LOG.debug('V03MOD granule {}: {} -> {}'.format(idx, geo_file.begin_time, geo_file.end_time))
- task.input('geo_{}'.format(idx), geo_file)
-
- def _add_cris_viirs_fusion_l1b_input(self, product, context, task):
- satellite = context['satellite']
- granule = context['granule']
- version = '.'.join(product.input('FSNRAD_L2_VIIRS_CRIS').version.split('.')[:-1])
- esdt = 'FSNRAD_L2_VIIRS_CRIS' + ('_SNPP' if satellite == 'snpp' else '_NOAA20')
- interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1))
- # interval = TimeInterval(granule, granule+timedelta(days=0.01)-timedelta(seconds=1)) # DEBUG
- LOG.debug("Ingesting input {} ({}) for FSNRAD_L2_VIIRS_CRIS_DailyQL version {}".format(esdt, version, product.version))
- vl1b = dawg_catalog.files(satellite, esdt, interval, version=version)
- if vl1b == []:
- raise WorkflowNotReady('Missing {} inputs for version {} and interval {}'.format(
- esdt, version, interval))
- elapsed_time = ((datetime.utcnow()-context['granule']).total_seconds())/86400.
- LOG.info("After {:4.2f} days we have {} {} files ({}).".format(elapsed_time, len(vl1b), esdt, version))
- if len(vl1b) < 228 and elapsed_time < 2.:
- raise WorkflowNotReady('Number of available {} inputs is < 228, for version {} and interval {}, aborting...'.format(
- esdt, version, interval))
- for idx, l1b_file in enumerate(vl1b):
- LOG.debug('FSNRAD_L2_VIIRS_CRIS granule {}: {} -> {}'.format(idx, l1b_file.begin_time, l1b_file.end_time))
- task.input('l1b_{}'.format(idx), l1b_file)
-
- def build_task_snpp(self, context, task):
- '''
- Build up a set of inputs for a single context
- '''
- LOG.debug("Ingesting inputs for V02FSN_DailyQL version {} ...".format(context['version']))
-
- # Get the product definition for 'FSNRAD_L2_VIIRS_CRIS_DailyQL'. Different versions may use regular or bias
- # corrected VIIRS level-1b files.
- product_name = 'FSNRAD_L2_VIIRS_CRIS_DailyQL'
- product = sipsprod.lookup_product_recurse(product_name, version=context['version'])
-
- # Ingest the required inputs, defined in the VNP02 product definition for context['version']
- self._add_viirs_l1b_geo_input(product, context, task)
- self._add_cris_viirs_fusion_l1b_input(product, context, task)
-
- # Make the product definition available to build_task()
- task.option('product', product)
-
- @reraise_as(WorkflowNotReady, FileNotFound, prefix='FUSION_MATLAB_QL')
- def build_task(self, context, task):
-
- satellite = context['satellite']
-
- if satellite == 'snpp' or satellite == 'noaa20':
- self.build_task_snpp(context, task)
- elif satellite == 'aqua':
- raise ValueError('Satellite option "{}" not yet implemented.'.format(satellite))
- else:
- raise ValueError('Invalid satellite: {}'.format(satellite))
-
- def prepare_env(self, dist_root, inputs, context):
- LOG.debug("Running prepare_env()...")
-
- LOG.debug("package_root = {}".format(self.package_root))
- LOG.debug("dist_root = {}".format(dist_root))
-
- env = dict(os.environ)
- envroot = pjoin(dist_root, 'env')
-
- LOG.debug("envroot = {}".format(envroot))
-
- env['LD_LIBRARY_PATH'] = ':'.join([pjoin(envroot, 'lib'),
- pjoin(dist_root, 'lib')])
- env['PATH'] = ':'.join([pjoin(envroot, 'bin'),
- pjoin(dist_root, 'bin'),
- '/usr/bin:/bin'])
-
- LOG.debug("env['PATH'] = \n\t{}".format(env['PATH'].replace(':', '\n\t')))
- LOG.debug("env['LD_LIBRARY_PATH'] = \n\t{}".format(env['LD_LIBRARY_PATH'].replace(':', '\n\t')))
-
- return env
-
- def run_fusion_quicklooks(self, fsn_dir, geo_dir, **kwargs):
-
- bin_dir = kwargs['bin_dir']
- fusion_ql_binary = kwargs['fusion_ql_binary']
- granule = kwargs['granule']
- satellite = kwargs['satellite']
- env = kwargs['env']
-
- dt = granule
- year = dt.utctimetuple().tm_year
- jday = dt.utctimetuple().tm_yday
-
- rc_fusion_ql = 0
-
- # Get the matlab runtim version that we require
- # matlab_version = '2015b'
- matlab_version = '2018b'
- # matlab_version = product.input('fusion_matlab').options['matlab_version']
-
- # Run Matlab
- # cmd = '{}/{} {} {} {} {}/ {}/'.format(
- cmd = '{}/{} {} {} {} {}/ {}/ >> fusion_quicklooks.log'.format(
- bin_dir,
- fusion_ql_binary,
- support_software.lookup('matlab', matlab_version).path,
- year,
- jday,
- fsn_dir,
- geo_dir
- )
-
- # Run the Matlab Fusion Quicklook code
- try:
- LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ', ' \\\n\t')))
- rc_fusion_ql = 0
- runscript(cmd, requirements=[], env=env)
- except CalledProcessError as err:
- rc_fusion_ql = err.returncode
- LOG.error("Matlab binary {} returned a value of {}".format(fusion_ql_binary, rc_fusion_ql))
- raise FusionProcessFailed('Matlab binary {} failed or was killed, returning a value of {}'.format(fusion_ql_binary, rc_fusion_ql))
-
- # Move matlab file to the output directory
- orig_fusion_ql_files = glob('*.png')
- if len(orig_fusion_ql_files) != 0:
- LOG.info('Found Fusion quicklook files {}.'.format(
- ', '.join([basename(x) for x in orig_fusion_ql_files])
- ))
- else:
- LOG.error('There are no Fusion quicklook files "*.png", aborting')
- raise FusionFailedToProducePng('Matlab QL binary {} failed to produce any PNG files.'.format(fusion_ql_binary))
-
- fusion_ql_files = []
- for orig_fusion_ql_file in orig_fusion_ql_files:
- filename_chunks = splitext(orig_fusion_ql_file)[0].split('_')
- fusion_ql_files.append(
- '{}.A{}.{}.{}.png'.format(
- sipsprod.satellite_esdt('FSNRAD_L2_VIIRS_CRIS', satellite),
- granule.strftime('%Y%j'),
- filename_chunks[1],
- filename_chunks[3]
- )
- )
-
- for oldfile, newfile in zip(orig_fusion_ql_files, fusion_ql_files):
- shutil.move(oldfile, newfile)
-
- return rc_fusion_ql, fusion_ql_files
-
- @reraise_as(WorkflowNotReady, FileNotFound, prefix='FUSION_MATLAB_QL')
- def run_task(self, inputs, context):
-
- LOG.debug("Running run_task()...")
-
- for key in context.keys():
- LOG.debug("run_task() context['{}'] = {}".format(key, context[key]))
-
- granule = context['granule']
- satellite = context['satellite']
- # version = context['version']
-
- # Get the location of the binary package
- product = context['product']
- delivery = delivered_software.lookup(
- 'fusion_matlab', delivery_id=product.input('fusion_matlab').version)
- dist_root = pjoin(delivery.path, 'dist')
- envroot = pjoin(dist_root, 'env')
-
- LOG.debug("dist_root = '{}'".format(dist_root))
-
- # Get the required environment variables
- env = self.prepare_env(dist_root, inputs, context)
-
- # What is the path of the python interpreter
- py_interp = "{}/bin/python".format(envroot)
- LOG.debug("py_interp = '{}'".format(py_interp))
-
- bin_dir = pjoin(dist_root, 'bin')
-
- # Where are we running the package
- work_dir = abspath(curdir)
- LOG.debug("working dir = {}".format(work_dir))
-
- # What are our inputs?
- for input in inputs.keys():
- inputs_dir = dirname(inputs[input])
- LOG.debug("inputs['{}'] = {}".format(input, inputs[input]))
- LOG.debug("Inputs dir = {}".format(inputs_dir))
-
- current_dir = os.getcwd()
- geo_dir = pjoin(current_dir, 'GEO')
- fsn_dir = pjoin(current_dir, 'FSN')
- create_dir(geo_dir)
- create_dir(fsn_dir)
-
- os.chdir(geo_dir)
- geo_keys = [key for key in inputs.keys() if 'geo' in key]
- geo_inputs = {key: inputs[key] for key in geo_keys}
- symlink_inputs_to_working_dir(geo_inputs)
- os.chdir(current_dir)
-
- os.chdir(fsn_dir)
- fsn_keys = [key for key in inputs.keys() if 'l1b' in key]
- fsn_inputs = {key: inputs[key] for key in fsn_keys}
- symlink_inputs_to_working_dir(fsn_inputs)
- os.chdir(current_dir)
-
- # Setup the require keyword arguments for the fusion_matlab package
- kwargs = {}
- kwargs['py_interp'] = py_interp
- kwargs['bin_dir'] = bin_dir
- kwargs['env'] = env
- kwargs['satellite'] = satellite
- kwargs['granule'] = granule
- kwargs['fusion_ql_binary'] = 'run_plot_globalVIIRSfusion_fct.sh'
-
- # Run the fusion_matlab package
- rc_fusion_ql, fusion_ql_files = self.run_fusion_quicklooks(
- fsn_dir,
- geo_dir,
- **kwargs
- )
-
- extra_attrs = {
- 'begin_time': context['granule'],
- 'end_time': context['granule']+timedelta(days=1)-timedelta(seconds=1)
- }
-
- LOG.debug('extra_attrs = {}'.format(extra_attrs))
-
- LOG.debug('run_fusion_quicklooks() return value: {}'.format(rc_fusion_ql))
- LOG.info('run_fusion_quicklooks() generated {}'.format(fusion_ql_files))
-
- return {
- 'fused_l1b_ql_band27_asc': {'file': [x for x in fusion_ql_files if 'Band27.asc' in x][0], 'extra_attrs': extra_attrs},
- 'fused_l1b_ql_band27_desc': {'file': [x for x in fusion_ql_files if 'Band27.desc' in x][0], 'extra_attrs': extra_attrs},
- 'fused_l1b_ql_band33_asc': {'file': [x for x in fusion_ql_files if 'Band33.asc' in x][0], 'extra_attrs': extra_attrs},
- 'fused_l1b_ql_band33_desc': {'file': [x for x in fusion_ql_files if 'Band33.desc' in x][0], 'extra_attrs': extra_attrs},
- }