Commit 400d9dc2 authored by Greg Quinn

Checking in the "grouper" line of development

parent 450138d0
import numpy
import os
import sys

from pyhdf.SD import SD, SDC

from util import HdfWriter

# helper functions to allow the classes below to be used with either
# one or two dimensions
def histogram_init_helper(num_bins, dtype):
    if type(num_bins) == tuple:
        return numpy.zeros(num_bins, dtype)
    else:
        return numpy.zeros((num_bins,), dtype)

def histogram_helper(arr, **kwargs):
    if type(arr) == tuple:
        return numpy.histogram2d(arr[0], arr[1], **kwargs)[0]
    else:
        return numpy.histogram(arr, **kwargs)[0]

# class for building up a histogram in parts
class Histogram:

    def __init__(self, min_value, max_value, num_bins):
        self.range = (min_value, max_value)
        self.num_bins = num_bins
        self.hist = histogram_init_helper(num_bins, numpy.int32)

    def add(self, arr):
        self.hist += histogram_helper(arr,
                                      bins=self.num_bins,
                                      range=self.range)

    def get(self):
        return self.hist

# class for taking statistics on some value (e.g., brightness
# temperature differences between AIRS and MODIS) in terms of some
# basis (e.g. Earth location or instrument scan angle). like
# Histogram, designed for building up these stats via successive
# calls to add()
class Stats:

    def __init__(self, min_basis, max_basis, num_bins):
        self.num_bins = num_bins
        self.range = (min_basis, max_basis)
        self.hist = Histogram(min_basis, max_basis, num_bins)
        self.sum = histogram_init_helper(num_bins, numpy.float64)
        self.squared_sum = histogram_init_helper(num_bins, numpy.float64)

    def add(self, basis, values):
        self.hist.add(basis)
        self.sum += histogram_helper(basis,
                                     bins=self.num_bins,
                                     range=self.range,
                                     weights=values)
        squared_values = values * values
        self.squared_sum += histogram_helper(basis,
                                             bins=self.num_bins,
                                             range=self.range,
                                             weights=squared_values)

    def get(self):
        n = self.hist.get()
        mean = self.sum / n
        std = numpy.sqrt((self.squared_sum / n) - (mean * mean))
        return n, mean, std

# helper class to save repetition for all the per-band data
# structures needed below
class PerBandHelper:

    def __init__(self, ctor, *args):
        self.data = [ctor(*args) for band in range(16)]

    def __getitem__(self, key):
        return self.data[key]

    def get(self):
        return self.data

# the available groupers and their constructor arguments (must come
# after the Histogram and Stats definitions they refer to)
grouper_definitions = {
    'AIRS_BT_Hist': (Histogram, 200.0, 350.0, 150),
    'MODIS_BT_Hist': (Histogram, 200.0, 350.0, 150),
    'BT_Diff_Hist': (Histogram, -10.0, 10.0, 100),
    'AIRS_BT_By_Location': (Stats,
                            (-90.0, 90.0),
                            (-180.0, 180.0),
                            (180, 360)),
    'BT_Diff_By_Location': (Stats,
                            (-90.0, 90.0),
                            (-180.0, 180.0),
                            (180, 360)),
    'BT_Diff_By_AIRS_BT': (Stats, 200.0, 350.0, 150),
    'BT_Diff_By_Scan_Angle': (Stats, -55.0, 55.0, 110),
    'BT_Diff_By_Solar_Zenith': (Stats, 0.0, 180.0, 180)
}

# verify command line inputs
if len(sys.argv) != 3:
    print 'usage: python %s <input_dir> <output_file>' % sys.argv[0]
    sys.exit(1)
input_dir = sys.argv[1]
output_file = sys.argv[2]

# set up per-band histogram and stats objects
airs_bt_hist = PerBandHelper(Histogram, 200.0, 350.0, 150)
modis_bt_hist = PerBandHelper(Histogram, 200.0, 350.0, 150)
bt_diff_hist = PerBandHelper(Histogram, -10.0, 10.0, 100)
latlon_airs_stats = PerBandHelper(Stats,
                                  (-90.0, 90.0),
                                  (-180.0, 180.0),
                                  (180, 360))
latlon_diff_stats = PerBandHelper(Stats,
                                  (-90.0, 90.0),
                                  (-180.0, 180.0),
                                  (180, 360))
bt_diff_stats = PerBandHelper(Stats, 200.0, 350.0, 150)
scan_ang_diff_stats = PerBandHelper(Stats, -55.0, 55.0, 110)
sol_zen_diff_stats = PerBandHelper(Stats, 0.0, 180.0, 180)

# loop over each granule in the input directory
for gran in range(1, 241):

    print 'Processing granule %d' % gran

    # make sure all the needed files are present
    airs_file = '%s/intercal.a2m.2008.12.27.%03d.hdf' % (input_dir, gran)
    modis_file = '%s/intercal.m2a.2008.12.27.%03d.hdf' % (input_dir, gran)
    meta_file = '%s/intercal.meta.2008.12.27.%03d.hdf' % (input_dir, gran)
    if not (os.path.exists(airs_file) and
            os.path.exists(modis_file) and
            os.path.exists(meta_file)):
        print ' Granule %d missing' % gran
        continue

    # grab AIRS data
    airs_sd = SD(airs_file)
    airs_bt = airs_sd.select('AIRS_Brightness_Temp').get()
    airs_flags = airs_sd.select('AIRS_Flags').get()

    # MODIS data
    modis_sd = SD(modis_file)
    modis_bt = modis_sd.select('MODIS_Brightness_Temp').get()
    modis_flags = modis_sd.select('MODIS_Flags').get()

    # "meta" data
    meta_sd = SD(meta_file)
    lats = meta_sd.select('Latitude').get()
    lons = meta_sd.select('Longitude').get()
    scan_ang = meta_sd.select('Scan_Angle').get()
    sol_zen = meta_sd.select('Solar_Zenith').get()

    # AIRS - MODIS BTs
    bt_diff = airs_bt - modis_bt

    # we'll mask off all pixels where the data is suspect for either
    # instrument
    # FIXME: should nan BTs be discarded or zeroed?
    mask = numpy.logical_and(airs_flags == 0, modis_flags == 0)
    mask = numpy.logical_and(mask, numpy.isnan(bt_diff) == False)

    # now update our per-band statistics
    for band in range(16):

        # set up some shorthands for convenience and to avoid
        # duplicate mask applications
        m = mask[band]
        abt = airs_bt[band][m]
        mbt = modis_bt[band][m]
        btd = bt_diff[band][m]
        la = lats[m]
        lo = lons[m]
        sa = scan_ang[m]
        sz = sol_zen[m]

        # do the actual updates
        airs_bt_hist[band].add(abt)
        modis_bt_hist[band].add(mbt)
        bt_diff_hist[band].add(btd)
        latlon_airs_stats[band].add((la, lo), abt)
        latlon_diff_stats[band].add((la, lo), btd)
        bt_diff_stats[band].add(abt, btd)
        scan_ang_diff_stats[band].add(sa, btd)
        sol_zen_diff_stats[band].add(sz, btd)

# output results to HDF
# FIXME: handle the limits more coherently
writer = HdfWriter(output_file)
writer.write_seq('AIRS_BT_Hist',
                 [o.get() for o in airs_bt_hist.get()],
                 min_bucket=200.0, max_bucket=350.0)
writer.write_seq('MODIS_BT_Hist',
                 [o.get() for o in modis_bt_hist.get()],
                 min_bucket=200.0, max_bucket=350.0)
writer.write_seq('BT_Diff_Hist',
                 [o.get() for o in bt_diff_hist.get()],
                 min_bucket=-10.0, max_bucket=10.0)
writer.write_seq('AIRS_BT_By_Location',
                 [o.get()[1] for o in latlon_airs_stats.get()])
writer.write_seq('BT_Diff_By_Location',
                 [o.get()[1] for o in latlon_diff_stats.get()])
writer.write_seq('BT_Diff_By_AIRS_BT',
                 [o.get()[1] for o in bt_diff_stats.get()],
                 min_bucket=200.0, max_bucket=350.0)
writer.write_seq('BT_Diff_By_Scan_Angle',
                 [o.get()[1] for o in scan_ang_diff_stats.get()],
                 min_bucket=-55.0, max_bucket=55.0)
writer.write_seq('BT_Diff_By_Solar_Zenith',
                 [o.get()[1] for o in sol_zen_diff_stats.get()],
                 min_bucket=0.0, max_bucket=180.0)
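
The Histogram/Stats pair above is built for incremental accumulation: each granule contributes partial counts and weighted sums, and get() turns them into per-bin counts, means, and standard deviations at the end. The following sketch shows that usage pattern in isolation; it assumes the Histogram and Stats classes from the listing above are in scope, and it uses random stand-in data rather than real AIRS/MODIS brightness temperatures.

# illustrative only -- assumes Histogram and Stats from the listing above
# are importable or pasted into the same module; data here is made up
import numpy

bt_hist = Histogram(200.0, 350.0, 150)
diff_by_scan_angle = Stats(-55.0, 55.0, 110)

for chunk in range(3):
    # stand-ins for one chunk of brightness temperatures, scan angles,
    # and AIRS-MODIS differences
    bt = numpy.random.uniform(210.0, 340.0, 1000)
    scan_ang = numpy.random.uniform(-55.0, 55.0, 1000)
    bt_diff = numpy.random.normal(0.0, 1.0, 1000)

    bt_hist.add(bt)                            # accumulate counts
    diff_by_scan_angle.add(scan_ang, bt_diff)  # accumulate per-bin sums

counts = bt_hist.get()                   # shape (150,) array of counts
n, mean, std = diff_by_scan_angle.get()  # per-bin count, mean, stddev
assert counts.sum() == 3 * 1000          # every sample fell inside the range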
@@ -123,9 +123,9 @@ for gran in range(1, 241):
     print 'Processing granule %d' % gran
 
     # make sure all the needed files are present
-    airs_file = '%s/a2m_%03d.hdf' % (input_dir, gran)
-    modis_file = '%s/m2a_%03d.hdf' % (input_dir, gran)
-    meta_file = '%s/meta_%03d.hdf' % (input_dir, gran)
+    airs_file = '%s/intercal.a2m.2008.12.31.%03d.hdf' % (input_dir, gran)
+    modis_file = '%s/intercal.m2a.2008.12.31.%03d.hdf' % (input_dir, gran)
+    meta_file = '%s/intercal.meta.2008.12.31.%03d.hdf' % (input_dir, gran)
     if not (os.path.exists(airs_file) and
             os.path.exists(modis_file) and
             os.path.exists(meta_file)):
import glob
import numpy
import sys

# helper functions to allow the classes below to be used with either
# one or two dimensions
def histogram_init_helper(num_bins, dtype):
    if type(num_bins) == tuple:
        return numpy.zeros(num_bins, dtype)
    else:
        return numpy.zeros((num_bins,), dtype)

def histogram_helper(arr, **kwargs):
    if type(arr) == tuple:
        return numpy.histogram2d(arr[0], arr[1], **kwargs)[0]
    else:
        return numpy.histogram(arr, **kwargs)[0]

# class for building up a histogram in parts
class Histogram:

    def __init__(self, min_value, max_value, num_bins):
        self.range = (min_value, max_value)
        self.num_bins = num_bins
        self.hist = histogram_init_helper(num_bins, numpy.int32)

    def add(self, arr):
        self.hist += histogram_helper(arr,
                                      bins=self.num_bins,
                                      range=self.range)

    def get(self):
        return self.hist

# class for taking statistics on some value (e.g., brightness
# temperature differences between AIRS and MODIS) in terms of some
# basis (e.g. Earth location or instrument scan angle). like
# Histogram, designed for building up these stats via successive
# calls to add()
class Stats:

    def __init__(self, min_basis, max_basis, num_bins):
        self.num_bins = num_bins
        self.range = (min_basis, max_basis)
        self.hist = Histogram(min_basis, max_basis, num_bins)
        self.sum = histogram_init_helper(num_bins, numpy.float64)
        self.squared_sum = histogram_init_helper(num_bins, numpy.float64)

    def add(self, basis, values):
        self.hist.add(basis)
        self.sum += histogram_helper(basis,
                                     bins=self.num_bins,
                                     range=self.range,
                                     weights=values)
        squared_values = values * values
        self.squared_sum += histogram_helper(basis,
                                             bins=self.num_bins,
                                             range=self.range,
                                             weights=squared_values)

    def get(self):
        n = self.hist.get()
        mean = self.sum / n
        std = numpy.sqrt((self.squared_sum / n) - (mean * mean))
        return n, mean, std

# helper class to save repetition for all the per-band data
# structures needed below
class PerBandHelper:

    def __init__(self, ctor, *args):
        self.data = [ctor(*args) for band in range(16)]

    def __getitem__(self, key):
        return self.data[key]

    def get(self):
        return self.data

grouper_definitions = {
    'AIRS_BT_Hist': (Histogram, 200.0, 350.0, 150),
    'MODIS_BT_Hist': (Histogram, 200.0, 350.0, 150),
    'BT_Diff_Hist': (Histogram, -10.0, 10.0, 100),
    'AIRS_BT_By_Location': (Stats,
                            (-90.0, 90.0),
                            (-180.0, 180.0),
                            (180, 360)),
    'BT_Diff_By_Location': (Stats,
                            (-90.0, 90.0),
                            (-180.0, 180.0),
                            (180, 360)),
    'BT_Diff_By_AIRS_BT': (Stats, 200.0, 350.0, 150),
    'BT_Diff_By_Scan_Angle': (Stats, -55.0, 55.0, 110),
    'BT_Diff_By_Solar_Zenith': (Stats, 0.0, 180.0, 180)
}

# check command line
if len(sys.argv) < 5:
    print 'usage: python %s <year> <month> <day> <grouper> ...' % sys.argv[0]
    sys.exit(1)
year = int(sys.argv[1])
month = int(sys.argv[2])
day = int(sys.argv[3])
grouper_names = sys.argv[4:]

# loop over each granule in the input directory
files = glob.glob('intercal.*.%04d.%02d.%02d.*.hdf' % (year, month, day))
files.sort()
for gran in range(1, 241):
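
This second driver takes grouper names on the command line, but the body of its granule loop is not part of this listing, so how the selected groupers are instantiated is not shown. A plausible per-band setup from grouper_definitions might look like the sketch below; it reuses grouper_definitions, PerBandHelper, and grouper_names from the listing above, the variable names are made up, and the real loop body may well differ.

# hypothetical setup, not part of the commit -- one Histogram or Stats
# instance per band for each grouper requested on the command line
groupers = {}
for name in grouper_names:
    if name not in grouper_definitions:
        print 'unknown grouper: %s' % name
        sys.exit(1)
    definition = grouper_definitions[name]
    ctor, ctor_args = definition[0], definition[1:]
    groupers[name] = PerBandHelper(ctor, *ctor_args)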
@@ -16,4 +16,4 @@ log = log.$(Cluster).$(Process)
 
 notification = never
 
-queue 200
+queue
@@ -4,13 +4,24 @@
 import pgdb
 import time
 
+# helper for connecting to the database
+def connect():
+    return pgdb.connect(database='intercal')
+
+# helper for getting the current UTC time in a PostgreSQL-friendly
+# format
+def get_time():
+    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
+
 # claim a job for forward processing. claiming a job just sets its
 # status as 'running' in the DB, but it is done in such a way as to
 # ensure that only one process can claim a given job. returns an
 # (airs_date, airs_chunk) pair. the processing system name ('funnel'
 # or 'peate' for starters) and an identifier within that system are
 # input parameters
-def get_forward_job(system, execution_id):
+def get_forward_job(system, system_id):
 
     # connect to DB
     con = connect()
@@ -24,23 +35,23 @@ def get_forward_job(system, execution_id):
     # find the most recent 'ready' job
     cur.execute("select airs_date, airs_chunk " +
-                "from jobs " +
+                "from intercal_jobs " +
                 "where status = 'ready' " +
                 "order by airs_date desc, airs_chunk desc " +
                 "limit 1")
     airs_date, airs_chunk = cur.fetchone()
 
     # try and claim it
-    if get_job(airs_date, airs_chunk, system, execution_id, con):
+    if get_intercal_job(airs_date, airs_chunk, system, system_id, con):
         return airs_date, airs_chunk
 
     # again, this really shouldn't happen
     raise Exception('could not get a job to execute')
 
 # claim the job with the given date and chunk number. system and
-# execution_id are the same as for get_forward_job(). con can be
+# system_id are the same as for get_forward_job(). con can be
 # set if a DB connection is already established
-def get_job(airs_date, airs_chunk, system, execution_id, con=None):
+def get_intercal_job(airs_date, airs_chunk, system, system_id, con=None):
 
     # connect to the DB if not already
     if con == None:
@@ -54,7 +65,7 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
     # no longer satisfied). if the other transaction is rolled back,
     # then our transaction will update one row
     cur = con.cursor()
-    cur.execute("update jobs set status = 'running' " +
+    cur.execute("update intercal_jobs set status = 'running' " +
                 "where airs_date = '%s' and " % airs_date +
                 "airs_chunk = %s and " % airs_chunk +
                 "status = 'ready'")
@@ -64,11 +75,11 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
     # we got the claim. now add a row to the executions table to
     # document our process
     start_time = get_time()
-    cur.execute("insert into executions (airs_date, airs_chunk, " +
+    cur.execute("insert into intercal_executions (airs_date, airs_chunk, " +
                 "system, id, " +
                 "start_time) " +
                 "values ('%s', %s, " % (airs_date, airs_chunk) +
-                "'%s', '%s'," % (system, execution_id) +
+                "'%s', '%s'," % (system, system_id) +
                 "'%s')" % start_time)
 
     # don't forget to commit!
@@ -76,29 +87,18 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
     return True
 
 # for marking a job as successfully completed
-def job_done(airs_date, airs_chunk):
-    job_done_or_failed(airs_date, airs_chunk, 'done')
+def intercal_done(airs_date, airs_chunk):
+    intercal_done_or_failed(airs_date, airs_chunk, 'done')
 
 # for marking a job as failed
-def job_failed(airs_date, airs_chunk):
-    job_done_or_failed(airs_date, airs_chunk, 'failed')
-
-# helper for connecting to the database
-def connect():
-    return pgdb.connect(database='intercal')
-
-# helper for getting the current UTC time in a PostgreSQL-friendly
-# format
-def get_time():
-    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
+def intercal_failed(airs_date, airs_chunk):
+    intercal_done_or_failed(airs_date, airs_chunk, 'failed')
 
 # helper for updating the database once execution is completed,
 # either with success or failure
-def job_done_or_failed(airs_date, airs_chunk, status):
+def intercal_done_or_failed(airs_date, airs_chunk, status):
 
     # connect to the DB
     con = connect()
@@ -106,14 +106,108 @@ def job_done_or_failed(airs_date, airs_chunk, status):
     # record the end-time of this execution
     stop_time = get_time()
-    cur.execute("update executions set stop_time = '%s' " % stop_time +
+    cur.execute("update intercal_executions " +
+                "set stop_time = '%s' " % stop_time +
                 "where airs_date = '%s' and " % airs_date +
                 "airs_chunk = '%s'" % airs_chunk)
 
     # update the jobs table so this job is no longer marked 'running'
-    cur.execute("update jobs set status = '%s' " % status +
+    cur.execute("update intercal_jobs " +
+                "set status = '%s' " % status +
                 "where airs_date = '%s' and " % airs_date +
                 "airs_chunk = %s" % airs_chunk)
 
     # don't forget to commit!
     con.commit()
+
+def start_groupers(airs_date, airs_chunk, system, system_id):
+
+    # connect to DB
+    con = connect()
+    cur = con.cursor()
+
+    # set all grouper jobs for this chunk to running
+    cur.execute("update grouper_jobs set status = 'running' " +
+                "where airs_date = '%s' and " % airs_date +
+                "airs_chunk = %s and " % airs_chunk +
+                "status = 'idle' " +
+                "returning grouper_id")
+    grouper_ids = [row[0] for row in cur]
+
+    # create a grouper_execution row
+    start_time = get_time()
+    cur.execute("insert into grouper_executions (type, system, id, " +
+                "start_time) " +
+                "values ('forward', " +
+                "'%s', '%s', " % (system, system_id) +
+                "'%s') " % start_time +
+                "returning execution_id")
+    execution_id = cur.fetchone()[0]
+
+    # create grouper execution mappings
+    # FIXME: it's tricky to follow what's going on with the
+    # format specifiers here
+    cur.executemany("insert into grouper_execution_mappings (execution_id, " +
+                    "airs_date, " +
+                    "airs_chunk, " +
+                    "grouper_id) " +
+                    "values (%s, " % execution_id +
+                    "'%s', %s, " % (airs_date, airs_chunk) +
+                    "%s)",
+                    ((grouper_id,) for grouper_id in grouper_ids))
+
+    # updates are done now, so commit
+    con.commit()
+
+    # get the names of the groupers so we can return them
+    # FIXME: better way to do this?
+    grouper_names = []
+    for grouper_id in grouper_ids:
+        cur.execute("select name from groupers " +
+                    "where grouper_id = %s" % grouper_id)
+        grouper_names.append(cur.fetchone()[0])
+
+    return execution_id, grouper_ids, grouper_names
+
+def groupers_done(airs_date, airs_chunk, execution_id, grouper_ids):
+    groupers_done_or_failed(airs_date, airs_chunk,
+                            execution_id,
+                            grouper_ids,
+                            'done')
+
+def groupers_failed(airs_date, airs_chunk, execution_id, grouper_ids):
+    groupers_done_or_failed(airs_date, airs_chunk,
+                            execution_id,
+                            grouper_ids,
+                            'failed')
+
+def groupers_done_or_failed(airs_date, airs_chunk,
+                            execution_id,
+                            grouper_ids,
+                            status):
+
+    # connect to the DB
+    con = connect()
+    cur = con.cursor()
+
+    # record the end-time of this execution
+    stop_time = get_time()
+    cur.execute("update grouper_executions " +
+                "set stop_time = '%s' " % stop_time +
+                "where execution_id = %s" % execution_id)
+
+    # update the grouper_jobs table so these grouper jobs are no
+    # longer marked 'running'
+    # FIXME: it's tricky to follow what's going on with the
+    # format specifiers here
+    cur.executemany("update grouper_jobs " +
+                    "set status = '%s' " % status +
+                    "where airs_date = '%s' and " % airs_date +
+                    "airs_chunk = %s and " % airs_chunk +
+                    "grouper_id = %s",
+                    ((grouper_id,) for grouper_id in grouper_ids))
+
+    # don't forget to commit!
+    con.commit()
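
The FIXME comments in start_groupers() and groupers_done_or_failed() flag that the executemany() calls interpolate some values with Python's % operator and leave one %s for the driver to fill per row, which is hard to read. One cleaner alternative is to bind every value through the driver. The sketch below is illustrative only and not part of the commit: the helper name is made up, and it assumes the installed pgdb accepts pyformat (%(name)s) placeholders, so check pgdb.paramstyle before relying on it.

# illustrative alternative to the string-formatted executemany() calls
# above; cur is an open pgdb cursor, and every value is bound by the
# driver so quoting is handled in one place
def map_groupers_to_execution(cur, execution_id, airs_date, airs_chunk,
                              grouper_ids):
    cur.executemany("insert into grouper_execution_mappings "
                    "(execution_id, airs_date, airs_chunk, grouper_id) "
                    "values (%(eid)s, %(date)s, %(chunk)s, %(gid)s)",
                    [{'eid': execution_id,
                      'date': airs_date,
                      'chunk': airs_chunk,
                      'gid': grouper_id}
                     for grouper_id in grouper_ids])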
insert into groupers (name) values ('Test1');
insert into groupers (name) values ('Test2');
insert into groupers (name) values ('Test3');
insert into groupers (name) values ('Test4');
insert into groupers (name) values ('Test5');
import pgdb

con = pgdb.connect(database='intercal')
cur = con.cursor()

query = """
  insert into intercal_jobs
  select '2003-01-01'::date + s1.i, s2.i, 'held'
  from generate_series(0, current_date - '2003-01-01') as s1(i),
       generate_series(0, 23) as s2(i)
  where s1.i > coalesce((select max(airs_date) - '2003-01-01'
                         from intercal_jobs),
                        -1)
"""
cur.execute(query)
print 'Inserted %s rows into intercal_jobs' % cur.rowcount

query = """
  insert into grouper_jobs
  select '2003-01-01'::date + s1.i, s2.i, %s, 'idle'
  from generate_series(0, current_date - '2003-01-01') as s1(i),
       generate_series(0, 23) as s2(i)
  where s1.i > coalesce((select max(airs_date) - '2003-01-01'
                         from grouper_jobs
                         where grouper_id = %s),
                        -1)
"""
cur.execute("select grouper_id, name from groupers")
for grouper_id, grouper_name in cur:
    cur2 = con.cursor()
    cur2.execute(query % (grouper_id, grouper_id))
    print 'Inserted %s rows into grouper_jobs for grouper %s' % (cur2.rowcount,
                                                                 grouper_name)
con.commit()
@@ -3,9 +3,19 @@
 -- granules up through the current day. this should be run daily to
 -- keep the jobs table current
 
-insert into jobs
+insert into intercal_jobs
 select '2003-01-01'::date + s1.i, s2.i, 'held'
 from generate_series(0, current_date - '2003-01-01') as s1(i),
      generate_series(0, 23) as s2(i)
-where s1.i > coalesce((select max(airs_date) - '2003-01-01' from jobs),
+where s1.i > coalesce((select max(airs_date) - '2003-01-01'
+                       from intercal_jobs),
+                      -1);
+
+insert into grouper_jobs
+select '2003-01-01'::date + s1.i, s2.i, 1, 'idle'
+from generate_series(0, current_date - '2003-01-01') as s1(i),
+     generate_series(0, 23) as s2(i)
+where s1.i > coalesce((select max(airs_date) - '2003-01-01'
+                       from grouper_jobs
+                       where grouper_id = 1),
                       -1);
@@ -66,7 +66,7 @@ def get_ancillary_file(file_type, t):
 
     # get the type id for Steve's database and the filename prefix
     # based on the file type
-    d = {'A': (8, 'PM1ATTRN'), 'E': (9, 'PM1EPHND')}
+    d = {'A': (8, 'PM1ATTNR'), 'E': (9, 'PM1EPHND')}
     db_type, prefix = d[file_type]
 
     # see if we already have this ancillary file
@@ -24,7 +24,7 @@ i_con = pgdb.connect(database='intercal')
 i_cur = i_con.cursor()
 
 # determine the current range of AIRS granules to consider
-i_cur.execute("select min(airs_date), max(airs_date) from jobs")
+i_cur.execute("select min(airs_date), max(airs_date) from intercal_jobs")
 min_date, max_date = (parse_time(s) for s in i_cur.fetchone())
 num_days = ((max_date - min_date) / 86400) + 1
@@ -86,7 +86,8 @@ eph_mask[offsets] = True
 # below is dependent on the fact that we are processing the AIRS
 # granules an hour at a time
 ready_list = []
-i_cur.execute("select airs_date, airs_chunk from jobs where status = 'held'")
+i_cur.execute("select airs_date, airs_chunk from intercal_jobs " +
+              "where status = 'held'")
 for airs_date, airs_chunk in i_cur:
 
     # determine the nominal time for this AIRS granule
@@ -133,7 +134,7 @@ for airs_date, airs_chunk in i_cur:
 # update the InterCal processing DB with the jobs we now consider
 # ready
-i_cur.executemany("update jobs set status = 'ready' " +
+i_cur.executemany("update intercal_jobs set status = 'ready' " +
                   "where airs_date = %s and airs_chunk = %s",
                   ready_list)
 i_con.commit()
# script to check Steve's database for InterCal "jobs" (groups of
# 10 AIRS granules) that are ready for processing. besides the AIRS
# L1B files themselves, we also need MODIS L1A files and the attitude
# and ephemeris files needed to process them into L1B. this needs to
# be run frequently to keep the database current

import calendar
import numpy
import pgdb
import subprocess
import time

# helper for parsing PostgreSQL dates
def parse_time(time_str):
    return calendar.timegm(time.strptime(time_str, '%Y-%m-%d'))

# helper for getting the nominal granule time from an AIRS filename
def parse_airs_filename(filename):
    return (calendar.timegm(time.strptime(filename[5:15], '%Y.%m.%d')) +
            360 * (int(filename[16:19]) - 1))

# helper for going from "AIRS offset" to a date and chunk number
def print_time_and_chunk(t):
    print '%s %s' % (time.strftime('%Y-%m-%d', time.gmtime(t)),
                     t % 86400 / (60 * 60))

# hard-code range of AIRS dates to consider
min_date = parse_time('2007-07-01')
max_date = parse_time('2008-06-30')
num_days = ((max_date - min_date) / 86400) + 1

# get available AIRS times via 'dmsls'
airs_mask = numpy.zeros((num_days * 240,), numpy.bool)
cmd = ('dmsls', 'AIRS.*')
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
for line in p.stdout:
    if line[:4] != 'AIRS':
        continue
    t = parse_airs_filename(line)
    if (t < min_date) or (t >= (max_date + 86400)):
        continue
    airs_mask[(t - min_date) / (6 * 60)] = True
p.wait()

# connect to Steve's database
p_con = pgdb.connect(host='twister', database='PEATE_v3', password='abc')
p_cur = p_con.cursor()

# get times of all other needed files from the DB, sorted
min_date_str = time.strftime('%Y.%m.%d', time.gmtime(min_date))
max_date_str = time.strftime('%Y.%m.%d', time.gmtime(max_date))
p_cur.execute("select fn " +
              "from files " +
              "where type = 41 and " +
              "date >= '%s' and " % min_date_str +
              "date <= '%s' " % max_date_str +
              "order by fn")
airs_times = numpy.array([parse_airs_filename(row[0]) for row in p_cur])
p_cur.execute("select unix_t " +
              "from files " +
              "where type < 4 and " +
              "date >= '%s' and " % min_date_str +
              "date <= '%s' " % max_date_str +
              "order by unix_t")
mod_times = numpy.array(p_cur.fetchall()).flatten()
cut_date_min = min_date
cut_date_max = max_date + (24 * 60 * 60)
p_cur.execute("select unix_t " +
              "from anc_files " +
              "where type = 8 and " +
              "unix_t >= %s and " % cut_date_min +
              "unix_t < %s " % cut_date_max +
              "order by unix_t")
att_times = numpy.array(p_cur.fetchall()).flatten()
cut_date_min -= (24 * 60 * 60)
p_cur.execute("select unix_t " +
              "from anc_files " +
              "where type = 9 and " +
              "unix_t >= %s and " % cut_date_min +
              "unix_t < %s " % cut_date_max +
              "order by unix_t")
eph_times = numpy.array(p_cur.fetchall()).flatten()

# create masks of which MODIS, attitude, and ephemeris files are
# available
offsets = (mod_times - min_date) / (5 * 60)
mod_mask = numpy.zeros((num_days * 288,), numpy.bool)
mod_mask[offsets] = True
offsets = (att_times - min_date) / (2 * 60 * 60)
att_mask = numpy.zeros((num_days * 12,), numpy.bool)
att_mask[offsets] = True
offsets = (eph_times - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
eph_mask = numpy.zeros((num_days + 1,), numpy.bool)
eph_mask[offsets] = True

# now loop through InterCal jobs labeled 'held' and for each check
# if all its needed dependencies are available. note that the code
# below is dependent on the fact that we are processing the AIRS
# granules an hour at a time
for airs_time in range(min_date, max_date + (24 * 60 * 60), (60 * 60)):

    # determine the nominal time for this AIRS granule
    airs_date = airs_time / (24 * 60 * 60) * (24 * 60 * 60)
    airs_chunk = airs_time % (24 * 60 * 60) / (60 * 60)

    # check for the AIRS file itself
    airs_offset = (airs_time - min_date) / (6 * 60)
    if not airs_mask[airs_offset]:
        print_time_and_chunk(airs_time)
        continue

    # we need the 14 MODIS L1A files starting with the same nominal
    # time as the first AIRS file (note that AIRS nominal times are
    # about 5.5 minutes late compared to MODIS)
    num_mod_files = 14
    mod_offset_ini = (airs_time - min_date) / (5 * 60)
    mod_offset_fin = mod_offset_ini + num_mod_files
    if not mod_mask[mod_offset_ini:mod_offset_fin].all():
        print_time_and_chunk(airs_time)
        continue

    # unless the MODIS files cross a two-hour boundary, they all fit
    # between 2 attitude files. otherwise, we'll need 3
    if (airs_chunk % 2) == 1:
        num_att_files = 3
    else:
        num_att_files = 2
    att_offset_ini = (airs_time - min_date) / (2 * 60 * 60)
    att_offset_fin = att_offset_ini + num_att_files
    if not att_mask[att_offset_ini:att_offset_fin].all():
        print_time_and_chunk(airs_time)
        continue

    # unless the MODIS files cross noon UTC, they all fit between 2
    # ephemeris files. otherwise, we'll need 3
    if airs_chunk == 11:
        num_eph_files = 3
    else:
        num_eph_files = 2
    eph_offset_ini = (airs_time - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
    eph_offset_fin = eph_offset_ini + num_eph_files
    if not eph_mask[eph_offset_ini:eph_offset_fin].all():
        print_time_and_chunk(airs_time)
        continue
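
The script above leans on a few arithmetic conventions: 240 six-minute AIRS granules per day, granule numbers starting at 1, and one "chunk" per hour. A small worked example of the filename and offset math follows; the example filename is made up but follows the AIRS.<year>.<month>.<day>.<granule> layout that parse_airs_filename() slices, and the function body is copied verbatim from the listing so the snippet runs on its own.

# worked example of the granule-time arithmetic used above
import calendar
import time

def parse_airs_filename(filename):
    return (calendar.timegm(time.strptime(filename[5:15], '%Y.%m.%d')) +
            360 * (int(filename[16:19]) - 1))

t = parse_airs_filename('AIRS.2008.06.30.005.L1B.hdf')   # hypothetical name
midnight = calendar.timegm(time.strptime('2008.06.30', '%Y.%m.%d'))

assert t - midnight == 4 * 360            # granule 5 starts 24 minutes in
assert t % 86400 // (60 * 60) == 0        # so it falls in chunk (hour) 0
assert (t - midnight) // (6 * 60) == 4    # slot 4 of that day's 240 granules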
 -- SQL to initialize the InterCal processing database
 
--- the jobs table contains one row for every hour of AIRS data. new
--- jobs are added via make_jobs.sql. when jobs are first added they
--- are placed in the 'held' status. when the ready_watch.py script
--- determines that all needed data is available, the job is moved to
--- 'ready'. it can then be claimed (set 'running') by a process in
--- the cluster or on funnel. after processing, it is set to 'failed'
--- or 'done' depending on the result. these later 3 stages are
--- managed via the intercal_db.py module
-create domain status text check (value = 'held' or
-                                 value = 'ready' or
-                                 value = 'running' or
-                                 value = 'failed' or
-                                 value = 'done');
-create table jobs (
+create domain system text check (value = 'funnel' or value = 'peate');
+create domain chunk integer check (value >= 0 and value <= 23);
+
+-- the intercal_jobs table contains one row for every hour of AIRS
+-- data. new jobs are added via make_jobs.sql. when jobs are first
+-- added they are placed in the 'held' status. when the
+-- ready_watch.py script determines that all needed data is
+-- available, the job is moved to 'ready'. it can then be claimed
+-- (set 'running') by a process in the cluster or on funnel. after
+-- processing, it is set to 'failed' or 'done' depending on the
+-- result. these later 3 stages are managed via the intercal_db.py
+-- module
+create domain intercal_status text check (value = 'held' or
+                                          value = 'ready' or
+                                          value = 'running' or
+                                          value = 'failed' or
+                                          value = 'done');
+create table intercal_jobs (
     airs_date date,
-    airs_chunk integer check (airs_chunk >= 0 and airs_chunk <= 23),
-    status status not null,
+    airs_chunk chunk,
+    status intercal_status not null,
     primary key (airs_date, airs_chunk)
 );
 
 -- each time a process executes an InterCal job, it records an entry
--- in the executions table for posterity
-create domain system text check (value = 'funnel' or value = 'peate');
-create table executions (
+-- in the intercal_executions table
+create table intercal_executions (
     airs_date date,
     airs_chunk integer,
     system system not null,
     id text not null,
     start_time timestamp not null,
     stop_time timestamp,
-    foreign key (airs_date, airs_chunk) references jobs
+    foreign key (airs_date, airs_chunk) references intercal_jobs
+);
+
+create table groupers (
+    grouper_id serial primary key,
+    name text unique not null
+);
+
+-- TODO: is 'merged' needed?
+create domain grouper_status text check (value = 'idle' or
+                                         value = 'running' or
+                                         value = 'done' or
+                                         value = 'failed');
+
+create table grouper_jobs (
+    airs_date date,
+    airs_chunk chunk,
+    grouper_id integer references groupers,
+    status grouper_status not null,
+    primary key (airs_date, airs_chunk, grouper_id)
+);
+
+create domain grouper_type text check (value = 'forward' or
+                                       value = 'catch-up');
+
+create table grouper_executions (
+    execution_id serial primary key,
+    type grouper_type not null,
+    system system not null,
+    id text not null,
+    start_time timestamp not null,
+    stop_time timestamp
+);
+
+create table grouper_execution_mappings (
+    execution_id integer references grouper_executions,
+    airs_date date,
+    airs_chunk chunk,
+    grouper_id integer,
+    foreign key (airs_date, airs_chunk, grouper_id) references grouper_jobs
+);
+
+create table grouper_catch_ups (
+    airs_date date primary key
 );
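
In the new schema, a grouper execution is tied back to the per-chunk grouper jobs it covered through grouper_execution_mappings. The query sketch below shows one way to inspect that relationship; it is illustrative only and not part of the commit, the table and column names come from the schema above, the execution id is made up, and it uses pgdb in the same style as the rest of the codebase.

# illustrative query against the schema above: for a given execution,
# list the grouper names and the status of the jobs it covered
import pgdb

con = pgdb.connect(database='intercal')
cur = con.cursor()
execution_id = 1  # made-up id for the sake of the example
cur.execute("select g.name, j.airs_date, j.airs_chunk, j.status "
            "from grouper_execution_mappings m "
            "join groupers g on g.grouper_id = m.grouper_id "
            "join grouper_jobs j on (j.airs_date, j.airs_chunk, j.grouper_id) "
            "                     = (m.airs_date, m.airs_chunk, m.grouper_id) "
            "where m.execution_id = %s" % execution_id)
for name, airs_date, airs_chunk, status in cur:
    print '%s %s chunk %s: %s' % (name, airs_date, airs_chunk, status)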
@@ -23,14 +23,17 @@ def get_time():
 if len(sys.argv) != 2:
     print 'usage: %s <execution_id>' % sys.argv[0]
     sys.exit(1)
-execution_id = sys.argv[1]
+condor_id = sys.argv[1]
 
 # switch into the condor execute directory so our disk space will be
 # cleaned up when we're done
 os.chdir(os.environ['_CONDOR_SCRATCH_DIR'])
 
 # get a job to work on
-airs_date, airs_chunk = intercal_db.get_forward_job('funnel', execution_id)
+airs_date, airs_chunk = intercal_db.get_forward_job('funnel', condor_id)
+
+# DEBUG
+return_code = 0
 
 # run the driver
 year = airs_date[0:4]
@@ -45,11 +48,11 @@ cmd = ('python',
        year, month, day,
        str(start_gran_num), str(num_granules),
        output_prefix)
-return_code = subprocess.call(cmd)
+#return_code = subprocess.call(cmd)
 
 # if the driver exited with failure, bail
 if return_code != 0:
-    intercal_db.job_failed(airs_date, airs_chunk)
+    intercal_db.intercal_failed(airs_date, airs_chunk)
     raise Exception('driver failed with return code %s' % return_code)
 
 # upload the files into the DMS; if any of these uploads fail,
@@ -61,10 +64,20 @@ for i in range(10):
                    year, month, day,
                    start_gran_num + i)
     cmd = ('dmsput', filename)
-    return_code = subprocess.call(cmd)
+    #return_code = subprocess.call(cmd)
     if return_code != 0:
-        intercal_db.job_failed(airs_date, airs_chunk)
+        intercal_db.intercal_failed(airs_date, airs_chunk)
         raise Exception(' failed: %s' % return_code)
 
 # looks like everything worked!
-intercal_db.job_done(airs_date, airs_chunk)
+intercal_db.intercal_done(airs_date, airs_chunk)
+
+# now on to the groupers
+# TODO: do something real here
+(execution_id,
+ grouper_ids,
+ grouper_names) = intercal_db.start_groupers(airs_date, airs_chunk,
+                                             'funnel', condor_id)
+for grouper_name in grouper_names:
+    print 'GROUPER: %s' % grouper_name
+
+intercal_db.groupers_done(airs_date, airs_chunk, execution_id, grouper_ids)