Skip to content
Snippets Groups Projects
Commit d5a99bf7 authored by Geoff Cureton's avatar Geoff Cureton
Browse files

Updated Aqua build_task() methods to better handle and report missing data and bad db entries.

parent 03810364
No related branches found
No related tags found
No related merge requests found
......@@ -54,15 +54,16 @@ from glob import glob
from subprocess import check_output, check_call, CalledProcessError
from flo.computation import Computation
from flo.builder import WorkflowNotReady
from timeutil import TimeInterval, datetime, timedelta
from flo.util import symlink_inputs_to_working_dir
from flo.sw.viirsmend import ViirsMend
from flo.sw.collocation import CrisViirsCollocation
#from flo.sw.viirsmend import ViirsMend
#from flo.sw.collocation import CrisViirsCollocation
from glutil.software import delivered_software, support_software, runscript
from glutil.nc import nc_compress
from glutil.hdf import hdf_compress
#from glutil.nc import nc_compress
#from glutil.hdf import hdf_compress
from glutil.catalogs import dawg_catalog
from orbnav_client import Client
......@@ -78,7 +79,7 @@ class FUSION_MATLAB(Computation):
outputs = ['fused_l1b']
def find_contexts_gran_length(self, time_interval, satellite, delivery_id):
def find_contexts(self, time_interval, satellite, delivery_id):
'''
Here we assume that the granule boundaries fall along 6-minute (snpp) or 5-minute (aqua)
increments, starting at the top of the hour:
......@@ -97,109 +98,59 @@ class FUSION_MATLAB(Computation):
return [{'satellite': satellite, 'version': delivery_id, 'granule': g}
for g in time_interval.contained_series(granule_length)]
def find_contexts_orbnav(self, time_interval, satellite, delivery_id):
'''
Return the desired contexts falling within the given time interval.
time_interval: TimeInterval(datetime_1, datetime_2)
granule_length: timedelta(seconds=360)
'''
LOG.info("Running find_contexts()...")
#
# Here we assume that the granule boundaries fall along 6-minute
# increments, starting at the top of the hour, so that there would be
# 10 granules per hour at minutes:
# [0., 6., 12., 18., 24., 30., 36., 42., 48., 54.]
#
if satellite=='snpp':
granule_length = timedelta(minutes=6)
elif satellite=='aqua':
granule_length = timedelta(minutes=5)
else:
return []
# Generating List of all possible granules
allGranuals = {
g.left for g in time_interval.overlapping_interval_series(granule_length)}
# Not all contexts are valid for and aerosol product: we only want the
# day granules. This corresponds to a solar zenith angle < 85 degrees,
# in this case. We will use OrbNav to determine whether a context
# granule has any pixels in daylight.
# Setting up OrbNav Client
orbnavSatName = {'aqua': 'AQUA', 'snpp': 'SUOMI NPP'}
orbnavClient = Client()
try:
# Get the datetimes when the satellite enters and/or leaves a
# circle where the solar zenith angle is LESS than 85 degrees.
orbnavOutput = orbnavClient.suncirc(sat=orbnavSatName[satellite],
start=time_interval.left,
end=time_interval.right,
zenangl='85'
)['data']
# For each datetime when the satellite crosses the "terminator"
# defined by solZenAngle=85 degrees, compute a 6-minute time
# interval centered on the crossing. Any 6-minute granule which
# has a start time falling in this interval will have daytime
# pixels.
half_gran_delta = timedelta(seconds=0.5 * granule_length.seconds)
dayTimeIntervals = [
TimeInterval(
terminator_intersect_time[0][0] - half_gran_delta,
terminator_intersect_time[1][0] + half_gran_delta,
right_open=True)
for terminator_intersect_time in orbnavOutput
]
# Generating list of OrbNav selected granules
orbnavGranules = set()
for interval in dayTimeIntervals:
orbnavGranules = orbnavGranules | {
g.left for g in interval.overlapping_interval_series(granule_length)}
# Finding Overlapping Granuals
overlappingGranules = sorted(orbnavGranules & allGranuals)
except:
overlappingGranules = allGranuals
return [{'satellite': satellite, 'version': delivery_id, 'granule': g}
for g in overlappingGranules]
def find_contexts(self, time_interval, satellite, delivery_id):
return self.find_contexts_gran_length(time_interval, satellite, delivery_id)
def _add_modis_l1b_geo_input(self, context, task):
satellite = context['satellite']
granule = context['granule']
myd03 = dawg_catalog.file('aqua','MOD03', granule, version='c6')
task.input('geo', myd03)
granule_length = timedelta(minutes=5)
myd03_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
myd03 = dawg_catalog.files('aqua','MOD03', myd03_interval, version='c6')
if myd03 == []:
raise WorkflowNotReady('Unable to find matching MYD03 granule for interval {}'.format(myd03_interval))
myd03_file = myd03[0]
LOG.debug('MYD03 granule path: {}'.format(myd03_file.path))
if not exists(myd03_file.path):
raise WorkflowNotReady('Nominally valid MYD03 path {} for granule {} or interval {} does not exist, possible DB corruption'.format(myd03_file.path, granule, myd03_interval))
task.input('geo', myd03_file)
def _add_modis_l1b_m_input(self, context, task):
satellite = context['satellite']
granule = context['granule']
myd021km = dawg_catalog.file('aqua','MOD021KM', granule, version='c6')
task.input('l1b', myd021km)
granule_length = timedelta(minutes=5)
myd021km_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
myd021km = dawg_catalog.files('aqua','MOD021KM', myd021km_interval, version='c6')
if myd021km == []:
raise WorkflowNotReady('Unable to find matching MYD021KM granule for interval {}'.format(myd021km_interval))
myd021km_file = myd021km[0]
LOG.debug('MYD021KM granule path: {}'.format(myd021km_file.path))
if not exists(myd021km_file.path):
raise WorkflowNotReady('Nominally valid MYD021KM path {} for granule {} or interval {} does not exist, possible DB corruption'.format(myd021km_file.path, granule, myd021km_interval))
task.input('l1b', myd021km_file)
def _add_airs_l1b_input(self, context, task):
satellite = context['satellite']
granule = context['granule']
granule_length = timedelta(minutes=5)
myd03 = dawg_catalog.file('aqua','MOD03', granule, version='c6')
LOG.debug('MODIS granule: {} -> {}'.format(myd03.begin_time, myd03.end_time))
myd03_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
myd03 = dawg_catalog.files('aqua','MOD03', myd03_interval, version='c6')
if myd03 == []:
raise WorkflowNotReady('Unable to find matching MYD03 granule for interval {}'.format(myd03_interval))
myd03_file = myd03[0]
LOG.debug('MYD03 granule path: {}'.format(myd03_file.path))
if not exists(myd03_file.path):
raise WorkflowNotReady('Nominally valid MYD03 path {} for granule {} or interval {} does not exist, possible DB corruption'.format(myd03_file.path, granule, myd03_interval))
buf = 0 # seconds
airs_begin = myd03.begin_time - timedelta(seconds=buf)
airs_end = myd03.end_time + timedelta(seconds=buf)
airs_begin = myd03_file.begin_time - timedelta(seconds=buf)
airs_end = myd03_file.end_time + timedelta(seconds=buf)
airs_interval = TimeInterval(airs_begin, airs_end)
airs = dawg_catalog.files('aqua', 'AIRIBRAD', airs_interval)
if airs == []:
raise ValueError('Unable to find matching AIRS granules.')
raise WorkflowNotReady('Unable to find matching AIRS granules for interval {}'.format(airs_interval))
for idx, airs_file in enumerate(airs):
LOG.debug('AIRS granule {}: {} -> {}'.format(idx, airs_file.begin_time, airs_file.end_time))
......@@ -233,8 +184,9 @@ class FUSION_MATLAB(Computation):
granule = context['granule']
granule_length = timedelta(minutes=6)
cris_interval = TimeInterval(granule, granule+granule_length-timedelta(seconds=1))
#cris = dawg_catalog.file(satellite, 'CL1B', granule, version='v1.0rc8')
cris = dawg_catalog.files(satellite, 'CL1B', cris_interval, version='v1.0rc8')
if cris == []:
raise WorkflowNotReady('Unable to find matching CrIS granules for interval {}'.format(cris_interval))
for idx, cris_file in enumerate(cris):
LOG.debug('CrIS granule {}: {} -> {}'.format(idx, cris_file.begin_time, cris_file.end_time))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment