Skip to content
Snippets Groups Projects
Commit 989b52ed authored by Geoff Cureton's avatar Geoff Cureton
Browse files

Added logic to handle the CrIS change back to side 1.

parent 6b703329
Branches
No related tags found
No related merge requests found
...@@ -138,7 +138,6 @@ class FUSION_MATLAB(Computation): ...@@ -138,7 +138,6 @@ class FUSION_MATLAB(Computation):
version = 'c6' version = 'c6'
input_name = 'MOD03' input_name = 'MOD03'
LOG.debug("Ingesting input {} ({})...".format(input_name, version))
mgeo = dawg_catalog.files(satellite, input_name, interval, version=version) mgeo = dawg_catalog.files(satellite, input_name, interval, version=version)
if mgeo == []: if mgeo == []:
raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format(
...@@ -159,7 +158,6 @@ class FUSION_MATLAB(Computation): ...@@ -159,7 +158,6 @@ class FUSION_MATLAB(Computation):
version = 'c6' version = 'c6'
input_name = 'MOD021KM' input_name = 'MOD021KM'
LOG.debug("Ingesting input {} ({})...".format(input_name, version))
myd021km = dawg_catalog.files(satellite, input_name, interval, version=version) myd021km = dawg_catalog.files(satellite, input_name, interval, version=version)
if myd021km == []: if myd021km == []:
raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format(
...@@ -195,7 +193,6 @@ class FUSION_MATLAB(Computation): ...@@ -195,7 +193,6 @@ class FUSION_MATLAB(Computation):
input_name = 'AIRIBRAD' input_name = 'AIRIBRAD'
LOG.debug("Ingesting input {}...".format(input_name))
airs = dawg_catalog.files(satellite, 'AIRIBRAD', airs_interval) airs = dawg_catalog.files(satellite, 'AIRIBRAD', airs_interval)
if airs == []: if airs == []:
raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for interval {}'.format( raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for interval {}'.format(
...@@ -209,7 +206,6 @@ class FUSION_MATLAB(Computation): ...@@ -209,7 +206,6 @@ class FUSION_MATLAB(Computation):
''' '''
Build up a set of inputs for a single context Build up a set of inputs for a single context
''' '''
LOG.debug("Ingesting inputs for M02FSN version {} ...".format(context['version']))
# Get the product definition for 'V02FSN' # Get the product definition for 'V02FSN'
product = sipsprod.lookup_product_recurse('V02FSN', version=context['version']) product = sipsprod.lookup_product_recurse('V02FSN', version=context['version'])
...@@ -323,6 +319,8 @@ class FUSION_MATLAB(Computation): ...@@ -323,6 +319,8 @@ class FUSION_MATLAB(Computation):
if satellite == 'snpp': if satellite == 'snpp':
# Start of CrIS full spectral resolution for SNPP
snpp_fsr_start = datetime(2015, 12, 1, 0)
# Change to side 2 completed June 24, 2019 at 1848 UTC # Change to side 2 completed June 24, 2019 at 1848 UTC
snpp_side_2_start = datetime(2019, 6, 24, 18, 48) snpp_side_2_start = datetime(2019, 6, 24, 18, 48)
# Change back to side 1 completed July 14, 2021 at 1749 UTC # Change back to side 1 completed July 14, 2021 at 1749 UTC
...@@ -331,17 +329,27 @@ class FUSION_MATLAB(Computation): ...@@ -331,17 +329,27 @@ class FUSION_MATLAB(Computation):
# Product version 1.0.0 # Product version 1.0.0
if product_major_version == 1 and product_minor_version == 0: if product_major_version == 1 and product_minor_version == 0:
snpp_fsr_start = datetime(2015, 12, 1, 0)
while True: while True:
# Use CL1B_nsr (2.0.15) before snpp_fsr_start # Use CL1B_nsr (2.0.15) before snpp_fsr_start...
if (granule - snpp_fsr_start).total_seconds() <= 0.: if (granule < snpp_fsr_start):
input_name = 'CL1B_nsr' input_name = 'CL1B_nsr'
version = '2.0.15' version = '2.0.15'
break break
# Use CL1B (2.0.15) before snpp_side_2_start # Use CL1B (2.0.15) for initial FSR side 1 run...
if (granule - snpp_side_2_start).total_seconds() <= 0.: if (granule >= snpp_fsr_start) and (granule < snpp_side_2_start):
input_name = 'CL1B'
version = '2.0.15'
break
# Use CL1B (2.0.16) for initial FSR side 2 run...
if (granule >= snpp_side_2_start) and (granule < snpp_side_1_restart):
input_name = 'CL1B'
version = '2.0.16'
break
# Use CL1B (2.0.15) for second FSR side 1 run...
if (granule >= snpp_side_1_restart):
input_name = 'CL1B' input_name = 'CL1B'
version = '2.0.15' version = '2.0.15'
break break
...@@ -355,18 +363,23 @@ class FUSION_MATLAB(Computation): ...@@ -355,18 +363,23 @@ class FUSION_MATLAB(Computation):
snpp_fsr_start = datetime(2015, 11, 3) snpp_fsr_start = datetime(2015, 11, 3)
snpp_l1b_side_2_versions = { # Use CL1B_nsr (2.0.15) before snpp_fsr_start...
'2.0.15': '2.0.16', if (granule < snpp_fsr_start):
'2.1.3_pol_corr': '2.1.3_pol_corr_snpp_side2', input_name = 'CL1B_nsr'
'2.1.3': '2.1.3_pol_corr_snpp_side2'} version = '2.0.15'
# snpp_l1b_side_2_versions = {
# '2.0.15': '2.0.16',
# '2.1.3_pol_corr': '2.1.3_pol_corr_snpp_side2',
# '2.1.3': '2.1.3_pol_corr_snpp_side2'}
use_fsr = (granule >= snpp_fsr_start) # use_fsr = (granule >= snpp_fsr_start)
input_name = 'CL1B' if use_fsr else 'CL1B_nsr' # input_name = 'CL1B' if use_fsr else 'CL1B_nsr'
if granule >= snpp_side_2_start: # if granule >= snpp_side_2_start:
cris_major_version = int(version.split('.')[0]) # cris_major_version = int(version.split('.')[0])
if cris_major_version < 3: # if cris_major_version < 3:
version = snpp_l1b_side_2_versions[version] # version = snpp_l1b_side_2_versions[version]
else: else:
pass pass
...@@ -467,9 +480,21 @@ class FUSION_MATLAB(Computation): ...@@ -467,9 +480,21 @@ class FUSION_MATLAB(Computation):
LOG.debug("geo = \n\t{}".format('\n\t'.join(geo))) LOG.debug("geo = \n\t{}".format('\n\t'.join(geo)))
LOG.debug("sounder = \n\t{}".format('\n\t'.join(sounder))) LOG.debug("sounder = \n\t{}".format('\n\t'.join(sounder)))
version = product.input('julia_cris_viirs').version product_major_version = int(product.version.split('.')[0])
crisviirs = support_software.lookup('julia_cris_viirs', version=version) product_minor_version = int(product.version.split('.')[1])
crisviirs_exe = pjoin(crisviirs.path, 'bin/cris_viirs') LOG.debug(f"Product major version {product_major_version}")
LOG.debug(f"Product minor version {product_minor_version}")
if 'collopak' in product.inputstr():
version = product.input('collopak').version
crisviirs = support_software.lookup('collopak', version=version)
crisviirs_exe = pjoin(crisviirs.path, 'bin/crisviirs')
elif 'julia_cris_viirs' in product.inputstr():
version = product.input('julia_cris_viirs').version
crisviirs = support_software.lookup('julia_cris_viirs', version=version)
crisviirs_exe = pjoin(crisviirs.path, 'bin/cris_viirs')
else:
raise ValueError(f"Correct VIIRS/CrIS collocation package cannot be found in {product.inputstr()}")
LOG.debug(f"Current directory (cris_viirs_collocation) is {abspath(curdir)}") LOG.debug(f"Current directory (cris_viirs_collocation) is {abspath(curdir)}")
dummy_collo_file = sorted(glob(pjoin('../..', 'test_data', 'outputs', 'work', dummy_collo_file = sorted(glob(pjoin('../..', 'test_data', 'outputs', 'work',
...@@ -1302,275 +1327,3 @@ class FUSION_subset(Computation): ...@@ -1302,275 +1327,3 @@ class FUSION_subset(Computation):
'extra_attrs': extra_attrs, 'extra_attrs': extra_attrs,
} }
} }
class FUSION_MATLAB_QL(Computation):
parameters = ['granule', 'satellite', 'version']
outputs = ['fused_l1b_ql_band27_asc', 'fused_l1b_ql_band27_desc', 'fused_l1b_ql_band33_asc', 'fused_l1b_ql_band33_desc']
def find_contexts(self, time_interval, satellite, version):
'''
Takes the input time interval, and returns whatever 0Z datetimes fall within the interval.
'''
LOG.debug('interval = {}'.format(time_interval))
LOG.debug('satellite = {}'.format(satellite))
LOG.debug('version = {}'.format(version))
days = [day.left for day in time_interval.overlapping_interval_series(timedelta(days=1))]
return [{'satellite': satellite, 'version': version, 'granule': day} for day in days]
def _add_viirs_l1b_geo_input(self, product, context, task):
satellite = context['satellite']
granule = context['granule']
version = product.input('viirs_l1').version
input_name = sipsprod.satellite_esdt('V03MOD', satellite)
interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1))
# interval = TimeInterval(granule, granule+timedelta(days=0.01)-timedelta(seconds=1)) # DEBUG
LOG.debug("Ingesting input {} ({}) for V02FSN_DailyQL version {}".format(input_name, version, product.version))
vgeom = dawg_catalog.files(satellite, input_name, interval, version=version)
if vgeom == []:
raise WorkflowNotReady('Missing {} inputs for version {} and interval {}'.format(
input_name, version, interval))
elapsed_time = ((datetime.utcnow()-context['granule']).total_seconds())/86400.
LOG.info("After {:4.2f} days we have {} {} files ({}).".format(elapsed_time, len(vgeom), input_name, version))
if len(vgeom) < 228 and elapsed_time < 2.:
raise WorkflowNotReady('Number of available {} inputs is < 228, for version {} and interval {}, aborting...'.format(
input_name, version, interval))
for idx, geo_file in enumerate(vgeom):
LOG.debug('V03MOD granule {}: {} -> {}'.format(idx, geo_file.begin_time, geo_file.end_time))
task.input('geo_{}'.format(idx), geo_file)
def _add_cris_viirs_fusion_l1b_input(self, product, context, task):
satellite = context['satellite']
granule = context['granule']
version = '.'.join(product.input('FSNRAD_L2_VIIRS_CRIS').version.split('.')[:-1])
esdt = 'FSNRAD_L2_VIIRS_CRIS' + ('_SNPP' if satellite == 'snpp' else '_NOAA20')
interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1))
# interval = TimeInterval(granule, granule+timedelta(days=0.01)-timedelta(seconds=1)) # DEBUG
LOG.debug("Ingesting input {} ({}) for FSNRAD_L2_VIIRS_CRIS_DailyQL version {}".format(esdt, version, product.version))
vl1b = dawg_catalog.files(satellite, esdt, interval, version=version)
if vl1b == []:
raise WorkflowNotReady('Missing {} inputs for version {} and interval {}'.format(
esdt, version, interval))
elapsed_time = ((datetime.utcnow()-context['granule']).total_seconds())/86400.
LOG.info("After {:4.2f} days we have {} {} files ({}).".format(elapsed_time, len(vl1b), esdt, version))
if len(vl1b) < 228 and elapsed_time < 2.:
raise WorkflowNotReady('Number of available {} inputs is < 228, for version {} and interval {}, aborting...'.format(
esdt, version, interval))
for idx, l1b_file in enumerate(vl1b):
LOG.debug('FSNRAD_L2_VIIRS_CRIS granule {}: {} -> {}'.format(idx, l1b_file.begin_time, l1b_file.end_time))
task.input('l1b_{}'.format(idx), l1b_file)
def build_task_snpp(self, context, task):
'''
Build up a set of inputs for a single context
'''
LOG.debug("Ingesting inputs for V02FSN_DailyQL version {} ...".format(context['version']))
# Get the product definition for 'FSNRAD_L2_VIIRS_CRIS_DailyQL'. Different versions may use regular or bias
# corrected VIIRS level-1b files.
product_name = 'FSNRAD_L2_VIIRS_CRIS_DailyQL'
product = sipsprod.lookup_product_recurse(product_name, version=context['version'])
# Ingest the required inputs, defined in the VNP02 product definition for context['version']
self._add_viirs_l1b_geo_input(product, context, task)
self._add_cris_viirs_fusion_l1b_input(product, context, task)
# Make the product definition available to build_task()
task.option('product', product)
@reraise_as(WorkflowNotReady, FileNotFound, prefix='FUSION_MATLAB_QL')
def build_task(self, context, task):
satellite = context['satellite']
if satellite == 'snpp' or satellite == 'noaa20':
self.build_task_snpp(context, task)
elif satellite == 'aqua':
raise ValueError('Satellite option "{}" not yet implemented.'.format(satellite))
else:
raise ValueError('Invalid satellite: {}'.format(satellite))
def prepare_env(self, dist_root, inputs, context):
LOG.debug("Running prepare_env()...")
LOG.debug("package_root = {}".format(self.package_root))
LOG.debug("dist_root = {}".format(dist_root))
env = dict(os.environ)
envroot = pjoin(dist_root, 'env')
LOG.debug("envroot = {}".format(envroot))
env['LD_LIBRARY_PATH'] = ':'.join([pjoin(envroot, 'lib'),
pjoin(dist_root, 'lib')])
env['PATH'] = ':'.join([pjoin(envroot, 'bin'),
pjoin(dist_root, 'bin'),
'/usr/bin:/bin'])
LOG.debug("env['PATH'] = \n\t{}".format(env['PATH'].replace(':', '\n\t')))
LOG.debug("env['LD_LIBRARY_PATH'] = \n\t{}".format(env['LD_LIBRARY_PATH'].replace(':', '\n\t')))
return env
def run_fusion_quicklooks(self, fsn_dir, geo_dir, **kwargs):
bin_dir = kwargs['bin_dir']
fusion_ql_binary = kwargs['fusion_ql_binary']
granule = kwargs['granule']
satellite = kwargs['satellite']
env = kwargs['env']
dt = granule
year = dt.utctimetuple().tm_year
jday = dt.utctimetuple().tm_yday
rc_fusion_ql = 0
# Get the matlab runtim version that we require
# matlab_version = '2015b'
matlab_version = '2018b'
# matlab_version = product.input('fusion_matlab').options['matlab_version']
# Run Matlab
# cmd = '{}/{} {} {} {} {}/ {}/'.format(
cmd = '{}/{} {} {} {} {}/ {}/ >> fusion_quicklooks.log'.format(
bin_dir,
fusion_ql_binary,
support_software.lookup('matlab', matlab_version).path,
year,
jday,
fsn_dir,
geo_dir
)
# Run the Matlab Fusion Quicklook code
try:
LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ', ' \\\n\t')))
rc_fusion_ql = 0
runscript(cmd, requirements=[], env=env)
except CalledProcessError as err:
rc_fusion_ql = err.returncode
LOG.error("Matlab binary {} returned a value of {}".format(fusion_ql_binary, rc_fusion_ql))
raise FusionProcessFailed('Matlab binary {} failed or was killed, returning a value of {}'.format(fusion_ql_binary, rc_fusion_ql))
# Move matlab file to the output directory
orig_fusion_ql_files = glob('*.png')
if len(orig_fusion_ql_files) != 0:
LOG.info('Found Fusion quicklook files {}.'.format(
', '.join([basename(x) for x in orig_fusion_ql_files])
))
else:
LOG.error('There are no Fusion quicklook files "*.png", aborting')
raise FusionFailedToProducePng('Matlab QL binary {} failed to produce any PNG files.'.format(fusion_ql_binary))
fusion_ql_files = []
for orig_fusion_ql_file in orig_fusion_ql_files:
filename_chunks = splitext(orig_fusion_ql_file)[0].split('_')
fusion_ql_files.append(
'{}.A{}.{}.{}.png'.format(
sipsprod.satellite_esdt('FSNRAD_L2_VIIRS_CRIS', satellite),
granule.strftime('%Y%j'),
filename_chunks[1],
filename_chunks[3]
)
)
for oldfile, newfile in zip(orig_fusion_ql_files, fusion_ql_files):
shutil.move(oldfile, newfile)
return rc_fusion_ql, fusion_ql_files
@reraise_as(WorkflowNotReady, FileNotFound, prefix='FUSION_MATLAB_QL')
def run_task(self, inputs, context):
LOG.debug("Running run_task()...")
for key in context.keys():
LOG.debug("run_task() context['{}'] = {}".format(key, context[key]))
granule = context['granule']
satellite = context['satellite']
# version = context['version']
# Get the location of the binary package
product = context['product']
delivery = delivered_software.lookup(
'fusion_matlab', delivery_id=product.input('fusion_matlab').version)
dist_root = pjoin(delivery.path, 'dist')
envroot = pjoin(dist_root, 'env')
LOG.debug("dist_root = '{}'".format(dist_root))
# Get the required environment variables
env = self.prepare_env(dist_root, inputs, context)
# What is the path of the python interpreter
py_interp = "{}/bin/python".format(envroot)
LOG.debug("py_interp = '{}'".format(py_interp))
bin_dir = pjoin(dist_root, 'bin')
# Where are we running the package
work_dir = abspath(curdir)
LOG.debug("working dir = {}".format(work_dir))
# What are our inputs?
for input in inputs.keys():
inputs_dir = dirname(inputs[input])
LOG.debug("inputs['{}'] = {}".format(input, inputs[input]))
LOG.debug("Inputs dir = {}".format(inputs_dir))
current_dir = os.getcwd()
geo_dir = pjoin(current_dir, 'GEO')
fsn_dir = pjoin(current_dir, 'FSN')
create_dir(geo_dir)
create_dir(fsn_dir)
os.chdir(geo_dir)
geo_keys = [key for key in inputs.keys() if 'geo' in key]
geo_inputs = {key: inputs[key] for key in geo_keys}
symlink_inputs_to_working_dir(geo_inputs)
os.chdir(current_dir)
os.chdir(fsn_dir)
fsn_keys = [key for key in inputs.keys() if 'l1b' in key]
fsn_inputs = {key: inputs[key] for key in fsn_keys}
symlink_inputs_to_working_dir(fsn_inputs)
os.chdir(current_dir)
# Setup the require keyword arguments for the fusion_matlab package
kwargs = {}
kwargs['py_interp'] = py_interp
kwargs['bin_dir'] = bin_dir
kwargs['env'] = env
kwargs['satellite'] = satellite
kwargs['granule'] = granule
kwargs['fusion_ql_binary'] = 'run_plot_globalVIIRSfusion_fct.sh'
# Run the fusion_matlab package
rc_fusion_ql, fusion_ql_files = self.run_fusion_quicklooks(
fsn_dir,
geo_dir,
**kwargs
)
extra_attrs = {
'begin_time': context['granule'],
'end_time': context['granule']+timedelta(days=1)-timedelta(seconds=1)
}
LOG.debug('extra_attrs = {}'.format(extra_attrs))
LOG.debug('run_fusion_quicklooks() return value: {}'.format(rc_fusion_ql))
LOG.info('run_fusion_quicklooks() generated {}'.format(fusion_ql_files))
return {
'fused_l1b_ql_band27_asc': {'file': [x for x in fusion_ql_files if 'Band27.asc' in x][0], 'extra_attrs': extra_attrs},
'fused_l1b_ql_band27_desc': {'file': [x for x in fusion_ql_files if 'Band27.desc' in x][0], 'extra_attrs': extra_attrs},
'fused_l1b_ql_band33_asc': {'file': [x for x in fusion_ql_files if 'Band33.asc' in x][0], 'extra_attrs': extra_attrs},
'fused_l1b_ql_band33_desc': {'file': [x for x in fusion_ql_files if 'Band33.desc' in x][0], 'extra_attrs': extra_attrs},
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment