diff --git a/.gitignore b/.gitignore index 8e18231219e6dff935fc38fd40c7d7ca73a03122..74bb064ec50d44d2238e68bc5e62de03ac9e6074 100644 --- a/.gitignore +++ b/.gitignore @@ -61,6 +61,9 @@ Thumbs.db # pycharm projects .idea +# Visual Studio +.vscode + # Compiled object files *.o @@ -76,9 +79,4 @@ Thumbs.db # VIM swap files .*.sw* -### Various OS Files ### -.DS_Store -._.DS_Store -Thumbs.db - # repos specific ignores diff --git a/example_local_prepare.py b/example_local_prepare.py index 021bf1e95ddfc4649ab7a15bf21065717981eeec..16bae75b6d66f0255c8c28c2e78fa507ce01a11a 100644 --- a/example_local_prepare.py +++ b/example_local_prepare.py @@ -23,9 +23,7 @@ comp = FUSION_MATLAB() satellite = 'snpp' #satellite = 'aqua' -version = '1.0dev4' # base VIIRS l1vel-1b -version = '1.0dev5' # bias corrected VIIRS level-1b - +version = '1.0dev7' # base VIIRS level-1b def local_execute_example(interval, satellite, version, skip_prepare=False, skip_execute=False, verbosity=2): @@ -61,7 +59,7 @@ def local_execute_example(interval, satellite, version, skip_prepare=False, skip else: LOG.error("There are no valid {} contexts for the interval {}.".format(satellite, interval)) -def print_contexts(interval, satellite, type, version): +def print_contexts(interval, satellite, version): contexts = comp.find_contexts(interval, satellite, version) for context in contexts: print context diff --git a/example_local_prepare_ql.py b/example_local_prepare_ql.py index 964533cca821adb665b66cb0a39de9765cc7b35b..ea9f969c81e2d49aab480cf20ab56391e7479a7d 100644 --- a/example_local_prepare_ql.py +++ b/example_local_prepare_ql.py @@ -23,8 +23,7 @@ comp = FUSION_MATLAB_QL() satellite = 'snpp' #satellite = 'aqua' -delivery_id = '20180620-1' -version = '1.0dev2' +version = '1.0dev5' # base VIIRS level-1b def local_execute_example(interval, satellite, version, skip_prepare=False, skip_execute=False, verbosity=2): @@ -47,11 +46,11 @@ def local_execute_example(interval, satellite, version, skip_prepare=False, skip try: if not skip_prepare: - LOG.info("Running fusion_matlab local_prepare()...") + LOG.info("Running fusion_matlab_ql local_prepare()...") LOG.info("Preparing context... {}".format(contexts[0])) local_prepare(comp, contexts[0]) if not skip_execute: - LOG.info("Running local_execute()...") + LOG.info("Running fusion_matlab_ql local_execute()...") LOG.info("Running context... {}".format(contexts[0])) local_execute(comp, contexts[0]) except Exception, err: diff --git a/source/flo/__init__.py b/source/flo/__init__.py index e8c33d6138b9cd09eed4e98a78729eafd7fda771..f2a6ba0e4c010052293efc30e8fbc88e185611c7 100644 --- a/source/flo/__init__.py +++ b/source/flo/__init__.py @@ -106,6 +106,7 @@ class FUSION_MATLAB(Computation): else: return [] + return [{'satellite': satellite, 'version': version, 'granule': g} for g in time_interval.contained_series(granule_length)] @@ -113,51 +114,76 @@ class FUSION_MATLAB(Computation): satellite = context['satellite'] granule = context['granule'] granule_length = timedelta(minutes=5) + wedge = timedelta(seconds=1) - myd03_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1)) - myd03 = dawg_catalog.files('aqua','MOD03', myd03_interval, version='c6') - if myd03 == []: - raise WorkflowNotReady('Unable to find matching MYD03 granule for interval {}'.format(myd03_interval)) - myd03_file = myd03[0] - LOG.debug('MYD03 granule path: {}'.format(myd03_file.path)) - #if not exists(myd03_file.path): - #raise WorkflowNotReady('Nominally valid MYD03 path {} for granule {} or interval {} does not exist, possible DB corruption'.format(myd03_file.path, granule, myd03_interval)) + interval = TimeInterval(granule, granule+granule_length-wedge) - task.input('geo', myd03_file) + version = 'c6' + input_name = 'MOD03' + + LOG.debug("Ingesting input {} ({})...".format(input_name, version)) + mgeo = dawg_catalog.files(satellite, input_name, interval, version=version) + if mgeo == []: + raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( + input_name, version, interval)) + + for idx, mgeo_file in enumerate(mgeo): + LOG.debug('MODIS GEO granule {}: {} -> {}'.format(idx, mgeo_file.begin_time, mgeo_file.end_time)) + task.input('geo_{}'.format(idx), mgeo_file) def _add_modis_l1b_m_input(self, context, task): satellite = context['satellite'] granule = context['granule'] granule_length = timedelta(minutes=5) + wedge = timedelta(seconds=1) + + interval = TimeInterval(granule, granule+granule_length-wedge) - myd021km_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1)) - myd021km = dawg_catalog.files('aqua','MOD021KM', myd021km_interval, version='c6') + version = 'c6' + input_name = 'MOD021KM' + + LOG.debug("Ingesting input {} ({})...".format(input_name, version)) + myd021km = dawg_catalog.files(satellite, input_name, interval, version=version) if myd021km == []: - raise WorkflowNotReady('Unable to find matching MYD021KM granule for interval {}'.format(myd021km_interval)) - myd021km_file = myd021km[0] - LOG.debug('MYD021KM granule path: {}'.format(myd021km_file.path)) + raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( + input_name, version, interval)) - task.input('l1b', myd021km_file) + for idx, myd021km_file in enumerate(myd021km): + LOG.debug('MODIS L1B granule {}: {} -> {}'.format(idx, myd021km_file.begin_time, myd021km_file.end_time)) + task.input('l1b_{}'.format(idx), myd021km_file) def _add_airs_l1b_input(self, context, task): satellite = context['satellite'] granule = context['granule'] granule_length = timedelta(minutes=5) + wedge = timedelta(seconds=1) + + myd03_interval = TimeInterval(granule, granule+granule_length-wedge) - myd03_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1)) - myd03 = dawg_catalog.files('aqua','MOD03', myd03_interval, version='c6') + version = 'c6' + input_name = 'MOD03' + + myd03 = dawg_catalog.files(satellite, input_name, myd03_interval, version=version) if myd03 == []: - raise WorkflowNotReady('Unable to find matching MYD03 granule for interval {}'.format(myd03_interval)) - myd03_file = myd03[0] - LOG.debug('MYD03 granule path: {}'.format(myd03_file.path)) + raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( + input_name, version, myd03_interval)) + + myd03 = sorted(myd03, key=lambda x: basename(str(x.begin_time))) - buf = 0 # seconds - airs_begin = myd03_file.begin_time - timedelta(seconds=buf) - airs_end = myd03_file.end_time + timedelta(seconds=buf) + buf = 1 # seconds + # airs_begin = myd03[0].begin_time - timedelta(seconds=buf) + # airs_end = myd03[-1].end_time + timedelta(seconds=buf) + airs_begin = myd03[0].begin_time - granule_length #+ timedelta(seconds=buf) + airs_end = myd03[-1].end_time + granule_length #- timedelta(seconds=buf) airs_interval = TimeInterval(airs_begin, airs_end) - airs = dawg_catalog.files('aqua', 'AIRIBRAD', airs_interval) + + input_name = 'AIRIBRAD' + + LOG.debug("Ingesting input {}...".format(input_name)) + airs = dawg_catalog.files(satellite, 'AIRIBRAD', airs_interval) if airs == []: - raise WorkflowNotReady('Unable to find matching AIRS granules for interval {}'.format(airs_interval)) + raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for interval {}'.format( + input_name, airs_interval)) for idx, airs_file in enumerate(airs): LOG.debug('AIRS granule {}: {} -> {}'.format(idx, airs_file.begin_time, airs_file.end_time)) @@ -167,7 +193,7 @@ class FUSION_MATLAB(Computation): ''' Build up a set of inputs for a single context ''' - LOG.debug("Ingesting inputs for M02FSN version {} ...".format(context['version'])) + LOG.info("Ingesting inputs for M02FSN version {} ...".format(context['version'])) # Get the product definition for 'V02FSN' product = sipsprod.lookup_product_recurse('V02FSN', version=context['version']) @@ -184,33 +210,49 @@ class FUSION_MATLAB(Computation): def _add_viirs_l1b_geo_input(self, product, context, task): satellite = context['satellite'] granule = context['granule'] + granule_length = timedelta(minutes=6) viirs_l1 = product.input('viirs_l1') v02mod_bt_sc = product.input('V02MOD-bt-sc') + interval = TimeInterval(granule-granule_length, granule+granule_length) + if viirs_l1: version = viirs_l1.version elif v02mod_bt_sc: version = v02mod_bt_sc.input('viirs_l1').version input_name = sipsprod.satellite_esdt('V03MOD', satellite) + LOG.debug("Ingesting input {} ({}) for V02FSN version {}".format(input_name, version, product.version)) - vgeom = dawg_catalog.file(satellite, input_name, granule, version=version) - task.input('geo', vgeom) + vgeom = dawg_catalog.files(satellite, input_name, interval, version=version) + if vgeom == []: + raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( + input_name, version, interval)) + + for idx, vgeom_file in enumerate(vgeom): + LOG.debug('VIIRS GEO granule {}: {} -> {}'.format(idx, vgeom_file.begin_time, vgeom_file.end_time)) + task.input('geo_{}'.format(idx), vgeom_file) def _add_viirs_l1b_m_input(self, product, context, task): satellite = context['satellite'] granule = context['granule'] + granule_length = timedelta(minutes=6) viirs_l1 = product.input('viirs_l1') v02mod_bt_sc = product.input('V02MOD-bt-sc') + interval = TimeInterval(granule-granule_length, granule+granule_length) + # Standard NASA VIIRS level-1 radiances and BT if viirs_l1: version = viirs_l1.version input_name = sipsprod.satellite_esdt('V02MOD', satellite) LOG.debug("Ingesting input {} ({}) for V02FSN version {}".format(input_name, version, product.version)) - vl1b = dawg_catalog.file(satellite, input_name, granule, version=version) + vl1b = dawg_catalog.files(satellite, input_name, interval, version=version) + if vl1b == []: + raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( + input_name, version, interval)) # Standard NASA VIIRS level-1 radiances and BT, bias corrected to match MODIS (or something...) elif v02mod_bt_sc: @@ -224,23 +266,29 @@ class FUSION_MATLAB(Computation): else: raise ValueError('V02MOD -- missing one of (V02MOD, V02MOD-bc) input for VIIRS') - task.input('l1b', vl1b) + for idx, vl1b_file in enumerate(vl1b): + LOG.debug('VIIRS L1B granule {}: {} -> {}'.format(idx, vl1b_file.begin_time, vl1b_file.end_time)) + task.input('l1b_{}'.format(idx), vl1b_file) def _add_cris_l1b_input(self, product, context, task): satellite = context['satellite'] granule = context['granule'] granule_length = timedelta(minutes=6) - cris_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1)) - version = product.input('cris_l1').version + + cris_l1 = product.input('cris_l1') + + interval = TimeInterval(granule-granule_length, granule+granule_length) + + version = cris_l1.version input_name = 'CL1B' LOG.debug("Ingesting input {} ({}) for V02FSN version {}".format(input_name, version, product.version)) - cris = dawg_catalog.files(satellite, input_name, cris_interval, version=version) + cris = dawg_catalog.files(satellite, input_name, interval, version=version) if cris == []: raise WorkflowNotReady('FUSION_MATLAB: Missing {} inputs for version {} and interval {}'.format( - input_name, version, cris_interval)) + input_name, version, interval)) for idx, cris_file in enumerate(cris): - LOG.debug('CrIS granule {}: {} -> {}'.format(idx, cris_file.begin_time, cris_file.end_time)) + LOG.debug('CrIS L1B granule {}: {} -> {}'.format(idx, cris_file.begin_time, cris_file.end_time)) task.input('sounder_{}'.format(idx), cris_file) def build_task_snpp(self, context, task): @@ -277,7 +325,7 @@ class FUSION_MATLAB(Computation): def mend_viirs_l1b(self, product, geo, l1b, dummy=False): out_fn = basename(l1b).replace('.nc', '.bowtie_restored.nc') - LOG.info('out_fn = {}'.format(out_fn)) + LOG.debug('out_fn = {}'.format(out_fn)) shutil.copy(l1b, out_fn) version = product.input('viirsmend').version @@ -286,53 +334,50 @@ class FUSION_MATLAB(Computation): viirsmend_exe = pjoin(viirsmend.path,'bin/viirsl1mend') cmd = '{} {} {} {}'.format(py_exe, viirsmend_exe, out_fn, geo) - LOG.debug('cmd = {}'.format(cmd)) - dummy_bowtie_rest_file = '/mnt/sdata/geoffc/fusion_matlab/work/snpp_temp_outputs/outputs/tmp5PwThb/VNP02MOD.A2018033.1836.001.2018033235822.uwssec.bowtie_restored.nc' + dummy_bowtie_rest_file = sorted(glob(pjoin('/data/geoffc/fusion_matlab/work/local_processing', + 'snpp_test_case/outputs/tmptkFpCs', + 'VNP02MOD*bowtie*.nc'))) if not dummy: runscript(cmd, requirements=[viirsmend]) else: LOG.debug('dummy cmd = "cp {} {}"'.format(dummy_bowtie_rest_file, out_fn)) - shutil.copy(dummy_bowtie_rest_file, out_fn) + rc_copy = [shutil.copy(bowtie_file, out_fn) for bowtie_file in dummy_bowtie_rest_file] + LOG.debug('rc_copy = {}'.format(rc_copy)) return out_fn - def cris_viirs_collocation(self, product, inputs, dummy=False): - - LOG.info('inputs = {}'.format(inputs)) - input_keys = inputs.keys() - input_keys.sort() - - sounder_keys = [key for key in input_keys if 'sounder' in key] - cris_files = [inputs[key] for key in sounder_keys] + def cris_viirs_collocation(self, product, geo, sounder, dummy=False): - vgeom_file = inputs['geo'] + LOG.debug("geo = \n\t{}".format('\n\t'.join(geo))) + LOG.debug("sounder = \n\t{}".format('\n\t'.join(sounder))) version = product.input('collopak').version crisviirs = support_software.lookup('collopak', version=version) crisviirs_exe = pjoin(crisviirs.path,'bin/crisviirs') - dummy_collo_file = '/mnt/sdata/geoffc/fusion_matlab/work/snpp_temp_outputs/outputs/tmp5PwThb/colloc.cris_snpp.viirs_m_snpp.20180202T183600_183600.nc' + dummy_collo_file = sorted(glob(pjoin('/data/geoffc/fusion_matlab/work/local_processing', + 'snpp_test_case/outputs/tmptkFpCs', + 'colloc.cris_snpp.viirs_m_snpp.*.nc'))) if not dummy: - for cris_file in cris_files: - cmd = '{} {} {} > /dev/null'.format(crisviirs_exe, cris_file, vgeom_file) + for cris, vgeo in zip(sounder, geo): + cmd = '{} {} {} > /dev/null'.format(crisviirs_exe, cris, vgeo) - LOG.info('cmd = {}'.format(cmd)) runscript(cmd, requirements=[]) else: LOG.debug('dummy cmd = "cp {} {}"'.format(dummy_collo_file, abspath(curdir))) - shutil.copy(dummy_collo_file, './') + rc_copy = [shutil.copy(collo_file, './') for collo_file in dummy_collo_file] + LOG.debug('rc_copy = {}'.format(rc_copy)) - collo_files = glob('colloc.*.nc') - collo_files.sort() + collo_files = sorted(glob('colloc.*.nc')) return collo_files def airs_modis_collocation(self, product, inputs, dummy=False): - LOG.info('inputs = {}'.format(inputs)) + LOG.debug('inputs = {}'.format(inputs)) input_keys = inputs.keys() input_keys.sort() @@ -351,7 +396,7 @@ class FUSION_MATLAB(Computation): for airs_file in airs_files: cmd = '{} {} {} > /dev/null'.format(airsmodis_exe, airs_file, modis_file) - LOG.info('cmd = {}'.format(cmd)) + LOG.debug('cmd = {}'.format(cmd)) runscript(cmd, requirements=[]) else: LOG.debug('dummy cmd = "cp {} {}"'.format(' '.join(dummy_collo_files), abspath(curdir))) @@ -363,7 +408,7 @@ class FUSION_MATLAB(Computation): return collo_files - def run_fusion_matlab(self, product, geo_file, l1b_file, sounder_files, collo_files, **kwargs): + def run_fusion_matlab(self, product, geo_files, l1b_files, sounder_files, collo_files, **kwargs): ''' Run the Matlab fusion binary on the input level-1b files to generate the *.mat output file. ''' @@ -379,10 +424,9 @@ class FUSION_MATLAB(Computation): dummy = kwargs['dummy'] if dummy: - if satellite=='snpp': - dummy_matlab_file = '/mnt/sdata/geoffc/fusion_matlab/work/snpp_temp_outputs/outputs/tmp5PwThb/fusion_output.mat' - if satellite=='aqua': - dummy_matlab_file = '/mnt/sdata/geoffc/fusion_matlab/work/aqua_temp_outputs/outputs/tmpMUoHF3/fusion_output.mat' + dummy_matlab_file = pjoin('/data/geoffc/fusion_matlab/work/local_processing', + 'snpp_test_case/outputs/tmptkFpCs', + 'fused_outputs/fusion_output.mat') rc_fusion = 0 @@ -395,8 +439,8 @@ class FUSION_MATLAB(Computation): bin_dir, fusion_binary, support_software.lookup('matlab', matlab_version).path, - geo_file, - l1b_file, + ' '.join(geo_files), + ' '.join(l1b_files), ' '.join(sounder_files), ' '.join(collo_files), ' '.join(anc_paths) @@ -415,7 +459,7 @@ class FUSION_MATLAB(Computation): runscript(cmd, requirements=[], env=env) else: LOG.debug('dummy cmd = "cp {} {}"'.format(dummy_matlab_file, tmp_work_dir)) - shutil.copy(dummy_matlab_file, tmp_work_dir) # DEBUG + shutil.copy(dummy_matlab_file, tmp_work_dir) except CalledProcessError as err: rc_fusion = err.returncode LOG.error("Matlab binary {} returned a value of {}".format(fusion_binary, rc_fusion)) @@ -427,7 +471,7 @@ class FUSION_MATLAB(Computation): matlab_file = matlab_file[0] LOG.debug('Found Matlab file "{}", moving to {}...'.format(matlab_file, fused_output_dir)) if exists(pjoin(fused_output_dir, matlab_file)): - LOG.info('{} exists, removing...'.format(pjoin(fused_output_dir, matlab_file))) + LOG.debug('{} exists, removing...'.format(pjoin(fused_output_dir, matlab_file))) os.remove(pjoin(fused_output_dir, matlab_file)) shutil.move(matlab_file, fused_output_dir) matlab_file = glob(pjoin(fused_output_dir, matlab_file))[0] @@ -503,7 +547,7 @@ class FUSION_MATLAB(Computation): else: LOG.debug('dummy cmd = "cp {} {}"'.format(dummy_fusion_file, pjoin(fused_output_dir, basename(l1b_file)))) - shutil.copy(dummy_fusion_file, pjoin(fused_output_dir, basename(l1b_file))) # DEBUG + shutil.copy(dummy_fusion_file, pjoin(fused_output_dir, basename(l1b_file))) except CalledProcessError as err: rc_fusion = err.returncode LOG.error("CF converter {} returned a value of {}".format(conversion_bin, rc_fusion)) @@ -586,58 +630,58 @@ class FUSION_MATLAB(Computation): return 0 if passfail else 1 - def update_global_attrs(self, netcdf_file, readme_file, **kwargs): + # def update_global_attrs(self, netcdf_file, readme_file, **kwargs): - satellite = kwargs['satellite'] + # satellite = kwargs['satellite'] - # Get the git repo information - repo_attrs = [] - try: - LOG.debug('Opening {}...'.format(readme_file)) - readme_obj = open(readme_file, 'ro') - line_obj = readme_obj.readlines() - for idx, line in enumerate(line_obj): - if '.git' in line: - repo_line = line.lstrip(' -*,*').rstrip(' -*,;') - commit_line = line_obj[idx+1].lstrip(' -*,;').rstrip(' -*,;') - git_line = '{}; {}'.format(repo_line, commit_line).replace('\n', '') - LOG.debug('{}'.format(git_line)) - repo_attrs.append(git_line) - except Exception: - LOG.debug(traceback.format_exc()) - - readme_obj.close() - - # Update the various file global attributes - LOG.debug('Adding attributes to {} ...'.format(netcdf_file)) - if splitext(netcdf_file)[-1] == '.nc': - args = (netcdf_file, "a") - kwargs = {'format': "NETCDF4"} - file_open = Dataset - if splitext(netcdf_file)[-1] == '.hdf': - args = (netcdf_file, SDC.WRITE) - kwargs = {} - file_open = SD + # # Get the git repo information + # repo_attrs = [] + # try: + # LOG.debug('Opening {}...'.format(readme_file)) + # readme_obj = open(readme_file, 'ro') + # line_obj = readme_obj.readlines() + # for idx, line in enumerate(line_obj): + # if '.git' in line: + # repo_line = line.lstrip(' -*,*').rstrip(' -*,;') + # commit_line = line_obj[idx+1].lstrip(' -*,;').rstrip(' -*,;') + # git_line = '{}; {}'.format(repo_line, commit_line).replace('\n', '') + # LOG.debug('{}'.format(git_line)) + # repo_attrs.append(git_line) + # except Exception: + # LOG.debug(traceback.format_exc()) - try: + # readme_obj.close() - file_obj = file_open(*args, **kwargs) + # # Update the various file global attributes + # LOG.debug('Adding attributes to {} ...'.format(netcdf_file)) + # if splitext(netcdf_file)[-1] == '.nc': + # args = (netcdf_file, "a") + # kwargs = {'format': "NETCDF4"} + # file_open = Dataset + # if splitext(netcdf_file)[-1] == '.hdf': + # args = (netcdf_file, SDC.WRITE) + # kwargs = {} + # file_open = SD - # Update the attributes, moving to the end - for idx, attr in enumerate(repo_attrs): - LOG.debug('{}'.format(attr)) - setattr(file_obj, 'source_git_repo_{}'.format(idx), attr) + # try: - except Exception: - LOG.warning("\tProblem setting attributes in output file {}".format(netcdf_file)) - LOG.debug(traceback.format_exc()) + # file_obj = file_open(*args, **kwargs) - if netcdf_file.split('.')[-1] == 'nc': - file_obj.close() - elif netcdf_file.split('.')[-1] == 'hdf': - file_obj.end() + # # Update the attributes, moving to the end + # for idx, attr in enumerate(repo_attrs): + # LOG.debug('{}'.format(attr)) + # setattr(file_obj, 'source_git_repo_{}'.format(idx), attr) - return + # except Exception: + # LOG.warning("\tProblem setting attributes in output file {}".format(netcdf_file)) + # LOG.debug(traceback.format_exc()) + + # if netcdf_file.split('.')[-1] == 'nc': + # file_obj.close() + # elif netcdf_file.split('.')[-1] == 'hdf': + # file_obj.end() + + # return def prepare_env(self, dist_root, inputs, context): LOG.debug("Running prepare_env()...") @@ -671,7 +715,6 @@ class FUSION_MATLAB(Computation): granule = context['granule'] satellite = context['satellite'] - version = context['version'] # Get the location of the binary package product = context['product'] @@ -680,7 +723,7 @@ class FUSION_MATLAB(Computation): dist_root = pjoin(delivery.path, 'dist') envroot = pjoin(dist_root, 'env') - LOG.info("dist_root = '{}'".format(dist_root)) + LOG.debug("dist_root = '{}'".format(dist_root)) # Get the required environment variables env = self.prepare_env(dist_root, inputs, context) @@ -700,24 +743,38 @@ class FUSION_MATLAB(Computation): LOG.debug("Inputs dir = {}".format(inputs_dir)) # Are we doing a dummy run? - #dummy = True + # dummy = True dummy = False + geo_keys = sorted([key for key in inputs.keys() if 'geo' in key]) + l1b_keys = sorted([key for key in inputs.keys() if 'l1b' in key]) + sounder_keys = sorted([key for key in inputs.keys() if 'sounder' in key]) + + LOG.debug("geo_keys = {}".format(geo_keys)) + LOG.debug("l1b_keys = {}".format(l1b_keys)) + LOG.debug("sounder_keys = {}".format(sounder_keys)) + + geo = [inputs[key] for key in geo_keys] + l1b = [inputs[key] for key in l1b_keys] + sounder = [inputs[key] for key in sounder_keys] + + LOG.debug("geo = \n\t{}".format('\n\t'.join(geo))) + LOG.debug("l1b = \n\t{}".format('\n\t'.join(l1b))) + LOG.debug("sounder = \n\t{}".format('\n\t'.join(sounder))) + # Run viirsmend on the viirs level-1b, and generate the CrIS/VIIRS collocation if satellite == 'snpp': - geo = inputs['geo'] - if 'bowtie' in basename(inputs['l1b']): - # Bias corrected l1b has already been mended, so just copy to working dir - l1b = basename(inputs['l1b']) - shutil.copy(inputs['l1b'], l1b) - else: - # Mend bowtie pixels of NASA VIIRS l1b file... - l1b = self.mend_viirs_l1b(product, inputs['geo'], inputs['l1b'], dummy=dummy) + for idx, l1b_file in enumerate(l1b): + if 'bowtie' in basename(l1b_file): + # Bias corrected l1b has already been mended, so just copy to working dir + l1b[idx] = basename(l1b_file) + shutil.copy(l1b_file, l1b[idx]) + else: + # Mend bowtie pixels of NASA VIIRS l1b file... + l1b[idx] = self.mend_viirs_l1b(product, geo[idx], l1b[idx], dummy=dummy) - sounder_keys = [key for key in inputs.keys() if 'sounder' in key] - sounder = [inputs[key] for key in sounder_keys] - collo = self.cris_viirs_collocation(product, inputs, dummy=dummy) + collo = self.cris_viirs_collocation(product, geo, sounder, dummy=dummy) # Generate the AIRS/MODIS collocation if satellite == 'aqua': @@ -727,10 +784,10 @@ class FUSION_MATLAB(Computation): sounder = [inputs[key] for key in sounder_keys] collo = self.airs_modis_collocation(product, inputs, dummy=dummy) - LOG.info('geo = {}'.format(geo)) - LOG.info('l1b = {}'.format(l1b)) - LOG.info('sounder = {}'.format(sounder)) - LOG.info('collo = {}'.format(collo)) + LOG.debug("geo = \n\t{}".format('\n\t'.join(geo))) + LOG.debug("l1b = \n\t{}".format('\n\t'.join(l1b))) + LOG.debug("sounder = \n\t{}".format('\n\t'.join(sounder))) + LOG.debug("collo = \n\t{}".format('\n\t'.join(collo))) bin_dir = pjoin(dist_root, 'bin') anc_dir = pjoin(dist_root, 'luts') @@ -747,12 +804,16 @@ class FUSION_MATLAB(Computation): kwargs['dummy'] = dummy if satellite=='snpp': + geo_file = geo[1] + l1b_file = l1b[1] kwargs['anc_paths'] = [pjoin(anc_dir, 'modis_aqua.srf.nc'), pjoin(anc_dir, 'NG_VIIRS_NPP_RSR_filtered_Oct2011_BA/')] kwargs['fusion_binary'] = 'run_imagersounderfusion_V.sh' kwargs['matlab_file_glob'] = 'fusion_output.mat' kwargs['conversion_bin'] = pjoin(envroot, 'bin', 'l1b-fusion-viirs-cris') elif satellite=='aqua': + geo_file = geo[0] + l1b_file = l1b[0] kwargs['anc_paths'] = [pjoin(anc_dir, 'L2.chan_prop.2005.03.01.v9.5.1.txt'), pjoin(anc_dir, 'modis_aqua.srf.nc'), pjoin(anc_dir, 'modis_conv_error_2005.mat')] @@ -782,7 +843,7 @@ class FUSION_MATLAB(Computation): rc_fusion, fused_l1b_file, output_attrs = self.convert_matlab_to_netcdf( product, matlab_file, - l1b, + l1b_file, **kwargs) LOG.debug('convert_matlab_to_netcdf() return value: {}'.format(rc_fusion)) @@ -816,14 +877,14 @@ class FUSION_MATLAB(Computation): elif v02mod_bt_sc: l1_version = v02mod_bt_sc.input('viirs_l1').version - input_fns, lut_version, lut_created = get_viirs_l1_luts(l1b, geo_fn=geo) + input_fns, lut_version, lut_created = get_viirs_l1_luts(l1b_file, geo_fn=geo_file) ancillary_fns = [] out_compress = nc_compress set_official_product_metadata( output_attrs['esdt'], product.version, output_attrs['collection'], product.input('fusion_matlab').version, context['satellite'], - fused_l1b_file, geo, + fused_l1b_file, geo_file, input_fns, ancillary_fns, l1_version, lut_version, lut_created, product.inputstr(), output_attrs['created']) @@ -873,7 +934,9 @@ class FUSION_MATLAB_QL(Computation): if vgeom == []: raise WorkflowNotReady('Missing {} inputs for version {} and interval {}'.format( input_name, version, interval)) - if len(vgeom) < 228: + elapsed_time = ((datetime.utcnow()-context['granule']).total_seconds())/86400. + LOG.info("After {:4.2f} days we have {} {} files ({}).".format(elapsed_time, len(vgeom), input_name, version)) + if len(vgeom) < 228 and elapsed_time < 2.: raise WorkflowNotReady('Number of available {} inputs is < 228, for version {} and interval {}, aborting...'.format( input_name, version, interval)) for idx, geo_file in enumerate(vgeom): @@ -886,12 +949,14 @@ class FUSION_MATLAB_QL(Computation): version = product.input('V02FSN').version input_name = sipsprod.satellite_esdt('V02FSN', satellite) interval = TimeInterval(granule, granule+timedelta(days=1.00)-timedelta(seconds=1)) - LOG.debug("Ingesting input {} ({}) for V02FSN_DailyQL version {}".format(input_name, version, product.version)) + LOG.info("Ingesting input {} ({}) for V02FSN_DailyQL version {}".format(input_name, version, product.version)) vl1b = dawg_catalog.files(satellite, input_name, interval, version=version) if vl1b == []: raise WorkflowNotReady('Missing {} inputs for version {} and interval {}'.format( input_name, version, interval)) - if len(vl1b) < 228: + elapsed_time = ((datetime.utcnow()-context['granule']).total_seconds())/86400. + LOG.info("After {:4.2f} days we have {} {} files ({}).".format(elapsed_time, len(vl1b), input_name, version)) + if len(vl1b) < 228 and elapsed_time < 2.: raise WorkflowNotReady('Number of available {} inputs is < 228, for version {} and interval {}, aborting...'.format( input_name, version, interval)) for idx, l1b_file in enumerate(vl1b): @@ -904,7 +969,7 @@ class FUSION_MATLAB_QL(Computation): ''' LOG.debug("Ingesting inputs for V02FSN_DailyQL version {} ...".format(context['version'])) - # Get the product definition for 'V02FSN' + # Get the product definition for 'V02FSN_DailyQL' product = sipsprod.lookup_product_recurse('V02FSN_DailyQL', version=context['version']) # Ingest the required inputs, defined in the VNP02 product definition for context['version'] @@ -1035,7 +1100,7 @@ class FUSION_MATLAB_QL(Computation): dist_root = pjoin(delivery.path, 'dist') envroot = pjoin(dist_root, 'env') - LOG.info("dist_root = '{}'".format(dist_root)) + LOG.debug("dist_root = '{}'".format(dist_root)) # Get the required environment variables env = self.prepare_env(dist_root, inputs, context) @@ -1105,4 +1170,4 @@ class FUSION_MATLAB_QL(Computation): 'fused_l1b_ql_band27_desc': {'file': [x for x in fusion_ql_files if 'Band27.desc' in x][0], 'extra_attrs': extra_attrs}, 'fused_l1b_ql_band33_asc': {'file': [x for x in fusion_ql_files if 'Band33.asc' in x][0], 'extra_attrs': extra_attrs}, 'fused_l1b_ql_band33_desc': {'file': [x for x in fusion_ql_files if 'Band33.desc' in x][0], 'extra_attrs': extra_attrs}, - } + } \ No newline at end of file diff --git a/source/flo/run_fusion_1gran.py b/source/flo/run_fusion_1gran.py new file mode 100644 index 0000000000000000000000000000000000000000..c2320d348d4ada5e091e363dc47e6ec29ef9887f --- /dev/null +++ b/source/flo/run_fusion_1gran.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +Script to run fusion code (either for MODIS/AIRS or VIIRS/CrIS) +for one selected imager granule + +Note, it is assumed that imager/sounder collocation files are available (one per sounder file) + +Created by Geoff Cureton <geoff.cureton@ssec.wisc.edu> on 2017-08-10. +Copyright (c) 2018 University of Wisconsin Regents. +Licensed under GNU GPLv3. +""" + +import os +from os.path import basename, dirname, curdir, abspath, isdir, isfile, exists, splitext, join as pjoin +import sys +import shutil +import logging +import traceback +import time +from glob import glob +from subprocess import call, check_call +from datetime import datetime + +from utils import execution_time, create_dir, satellite_esdt, product_filename + +# every module should have a LOG object +LOG = logging.getLogger(__file__) + +def setup_logging(verbosity): + LOG.debug("Verbosity is {}".format(verbosity)) + + levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] + level = levels[verbosity if verbosity < 4 else 3] + + # Set up the logging + #console_logFormat = '%(asctime)s : %(name)-12s: %(levelname)-8s %(message)s' + #console_logFormat = '%(levelname)s:%(name)s:%(msg)s') # [%(filename)s:%(lineno)d]' + #console_logFormat = '%(asctime)s : %(funcName)s:%(lineno)d: %(message)s' + #console_logFormat = '%(message)s' + console_logFormat = '(%(levelname)s):%(filename)s:%(funcName)s:%(lineno)d: %(message)s' + #console_logFormat = '%(asctime)s : (%(levelname)s):%(filename)s:%(funcName)s:%(lineno)d:' \ + #' %(message)s' + + dateFormat = '%Y-%m-%d %H:%M:%S' + logging.basicConfig( + stream=sys.stdout, level=level, + format=console_logFormat, + datefmt=dateFormat) + +def prepare_env(pkg_root): + + env = dict(os.environ) + env['LD_LIBRARY_PATH'] = ':'.join([pjoin(pkg_root, 'env/lib')]) + env['PATH'] = ':'.join([pjoin(pkg_root, 'env/bin'), + '/usr/bin:/bin']) + + LOG.debug("env['PATH'] = \n\t{}".format(env['PATH'].replace(':','\n\t'))) + LOG.debug("env['LD_LIBRARY_PATH'] = \n\t{}".format(env['LD_LIBRARY_PATH'].replace(':','\n\t'))) + + return env + +def run_fusion_matlab(geo_files, l1b_files, sounder_files, collo_files, **kwargs): + + bin_dir = kwargs['bin_dir'] + anc_paths = kwargs['anc_paths'] + fusion_binary = kwargs['fusion_binary'] + out_dir = kwargs['out_dir'] + matlab_file_glob = kwargs['matlab_file_glob'] + env = kwargs['env'] + + rc_fusion = 0 + + # run matlab + cmd = '{}/{} /opt/matlab/2015b/ {} {} {} {} {}'.format( + bin_dir, + fusion_binary, + ' '.join(geo_files), + ' '.join(l1b_files), + ' '.join(sounder_files), + ' '.join(collo_files), + ' '.join(anc_paths) + ) + #cmd = 'touch fusion_output.mat' # Dummy + #cmd = 'echo "No command"' # Dummy + + # Create the output directory + current_dir = os.getcwd() + create_dir(out_dir) + + # Run the Matlab Fusion code + LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t'))) + + rc_fusion = check_call([cmd], shell=True, env=env) + LOG.debug('check_call() return value: {}'.format(rc_fusion)) + + + # Move matlab file to the output directory + LOG.debug("glob('{}') = {}".format(matlab_file_glob, glob(matlab_file_glob))) + matlab_file = glob(matlab_file_glob) + if len(matlab_file) != 0: + matlab_file = matlab_file[0] + LOG.info('Found Matlab file "{}", moving to {}...'.format(matlab_file, out_dir)) + if exists(pjoin(out_dir, matlab_file)): + LOG.info('{} exists, removing...'.format(pjoin(out_dir, matlab_file))) + os.remove(pjoin(out_dir, matlab_file)) + shutil.move(matlab_file, out_dir) + matlab_file = glob(pjoin(out_dir, matlab_file))[0] + else: + LOG.error('There are no Matlab files "{}" to convert, aborting'.format(matlab_file_glob)) + rc_fusion = 1 + return rc_fusion, None + + return rc_fusion, matlab_file + +def convert_matlab_to_netcdf(matlab_file, l1b_file, **kwargs): + + bin_dir = kwargs['bin_dir'] + out_dir = kwargs['out_dir'] + satellite = kwargs['satellite'] + conversion_bin = kwargs['conversion_bin'] + env = kwargs['env'] + + rc_fusion = 0 + + dt = kwargs['input_dt'] + dt_create = datetime.utcnow() + + # Create the output directory + current_dir = os.getcwd() + + LOG.debug("l1b_file = {}".format(l1b_file)) + + # Copy the un-fused level-1b file to the work directory as a template... + if satellite=='snpp' or satellite=='jpss1': + esdt = satellite_esdt('V02FSN', satellite) + collection = int(basename(l1b_file).split('.')[3]) + fused_l1b_file_new = product_filename(esdt, collection, dt, created=dt_create) + if satellite=='aqua': + esdt = satellite_esdt('M02FSN', satellite) + collection = int(basename(l1b_file).split('.')[3]) + fused_l1b_file_new = product_filename(esdt, collection, dt, created=dt_create) + + unfused_l1b_file = pjoin(out_dir, 'unfused', os.path.basename(fused_l1b_file_new)) + unfused_l1b_dir = os.path.dirname(unfused_l1b_file) + create_dir(unfused_l1b_dir) + + if os.path.exists(unfused_l1b_file): + LOG.info('{} exists, removing...'.format(unfused_l1b_file)) + os.remove(unfused_l1b_file) + + LOG.info('Copying {} to {}'.format(l1b_file, unfused_l1b_file)) + shutil.copy(l1b_file, unfused_l1b_file) + + + # Removing the fused file if it exists + fused_l1b_file = pjoin(out_dir, os.path.basename(unfused_l1b_file)) + if os.path.exists(fused_l1b_file): + LOG.info('{} exists, removing...'.format(fused_l1b_file)) + os.remove(fused_l1b_file) + + # Convert the Matlab file to the desired format... + #cmd = 'cp {} {}'.format(unfused_l1b_file, out_dir) # Dummy + cmd = '{} {} {} {}'.format(conversion_bin, unfused_l1b_file, matlab_file, out_dir) + LOG.debug(cmd) + rc_fusion = check_call([cmd], shell=True, env=env) + + # Determine success... + fused_l1b_file = glob(fused_l1b_file) + if len(fused_l1b_file) != 0: + fused_l1b_file = fused_l1b_file[0] + else: + LOG.error('There is no fused file {}, aborting'.format(fused_l1b_file)) + rc_fusion = 1 + return rc_fusion, None + + # Remove the unfused dir... + LOG.info('Removing the unfused level-1b dir {} ...'.format(unfused_l1b_dir)) + shutil.rmtree(unfused_l1b_dir) + + return rc_fusion, fused_l1b_file + +def main(args): + + pkg_root = args[0] + inst_pair = 'modis_airs' if int(args[1])==1 else 'cris_viirs' + in_dir = args[2] + out_dir = args[3] + + LOG.info('Package root = {}'.format(pkg_root)) + LOG.info('Instrument pair = {}'.format(inst_pair)) + LOG.info('Input dir = {}'.format(in_dir)) + LOG.info('Output dir = {}'.format(out_dir)) + + py_env_dir = pjoin(pkg_root, 'env') + bin_dir = pjoin(pkg_root, 'bin') + anc_dir = pjoin(pkg_root, 'luts') + + LOG.debug('py_env_dir = {}'.format(py_env_dir)) + + env = prepare_env(pkg_root) + + kwargs = {} + kwargs['bin_dir'] = bin_dir + kwargs['env'] = env + kwargs['out_dir'] = out_dir + + # Creating the matlab fusion file... + + if inst_pair == 'modis_airs': + + imager = 'modis' + sounder = 'airs' + + geo_files = sorted(glob(pjoin(in_dir, 'MYD03.*.hdf'))) #imager geolocation file + l1b_files = sorted(glob(pjoin(in_dir, 'MYD021KM.*.hdf'))) #imager L1B file + + # sounder files (as many as needed to cover the imager file): + sounder_files = sorted(glob(pjoin(in_dir, 'AIRS.*.hdf'))) + + # imager/sounder collocation files (one per sounder file): + collo_files = sorted(glob(pjoin(in_dir, 'colloc*airs*modis*.nc'))) + + kwargs['input_dt'] = [ + datetime.strptime('.'.join(basename(geo_file).split('.')[1:3]), 'A%Y%j.%H%M') for geo_file in geo_files] + + # Various lut files and directories: + kwargs['anc_paths'] = [pjoin(anc_dir, 'L2.chan_prop.2005.03.01.v9.5.1.txt'), + pjoin(anc_dir, 'modis_aqua.srf.nc'), + pjoin(anc_dir, 'modis_conv_error_2005.mat')] + + LOG.info('geo_files: \n\t{}'.format(' \n\t'.join(geo_files))) + LOG.info('l1b_files: \n\t{}'.format(' \n\t'.join(l1b_files))) + LOG.info('sounder_files: \n\t{}'.format(' \n\t'.join(sounder_files))) + LOG.info('collo_files: \n\t{}'.format(' \n\t'.join(collo_files))) + LOG.info('input_dt: \n\t{}'.format(' \n\t'.join([str(dt) for dt in kwargs['input_dt']]))) + + kwargs['satellite'] = {'MOD':'terra', 'MYD':'aqua'}[basename(geo_files[0]).split('.')[0][:3]] + kwargs['fusion_binary'] = 'run_imagersounderfusion_M.sh' + kwargs['matlab_file_glob'] = 'fusion_output.mat' + kwargs['conversion_bin'] = pjoin(py_env_dir, 'bin', 'l1b-fusion-modis-airs') + + # The l1b file for which we are making the fusion file... + l1b_file = l1b_files[0] + kwargs['input_dt'] = kwargs['input_dt'][0] + + elif inst_pair == 'cris_viirs': + + imager = 'viirs' + sounder = 'cris' + + geo_files = sorted(glob(pjoin(in_dir, 'VNP03MOD.*.nc'))) #imager geolocation file + l1b_files = sorted(glob(pjoin(in_dir, 'VNP02MOD.*.nc'))) #imager L1B file + + # sounder files (as many as needed to cover the imager file): + sounder_files = sorted(glob(pjoin(in_dir, 'SNDR.*.CRIS.*.nc'))) + + # imager/sounder collocation files (one per sounder file): + collo_files = sorted(glob(pjoin(in_dir, 'colloc*cris*viirs*.nc'))) + + #kwargs['input_dt'] = datetime.strptime('.'.join(basename(geo_file).split('.')[1:3]), 'A%Y%j.%H%M') + kwargs['input_dt'] = [ + datetime.strptime('.'.join(basename(geo_file).split('.')[1:3]), 'A%Y%j.%H%M') for geo_file in geo_files] + + # Process the 1st granule in the original list, requirering +-1 context granules. + gran_idx = 1 + geo_files = geo_files[gran_idx-1:gran_idx+2] + l1b_files = l1b_files[gran_idx-1:gran_idx+2] + sounder_files = sounder_files[gran_idx-1:gran_idx+2] + collo_files = collo_files[gran_idx-1:gran_idx+2] + kwargs['input_dt'] = kwargs['input_dt'][gran_idx-1:gran_idx+2] + + LOG.info('geo_files: \n\t{}'.format(' \n\t'.join(geo_files))) + LOG.info('l1b_files: \n\t{}'.format(' \n\t'.join(l1b_files))) + LOG.info('sounder_files: \n\t{}'.format(' \n\t'.join(sounder_files))) + LOG.info('collo_files: \n\t{}'.format(' \n\t'.join(collo_files))) + LOG.info('input_dt: \n\t{}'.format(' \n\t'.join([str(dt) for dt in kwargs['input_dt']]))) + + # Various lut files and directories: + kwargs['anc_paths'] = [pjoin(anc_dir, 'modis_aqua.srf.nc'), + pjoin(anc_dir, 'NG_VIIRS_NPP_RSR_filtered_Oct2011_BA/')] + + kwargs['satellite'] = {'VNP':'snpp', 'VJ1':'jpss1'}[basename(geo_files[0]).split('.')[0][:3]] + kwargs['fusion_binary'] = 'run_imagersounderfusion_V.sh' + kwargs['matlab_file_glob'] = 'fusion_output.mat' + kwargs['conversion_bin'] = pjoin(py_env_dir, 'bin', 'l1b-fusion-viirs-cris') + + # The l1b file for which we are making the fusion file... + gran_idx = 1 + l1b_file = l1b_files[gran_idx] + kwargs['input_dt'] = kwargs['input_dt'][gran_idx] + LOG.debug('l1b_file = {}'.format(l1b_file)) + LOG.debug('input_dt = {}'.format(kwargs['input_dt'])) + + rc_fusion, matlab_file = run_fusion_matlab(geo_files, l1b_files, sounder_files, collo_files, **kwargs) + + # Creating the matlab fusion file failed, exiting... + if rc_fusion != 0: + return rc_fusion + + LOG.debug('run_fusion_matlab() return value: {}'.format(rc_fusion)) + LOG.debug('run_fusion_matlab() generated {}...'.format(matlab_file)) + + # Now that we've computed the Matlab file, convert to a NetCDF file... + rc_fusion, fused_l1b_file = convert_matlab_to_netcdf(matlab_file, l1b_file, **kwargs) + + # Converting the Matlab fusion file to NetCDF failed, exiting... + if rc_fusion != 0: + return rc_fusion + + LOG.debug('convert_matlab_to_netcdf() generated {}...'.format(fused_l1b_file)) + + LOG.debug('python return value = {}'.format(rc_fusion)) + return rc_fusion + + + +if __name__ == '__main__': + args = sys.argv[1:] + + usage = \ + """ + Usage: 'run_fusion_1gran.sh PACKAGE_ROOT INSTRUMENT_PAIR INDIR OUTDIR' + where: + PKG_ROOT : Top level package dir (e.g.: 'dist/' + INSTRUMENT_PAIR : 1 (for MODIS/AIRS) or 2 (for VIIRS/CrIS) " + """ + + if len(args) != 4: + print(usage) + + verbosity = 3 + setup_logging(verbosity) + + sys.exit(main(args)) diff --git a/source/flo/run_fusion_global_ql.py b/source/flo/run_fusion_global_ql.py new file mode 100644 index 0000000000000000000000000000000000000000..7d7020a0c215ef2fec8af9c40b555687a27a0fd7 --- /dev/null +++ b/source/flo/run_fusion_global_ql.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +Script to global quicklook images from CrIS/VIIRS Fusion product files. + +Created by Geoff Cureton <geoff.cureton@ssec.wisc.edu> on 2018-06-18. +Copyright (c) 2018 University of Wisconsin Regents. +Licensed under GNU GPLv3. +""" + +import os +from os.path import basename, dirname, curdir, abspath, isdir, isfile, exists, splitext, join as pjoin +import sys +import shutil +import logging +import traceback +import time +from glob import glob +from subprocess import call, check_call +from datetime import datetime + +from utils import execution_time, create_dir, satellite_esdt, product_filename + +# every module should have a LOG object +LOG = logging.getLogger(__file__) + +def setup_logging(verbosity): + LOG.debug("Verbosity is {}".format(verbosity)) + + levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] + level = levels[verbosity if verbosity < 4 else 3] + + # Set up the logging + #console_logFormat = '%(asctime)s : %(name)-12s: %(levelname)-8s %(message)s' + #console_logFormat = '%(levelname)s:%(name)s:%(msg)s') # [%(filename)s:%(lineno)d]' + #console_logFormat = '%(asctime)s : %(funcName)s:%(lineno)d: %(message)s' + #console_logFormat = '%(message)s' + console_logFormat = '(%(levelname)s):%(filename)s:%(funcName)s:%(lineno)d: %(message)s' + #console_logFormat = '%(asctime)s : (%(levelname)s):%(filename)s:%(funcName)s:%(lineno)d:' \ + #' %(message)s' + + dateFormat = '%Y-%m-%d %H:%M:%S' + logging.basicConfig( + stream=sys.stdout, level=level, + format=console_logFormat, + datefmt=dateFormat) + +def prepare_env(pkg_root): + + env = dict(os.environ) + env['LD_LIBRARY_PATH'] = ':'.join([pjoin(pkg_root, 'env/lib')]) + env['PATH'] = ':'.join([pjoin(pkg_root, 'env/bin'), + '/usr/bin:/bin']) + + LOG.debug("env['PATH'] = \n\t{}".format(env['PATH'].replace(':','\n\t'))) + LOG.debug("env['LD_LIBRARY_PATH'] = \n\t{}".format(env['LD_LIBRARY_PATH'].replace(':','\n\t'))) + + return env + +def run_fusion_quicklooks(fusion_dir, geo_dir, **kwargs): + + bin_dir = kwargs['bin_dir'] + out_dir = kwargs['out_dir'] + fusion_ql_binary = kwargs['fusion_ql_binary'] + satellite = kwargs['satellite'] + env = kwargs['env'] + + fusion_prefix = {'snpp':'VNP', 'jpss1':'VJ1'}[satellite] + fusion_files = sorted(glob(pjoin(fusion_dir,'{}02FSN*.nc'.format(fusion_prefix)))) + geo_files = sorted(glob(pjoin(geo_dir,'{}03MOD*.nc'.format(fusion_prefix)))) + + LOG.info('fusion_prefix: {}'.format(fusion_prefix)) + LOG.info('fusion_glob: {}'.format(pjoin(fusion_dir,'{}02FSN*.nc'.format(fusion_prefix)))) + LOG.info('fusion_files: {}'.format(fusion_files)) + LOG.info('geo_files: \n\t{}'.format(' \n\t'.join(geo_files))) + LOG.info('fusion_files: \n\t{}'.format(' \n\t'.join(fusion_files))) + + dt = datetime.strptime(fusion_files[0].split('.')[1],'A%Y%j') + year = dt.utctimetuple().tm_year + jday = dt.utctimetuple().tm_yday + + # Create the output directory, and change to it. + current_dir = os.getcwd() + create_dir(out_dir) + os.chdir(out_dir) + + rc_fusion_ql = 0 + + #run matlab + cmd = '{}/{} /opt/matlab/2015b/ {} {} {}/ {}/ >> fusion_quicklooks.log'.format( + bin_dir, + fusion_ql_binary, + year, + jday, + fusion_dir, + geo_dir + ) + + # Run the Matlab Fusion code + LOG.debug("cmd = \\\n\t{}".format(cmd.replace(' ',' \\\n\t'))) + rc_fusion_ql = check_call([cmd], shell=True, env=env) + LOG.debug('check_call() return value: {}'.format(rc_fusion_ql)) + + # Move matlab file to the output directory + fusion_ql_files = glob('*.png') + if len(fusion_ql_files) != 0: + LOG.info('Found Fusion quicklook files {}.'.format( + ', '.join([basename(x) for x in fusion_ql_files]) + )) + else: + LOG.error('There are no Fusion quicklook files "*.png", aborting') + rc_fusion_ql = 1 + return rc_fusion_ql + + os.chdir(current_dir) + + return rc_fusion_ql, fusion_ql_files + +def main(args): + + pkg_root = abspath(args[0]) + fusion_dir = abspath(args[1]) + geo_dir = abspath(args[2]) + out_dir = abspath(args[3]) + satellite = args[4] + + LOG.info('Package root = {}'.format(pkg_root)) + LOG.info('Fusion dir = {}'.format(fusion_dir)) + LOG.info('Geo dir = {}'.format(geo_dir)) + LOG.info('Output dir = {}'.format(out_dir)) + LOG.info('satellite = {}'.format(satellite)) + + py_env_dir = pjoin(pkg_root, 'env') + bin_dir = pjoin(pkg_root, 'bin') + #work_dir = os.path.abspath(os.path.curdir) + + LOG.debug('py_env_dir = {}'.format(py_env_dir)) + + env = prepare_env(pkg_root) + + kwargs = {} + kwargs['bin_dir'] = bin_dir + kwargs['out_dir'] = out_dir + kwargs['env'] = env + + # Creating the fusion quicklooks... + + + kwargs['satellite'] = satellite + kwargs['fusion_ql_binary'] = 'run_plot_globalVIIRSfusion_fct.sh' + + rc_fusion_ql, fusion_ql_files = run_fusion_quicklooks(fusion_dir, geo_dir, **kwargs) + + # Creating the matlab fusion file failed, exiting... + if rc_fusion_ql != 0: + return rc_fusion_ql + + LOG.debug('run_fusion_quicklooks() return value: {}'.format(rc_fusion_ql)) + LOG.debug('run_fusion_quicklooks() generated {}...'.format(fusion_ql_files)) + + LOG.debug('python return value = {}'.format(rc_fusion_ql)) + return rc_fusion_ql + + + +if __name__ == '__main__': + args = sys.argv[1:] + + usage = \ + """ + Usage: 'run_fusion_1gran.sh PACKAGE_ROOT FUSION_DIR GEO_DIR OUTPUT_DIR SATELLITE' + where: + PKG_ROOT : Top level package dir (e.g.: 'dist/' + FUSION_DIR : Directory containing V02FSN files" + GEO_DIR : Directory containing V03MOD files matching the files in FUSION_DIR" + OUTPUT_DIR : Directory where fusion quicklooks are generated" + SATELLITE : The satellite providing the input data (snpp or jpss1)" + """ + + if len(args) != 5: + print(usage) + + verbosity = 3 + setup_logging(verbosity) + + sys.exit(main(args)) diff --git a/source/flo/utils.py b/source/flo/utils.py index 7ee01d7f68ff6bd6298aa1a04957c20088e76a3e..52f89edfc69bc2f77146ce83547881e8d6ed1be4 100644 --- a/source/flo/utils.py +++ b/source/flo/utils.py @@ -28,7 +28,7 @@ from datetime import datetime LOG = logging.getLogger(__name__) def setup_logging(verbosity): - LOG.debug("Verbosity is {}".format(verbosity)) + print("Verbosity is {}".format(verbosity)) levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] level = levels[verbosity if verbosity < 4 else 3] @@ -38,9 +38,13 @@ def setup_logging(verbosity): #console_logFormat = '%(levelname)s:%(name)s:%(msg)s') # [%(filename)s:%(lineno)d]' #console_logFormat = '%(asctime)s : %(funcName)s:%(lineno)d: %(message)s' #console_logFormat = '%(message)s' - console_logFormat = '(%(levelname)s):%(filename)s:%(funcName)s:%(lineno)d: %(message)s' + #console_logFormat = '(%(levelname)s):%(filename)s:%(funcName)s:%(lineno)d: %(message)s' #console_logFormat = '%(asctime)s : (%(levelname)s):%(filename)s:%(funcName)s:%(lineno)d:' \ #' %(message)s' + if level == logging.DEBUG: + console_logFormat = '(%(levelname)s):%(filename)s:%(funcName)s:%(lineno)d: %(message)s' + else: + console_logFormat = '(%(levelname)s): %(message)s' dateFormat = '%Y-%m-%d %H:%M:%S' logging.basicConfig(