diff --git a/agg_new.py b/agg_new.py
new file mode 100644
index 0000000000000000000000000000000000000000..610d81926762e3a5fbb8c444a81b0dec16705eaa
--- /dev/null
+++ b/agg_new.py
@@ -0,0 +1,226 @@
+import numpy
+import os
+import sys
+
+from pyhdf.SD import SD, SDC
+from util import HdfWriter
+
+# helper functions to allow the classes below to be used with either
+# one or two dimensions
+
+def histogram_init_helper(num_bins, dtype):
+
+    if type(num_bins) == tuple:
+        return numpy.zeros(num_bins, dtype)
+    else:
+        return numpy.zeros((num_bins,), dtype)
+
+def histogram_helper(arr, **kwargs):
+
+    if type(arr) == tuple:
+        return numpy.histogram2d(arr[0], arr[1], **kwargs)[0]
+    else:
+        return numpy.histogram(arr, **kwargs)[0]
+
+# class for building up a histogram in parts
+class Histogram:
+
+    def __init__(self, min_value, max_value, num_bins):
+
+        self.range = (min_value, max_value)
+        self.num_bins = num_bins
+        self.hist = histogram_init_helper(num_bins, numpy.int32)
+
+    def add(self, arr):
+
+        self.hist += histogram_helper(arr,
+                                      bins=self.num_bins,
+                                      range=self.range)
+
+    def get(self):
+
+        return self.hist
+
+# class for taking statistics on some value (e.g., brightness
+# temperature differences between AIRS and MODIS) in terms of some
+# basis (e.g. Earth location or instrument scan angle). like
+# Histogram, designed for building up these stats via successive
+# calls to add()
+class Stats:
+
+    def __init__(self, min_basis, max_basis, num_bins):
+
+        self.num_bins = num_bins
+        self.range = (min_basis, max_basis)
+        self.hist = Histogram(min_basis, max_basis, num_bins)
+        self.sum = histogram_init_helper(num_bins, numpy.float64)
+        self.squared_sum = histogram_init_helper(num_bins, numpy.float64)
+
+    def add(self, basis, values):
+
+        self.hist.add(basis)
+
+        self.sum += histogram_helper(basis,
+                                     bins=self.num_bins,
+                                     range=self.range,
+                                     weights=values)
+
+        squared_values = values * values
+        self.squared_sum += histogram_helper(basis,
+                                             bins=self.num_bins,
+                                             range=self.range,
+                                             weights=squared_values)
+
+    def get(self):
+
+        n = self.hist.get()
+        mean = self.sum / n
+        std = numpy.sqrt((self.squared_sum / n) - (mean * mean))
+        return n, mean, std
+
+# helper class to save repetition for all the per-band data
+# structures needed below
+class PerBandHelper:
+
+    def __init__(self, ctor, *args):
+
+        self.data = [ctor(*args) for band in range(16)]
+
+    def __getitem__(self, key):
+
+        return self.data[key]
+
+    def get(self):
+
+        return self.data
+
+grouper_definitions = {
+    'AIRS_BT_Hist': (Histogram, 200.0, 350.0, 150),
+    'MODIS_BT_Hist': (Histogram, 200.0, 350.0, 150),
+    'BT_Diff_Hist': (Histogram, -10.0, 10.0, 100),
+    'AIRS_BT_By_Location': (Stats,
+                            (-90.0, 90.0),
+                            (-180.0, 180.0),
+                            (180, 360)),
+    'BT_Diff_By_Location': (Stats,
+                            (-90.0, 90.0),
+                            (-180.0, 180.0),
+                            (180, 360)),
+    'BT_Diff_By_AIRS_BT': (Stats, 200.0, 350.0, 150),
+    'BT_Diff_By_Scan_Angle': (Stats, -55.0, 55.0, 110),
+    'BT_Diff_By_Solar_Zenith': (Stats, 0.0, 180.0, 180)
+}
+
+# verify command line inputs
+if len(sys.argv) != 3:
+    print 'usage: python %s <input_dir> <output_file>' % sys.argv[0]
+    sys.exit(1)
+input_dir = sys.argv[1]
+output_file = sys.argv[2]
+
+# set up per-band histogram and stats objects
+airs_bt_hist = PerBandHelper(Histogram, 200.0, 350.0, 150)
+modis_bt_hist = PerBandHelper(Histogram, 200.0, 350.0, 150)
+bt_diff_hist = PerBandHelper(Histogram, -10.0, 10.0, 100)
+latlon_airs_stats = PerBandHelper(Stats,
+                                  (-90.0, 90.0),
+                                  (-180.0, 180.0),
+                                  (180, 360))
+latlon_diff_stats = PerBandHelper(Stats,
+                                  (-90.0, 90.0),
+                                  (-180.0, 180.0),
+                                  (180, 360))
+bt_diff_stats = PerBandHelper(Stats, 200.0, 350.0, 150)
+scan_ang_diff_stats = PerBandHelper(Stats, -55.0, 55.0, 110)
+sol_zen_diff_stats = PerBandHelper(Stats, 0.0, 180.0, 180)
+
+# loop over each granule in the input directory
+for gran in range(1, 241):
+
+    print 'Processing granule %d' % gran
+
+    # make sure all the needed files are present
+    airs_file = '%s/intercal.a2m.2008.12.27.%03d.hdf' % (input_dir, gran)
+    modis_file = '%s/intercal.m2a.2008.12.27.%03d.hdf' % (input_dir, gran)
+    meta_file = '%s/intercal.meta.2008.12.27.%03d.hdf' % (input_dir, gran)
+    if not (os.path.exists(airs_file) and
+            os.path.exists(modis_file) and
+            os.path.exists(meta_file)):
+        print ' Granule %d missing' % gran
+        continue
+
+    # grab AIRS data
+    airs_sd = SD(airs_file)
+    airs_bt = airs_sd.select('AIRS_Brightness_Temp').get()
+    airs_flags = airs_sd.select('AIRS_Flags').get()
+
+    # MODIS data
+    modis_sd = SD(modis_file)
+    modis_bt = modis_sd.select('MODIS_Brightness_Temp').get()
+    modis_flags = modis_sd.select('MODIS_Flags').get()
+
+    # "meta" data
+    meta_sd = SD(meta_file)
+    lats = meta_sd.select('Latitude').get()
+    lons = meta_sd.select('Longitude').get()
+    scan_ang = meta_sd.select('Scan_Angle').get()
+    sol_zen = meta_sd.select('Solar_Zenith').get()
+
+    # AIRS - MODIS BTs
+    bt_diff = airs_bt - modis_bt
+
+    # we'll mask off all pixels where the data is suspect for either
+    # instrument
+    # FIXME: should nan BTs be discarded or zeroed?
+    mask = numpy.logical_and(airs_flags == 0, modis_flags == 0)
+    mask = numpy.logical_and(mask, numpy.isnan(bt_diff) == False)
+
+    # now update our per-band statistics
+    for band in range(16):
+
+        # set up some shorthands for convenience and to avoid
+        # duplicate mask applications
+        m = mask[band]
+        abt = airs_bt[band][m]
+        mbt = modis_bt[band][m]
+        btd = bt_diff[band][m]
+        la = lats[m]
+        lo = lons[m]
+        sa = scan_ang[m]
+        sz = sol_zen[m]
+
+        # do the actual updates
+        airs_bt_hist[band].add(abt)
+        modis_bt_hist[band].add(mbt)
+        bt_diff_hist[band].add(btd)
+        latlon_airs_stats[band].add((la, lo), abt)
+        latlon_diff_stats[band].add((la, lo), btd)
+        bt_diff_stats[band].add(abt, btd)
+        scan_ang_diff_stats[band].add(sa, btd)
+        sol_zen_diff_stats[band].add(sz, btd)
+
+# output results to HDF
+# FIXME: handle the limits more coherently
+writer = HdfWriter(output_file)
+writer.write_seq('AIRS_BT_Hist',
+                 [o.get() for o in airs_bt_hist.get()],
+                 min_bucket=200.0, max_bucket=350.0)
+writer.write_seq('MODIS_BT_Hist',
+                 [o.get() for o in modis_bt_hist.get()],
+                 min_bucket=200.0, max_bucket=350.0)
+writer.write_seq('BT_Diff_Hist',
+                 [o.get() for o in bt_diff_hist.get()],
+                 min_bucket=-10.0, max_bucket=10.0)
+writer.write_seq('AIRS_BT_By_Location',
+                 [o.get()[1] for o in latlon_airs_stats.get()])
+writer.write_seq('BT_Diff_By_Location',
+                 [o.get()[1] for o in latlon_diff_stats.get()])
+writer.write_seq('BT_Diff_By_AIRS_BT',
+                 [o.get()[1] for o in bt_diff_stats.get()],
+                 min_bucket=200.0, max_bucket=350.0)
+writer.write_seq('BT_Diff_By_Scan_Angle',
+                 [o.get()[1] for o in scan_ang_diff_stats.get()],
+                 min_bucket=-55.0, max_bucket=55.0)
+writer.write_seq('BT_Diff_By_Solar_Zenith',
+                 [o.get()[1] for o in sol_zen_diff_stats.get()],
+                 min_bucket=0.0, max_bucket=180.0)
diff --git a/aggregate.py b/aggregate.py
index f00546da60d8de1d7281ab5653e8840c95bec733..00f2f4180c17ffafdea813218331a9101bba3bbc 100644
--- a/aggregate.py
+++ b/aggregate.py
@@ -123,9 +123,9 @@ for gran in range(1, 241):
     print 'Processing granule %d' % gran
 
     # make sure all the needed files are present
-    airs_file = '%s/a2m_%03d.hdf' % (input_dir, gran)
-    modis_file = '%s/m2a_%03d.hdf' % (input_dir, gran)
-    meta_file = '%s/meta_%03d.hdf' % (input_dir, gran)
+    airs_file = '%s/intercal.a2m.2008.12.31.%03d.hdf' % (input_dir, gran)
+    modis_file = '%s/intercal.m2a.2008.12.31.%03d.hdf' % (input_dir, gran)
+    meta_file = '%s/intercal.meta.2008.12.31.%03d.hdf' % (input_dir, gran)
     if not (os.path.exists(airs_file) and
             os.path.exists(modis_file) and
             os.path.exists(meta_file)):
diff --git a/grouper.py b/grouper.py
new file mode 100644
index 0000000000000000000000000000000000000000..957faa4e24ef641474a760acb2aa9ecd73d38987
--- /dev/null
+++ b/grouper.py
@@ -0,0 +1,125 @@
+import glob
+import numpy
+import sys
+
+# helper functions to allow the classes below to be used with either
+# one or two dimensions
+
+def histogram_init_helper(num_bins, dtype):
+
+    if type(num_bins) == tuple:
+        return numpy.zeros(num_bins, dtype)
+    else:
+        return numpy.zeros((num_bins,), dtype)
+
+def histogram_helper(arr, **kwargs):
+
+    if type(arr) == tuple:
+        return numpy.histogram2d(arr[0], arr[1], **kwargs)[0]
+    else:
+        return numpy.histogram(arr, **kwargs)[0]
+
+# class for building up a histogram in parts
+class Histogram:
+
+    def __init__(self, min_value, max_value, num_bins):
+
+        self.range = (min_value, max_value)
+        self.num_bins = num_bins
+        self.hist = histogram_init_helper(num_bins, numpy.int32)
+
+    def add(self, arr):
+
+        self.hist += histogram_helper(arr,
+                                      bins=self.num_bins,
+                                      range=self.range)
+
+    def get(self):
+
+        return self.hist
+
+# class for taking statistics on some value (e.g., brightness
+# temperature differences between AIRS and MODIS) in terms of some
+# basis (e.g. Earth location or instrument scan angle). like
+# Histogram, designed for building up these stats via successive
+# calls to add()
+class Stats:
+
+    def __init__(self, min_basis, max_basis, num_bins):
+
+        self.num_bins = num_bins
+        self.range = (min_basis, max_basis)
+        self.hist = Histogram(min_basis, max_basis, num_bins)
+        self.sum = histogram_init_helper(num_bins, numpy.float64)
+        self.squared_sum = histogram_init_helper(num_bins, numpy.float64)
+
+    def add(self, basis, values):
+
+        self.hist.add(basis)
+
+        self.sum += histogram_helper(basis,
+                                     bins=self.num_bins,
+                                     range=self.range,
+                                     weights=values)
+
+        squared_values = values * values
+        self.squared_sum += histogram_helper(basis,
+                                             bins=self.num_bins,
+                                             range=self.range,
+                                             weights=squared_values)
+
+    def get(self):
+
+        n = self.hist.get()
+        mean = self.sum / n
+        std = numpy.sqrt((self.squared_sum / n) - (mean * mean))
+        return n, mean, std
+
+# helper class to save repetition for all the per-band data
+# structures needed below
+class PerBandHelper:
+
+    def __init__(self, ctor, *args):
+
+        self.data = [ctor(*args) for band in range(16)]
+
+    def __getitem__(self, key):
+
+        return self.data[key]
+
+    def get(self):
+
+        return self.data
+
+grouper_definitions = {
+    'AIRS_BT_Hist': (Histogram, 200.0, 350.0, 150),
+    'MODIS_BT_Hist': (Histogram, 200.0, 350.0, 150),
+    'BT_Diff_Hist': (Histogram, -10.0, 10.0, 100),
+    'AIRS_BT_By_Location': (Stats,
+                            (-90.0, 90.0),
+                            (-180.0, 180.0),
+                            (180, 360)),
+    'BT_Diff_By_Location': (Stats,
+                            (-90.0, 90.0),
+                            (-180.0, 180.0),
+                            (180, 360)),
+    'BT_Diff_By_AIRS_BT': (Stats, 200.0, 350.0, 150),
+    'BT_Diff_By_Scan_Angle': (Stats, -55.0, 55.0, 110),
+    'BT_Diff_By_Solar_Zenith': (Stats, 0.0, 180.0, 180)
+}
+
+# check command line
+if len(sys.argv) < 5:
+    print 'usage: python %s <year> <month> <day> <grouper> ...' % sys.argv[0]
+    sys.exit(1)
+year = int(sys.argv[1])
+month = int(sys.argv[2])
+day = int(sys.argv[3])
+grouper_names = sys.argv[4:]
+
+# loop over each granule in the input directory
+files = glob.glob('intercal.*.%04d.%02d.%02d.*.hdf' % (year, month, day))
+files.sort()
+for gran in range(1, 241):
+    # TODO: per-granule grouper processing not implemented yet
+    pass
diff --git a/intercal.condor b/intercal.condor
index eda30c4d74f79f830700d5ca959988bd28159114..e5b9b53f5681f2593d6dfab4710a42d08b6618c8 100644
--- a/intercal.condor
+++ b/intercal.condor
@@ -16,4 +16,4 @@
 log = log.$(Cluster).$(Process)
 notification = never
 
-queue 200
+queue
diff --git a/intercal_db.py b/intercal_db.py
index 9c49733034c52b00b901ea5735eee3c9c4d72a8f..855e1e6a26579ba5234cbbe69c16a8111e919363 100644
--- a/intercal_db.py
+++ b/intercal_db.py
@@ -4,13 +4,24 @@
 import pgdb
 import time
 
+# helper for connecting to the database
+def connect():
+
+    return pgdb.connect(database='intercal')
+
+# helper for getting the current UTC time in a PostgreSQL-friendly
+# format
+def get_time():
+
+    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
+
 # claim a job for forward processing. claiming a job just sets its
 # status as 'running' in the DB, but it is done in such a way as to
 # ensure that only one process can claim a given job. returns an
 # (airs_date, airs_chunk) pair.
the processing system name ('funnel' # or 'peate' for starters) and an identifier within that system are # input parameters -def get_forward_job(system, execution_id): +def get_forward_job(system, system_id): # connect to DB con = connect() @@ -24,23 +35,23 @@ def get_forward_job(system, execution_id): # find the most recent 'ready' job cur.execute("select airs_date, airs_chunk " + - "from jobs " + + "from intercal_jobs " + "where status = 'ready' " + "order by airs_date desc, airs_chunk desc " + "limit 1") airs_date, airs_chunk = cur.fetchone() # try and claim it - if get_job(airs_date, airs_chunk, system, execution_id, con): + if get_intercal_job(airs_date, airs_chunk, system, system_id, con): return airs_date, airs_chunk # again, this really shouldn't happen raise Exception('could not get a job to execute') # claim the job with the given data and chunk number. system and -# execution_id are the same as for get_forward_job(). con can be +# system_id are the same as for get_forward_job(). con can be # set if a DB connection is already established -def get_job(airs_date, airs_chunk, system, execution_id, con=None): +def get_intercal_job(airs_date, airs_chunk, system, system_id, con=None): # connect to the DB if not already if con == None: @@ -54,7 +65,7 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None): # no longer satisfied). if the other transaction is rolled back, # then our transaction will update one row cur = con.cursor() - cur.execute("update jobs set status = 'running' " + + cur.execute("update intercal_jobs set status = 'running' " + "where airs_date = '%s' and " % airs_date + "airs_chunk = %s and " % airs_chunk + "status = 'ready'") @@ -64,11 +75,11 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None): # we got the claim. now add a row to the executions table to # document our process start_time = get_time() - cur.execute("insert into executions (airs_date, airs_chunk, " + - "system, id, " + - "start_time) " + + cur.execute("insert into intercal_executions (airs_date, airs_chunk, " + + "system, id, " + + "start_time) " + "values ('%s', %s, " % (airs_date, airs_chunk) + - "'%s', '%s'," % (system, execution_id) + + "'%s', '%s'," % (system, system_id) + "'%s')" % start_time) # don't forgot to commit! 
@@ -76,29 +87,18 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None): return True # for marking a job as successfuly completed -def job_done(airs_date, airs_chunk): +def intercal_done(airs_date, airs_chunk): - job_done_or_failed(airs_date, airs_chunk, 'done') + intercal_done_or_failed(airs_date, airs_chunk, 'done') # for marking a job as failed -def job_failed(airs_date, airs_chunk): +def intercal_failed(airs_date, airs_chunk): - job_done_or_failed(airs_date, airs_chunk, 'failed') - -# helper for connecting to the database -def connect(): - - return pgdb.connect(database='intercal') - -# helper for getting the current UTC time in a PostgreSQL-friendly -# format -def get_time(): - - return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) + intercal_done_or_failed(airs_date, airs_chunk, 'failed') # helper for updating the database once execution is completed, # either with success or failure -def job_done_or_failed(airs_date, airs_chunk, status): +def intercal_done_or_failed(airs_date, airs_chunk, status): # connect to the DB con = connect() @@ -106,14 +106,108 @@ def job_done_or_failed(airs_date, airs_chunk, status): # record the end-time of this execution stop_time = get_time() - cur.execute("update executions set stop_time = '%s' " % stop_time + + cur.execute("update intercal_executions " + + "set stop_time = '%s' " % stop_time + "where airs_date = '%s' and " % airs_date + "airs_chunk = '%s'" % airs_chunk) # update the jobs table so this job is no longer marked 'running' - cur.execute("update jobs set status = '%s' " % status + + cur.execute("update intercal_jobs " + + "set status = '%s' " % status + "where airs_date = '%s' and " % airs_date + "airs_chunk = %s" % airs_chunk) # don't forget to commit! con.commit() + +def start_groupers(airs_date, airs_chunk, system, system_id): + + # connect to DB + con = connect() + cur = con.cursor() + + # set all grouper jobs for this chunk to running + cur.execute("update grouper_jobs set status = 'running' " + + "where airs_date = '%s' and " % airs_date + + "airs_chunk = %s and " % airs_chunk + + "status = 'idle' " + + "returning grouper_id") + grouper_ids = [row[0] for row in cur] + + # create a grouper_execution row + start_time = get_time() + cur.execute("insert into grouper_executions (type, system, id, " + + "start_time) " + + "values ('forward', " + + "'%s', '%s', " % (system,system_id) + + "'%s') " % start_time + + "returning execution_id") + execution_id = cur.fetchone()[0] + + # create grouper execution mappings + # FIXME: it's tricky to follow what's going on with the + # format specifiers here + cur.executemany("insert into grouper_execution_mappings (execution_id, " + + "airs_date, " + + "airs_chunk, " + + "grouper_id) " + + "values (%s, " % execution_id + + "'%s', %s, " % (airs_date, airs_chunk) + + "%s)", + ((grouper_id,) for grouper_id in grouper_ids)) + + # updates are done now, so commit + con.commit() + + # get the names of the groupers so we can return them + # FIXME: better way to do this? 
+ grouper_names = [] + for grouper_id in grouper_ids: + cur.execute("select name from groupers " + + "where grouper_id = %s" % grouper_id) + grouper_names.append(cur.fetchone()[0]) + + return execution_id, grouper_ids, grouper_names + +def groupers_done(airs_date, airs_chunk, execution_id, grouper_ids): + + groupers_done_or_failed(airs_date, airs_chunk, + execution_id, + grouper_ids, + 'done') + +def groupers_failed(airs_date, airs_chunk, execution_id, grouper_ids): + + groupers_done_or_failed(airs_date, airs_chunk, + execution_id, + grouper_ids, + 'failed') + +def groupers_done_or_failed(airs_date, airs_chunk, + execution_id, + grouper_ids, + status): + + # connect to the DB + con = connect() + cur = con.cursor() + + # record the end-time of this execution + stop_time = get_time() + cur.execute("update grouper_executions " + + "set stop_time = '%s' " % stop_time + + "where execution_id = %s" % execution_id) + + # update the grouper_jobs table so these grouper jobs are no + # longer marked 'running' + # FIXME: it's tricky to follow what's going on with the + # format specifiers here + cur.executemany("update grouper_jobs " + + "set status = '%s' " % status + + "where airs_date = '%s' and " % airs_date + + "airs_chunk = %s and " % airs_chunk + + "grouper_id = %s", + ((grouper_id,) for grouper_id in grouper_ids)) + + # don't forget to commit! + con.commit() diff --git a/make_groupers.sql b/make_groupers.sql new file mode 100644 index 0000000000000000000000000000000000000000..852d63916b775042429ba521814970418232566d --- /dev/null +++ b/make_groupers.sql @@ -0,0 +1,5 @@ +insert into groupers (name) values ('Test1'); +insert into groupers (name) values ('Test2'); +insert into groupers (name) values ('Test3'); +insert into groupers (name) values ('Test4'); +insert into groupers (name) values ('Test5'); diff --git a/make_jobs.py b/make_jobs.py new file mode 100644 index 0000000000000000000000000000000000000000..b3917a34eb7d1f23b2481bd7ba8b03a269efa697 --- /dev/null +++ b/make_jobs.py @@ -0,0 +1,35 @@ +import pgdb + +con = pgdb.connect(database='intercal') +cur = con.cursor() + +query = """ +insert into intercal_jobs + select '2003-01-01'::date + s1.i, s2.i, 'held' + from generate_series(0, current_date - '2003-01-01') as s1(i), + generate_series(0, 23) as s2(i) + where s1.i > coalesce((select max(airs_date) - '2003-01-01' + from intercal_jobs), + -1) +""" +cur.execute(query) +print 'Inserted %s rows into intercal_jobs' % cur.rowcount + +query = """ +insert into grouper_jobs + select '2003-01-01'::date + s1.i, s2.i, %s, 'idle' + from generate_series(0, current_date - '2003-01-01') as s1(i), + generate_series(0, 23) as s2(i) + where s1.i > coalesce((select max(airs_date) - '2003-01-01' + from grouper_jobs + where grouper_id = %s), + -1) +""" +cur.execute("select grouper_id, name from groupers") +for grouper_id, grouper_name in cur: + cur2 = con.cursor() + cur2.execute(query % (grouper_id, grouper_id)) + print 'Inserted %s rows into grouper_jobs for grouper %s' % (cur2.rowcount, + grouper_name) + +con.commit() diff --git a/make_jobs.sql b/make_jobs.sql index 51b690a1352a0003f956906eff8284bac29860af..9b370f513d4f7a6fc9c70fa5573e5dea3a70ed5d 100644 --- a/make_jobs.sql +++ b/make_jobs.sql @@ -3,9 +3,19 @@ -- granules up through the current day. 
this should be run daily to -- keep the jobs table current -insert into jobs +insert into intercal_jobs select '2003-01-01'::date + s1.i, s2.i, 'held' from generate_series(0, current_date - '2003-01-01') as s1(i), generate_series(0, 23) as s2(i) - where s1.i > coalesce((select max(airs_date) - '2003-01-01' from jobs), + where s1.i > coalesce((select max(airs_date) - '2003-01-01' + from intercal_jobs), + -1); + +insert into grouper_jobs + select '2003-01-01'::date + s1.i, s2.i, 1, 'idle' + from generate_series(0, current_date - '2003-01-01') as s1(i), + generate_series(0, 23) as s2(i) + where s1.i > coalesce((select max(airs_date) - '2003-01-01' + from grouper_jobs + where grouper_id = 1), -1); diff --git a/modis_l1b.py b/modis_l1b.py index 949504a17778b743abe65780ee133f986337c9b0..017e2c4cd8d990bff9bb21eeca38ff47a0285e61 100644 --- a/modis_l1b.py +++ b/modis_l1b.py @@ -66,7 +66,7 @@ def get_ancillary_file(file_type, t): # get the type id for Steve's database and the filename prefix # based on the file type - d = {'A': (8, 'PM1ATTRN'), 'E': (9, 'PM1EPHND')} + d = {'A': (8, 'PM1ATTNR'), 'E': (9, 'PM1EPHND')} db_type, prefix = d[file_type] # see if we already have this ancillary file diff --git a/ready_watch.py b/ready_watch.py index 694d56919e9e363ef47f71d9d1324a93a876135f..448326486978dec27438a648c5fb360b660bfea0 100644 --- a/ready_watch.py +++ b/ready_watch.py @@ -24,7 +24,7 @@ i_con = pgdb.connect(database='intercal') i_cur = i_con.cursor() # determine the current range of AIRS granules to consider -i_cur.execute("select min(airs_date), max(airs_date) from jobs") +i_cur.execute("select min(airs_date), max(airs_date) from intercal_jobs") min_date, max_date = (parse_time(s) for s in i_cur.fetchone()) num_days = ((max_date - min_date) / 86400) + 1 @@ -86,7 +86,8 @@ eph_mask[offsets] = True # below is dependent on the fact that we are processing the AIRS # granules an hour at a time ready_list = [] -i_cur.execute("select airs_date, airs_chunk from jobs where status = 'held'") +i_cur.execute("select airs_date, airs_chunk from intercal_jobs " + + "where status = 'held'") for airs_date, airs_chunk in i_cur: # determine the nominal time for this AIRS granule @@ -133,7 +134,7 @@ for airs_date, airs_chunk in i_cur: # update the InterCal processing DB with the jobs we now consider # ready -i_cur.executemany("update jobs set status = 'ready' " + +i_cur.executemany("update intercal_jobs set status = 'ready' " + "where airs_date = %s and airs_chunk = %s", ready_list) i_con.commit() diff --git a/ready_watch_apsps.py b/ready_watch_apsps.py new file mode 100644 index 0000000000000000000000000000000000000000..cdf09c3e672c6f3f6a1d0fbf6dea0f31e9734673 --- /dev/null +++ b/ready_watch_apsps.py @@ -0,0 +1,148 @@ + +# script to check Steve's database for InterCal "jobs" (groups of +# 10 AIRS granules) that are ready for processing. besides the AIRS +# L1B files themselves, we also need MODIS L1A files and the attitude +# and ephemeris files needed to process them into L1B. 
this needs to
+# be run frequently to keep the database current
+
+import calendar
+import numpy
+import pgdb
+import subprocess
+import time
+
+# helper for parsing PostgreSQL dates
+def parse_time(time_str):
+    return calendar.timegm(time.strptime(time_str, '%Y-%m-%d'))
+
+# helper for getting the nominal granule time from an AIRS filename
+def parse_airs_filename(filename):
+    return (calendar.timegm(time.strptime(filename[5:15], '%Y.%m.%d')) +
+            360 * (int(filename[16:19]) - 1))
+
+# helper for going from "AIRS offset" to a date and chunk number
+def print_time_and_chunk(t):
+    print '%s %s' % (time.strftime('%Y-%m-%d', time.gmtime(t)),
+                     t % 86400 / (60 * 60))
+
+# hard-code range of AIRS dates to consider
+min_date = parse_time('2007-07-01')
+max_date = parse_time('2008-06-30')
+num_days = ((max_date - min_date) / 86400) + 1
+
+# get available AIRS times via 'dmsls'
+airs_mask = numpy.zeros((num_days * 240,), numpy.bool)
+cmd = ('dmsls', 'AIRS.*')
+p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+for line in p.stdout:
+    if line[:4] != 'AIRS':
+        continue
+    t = parse_airs_filename(line)
+    if (t < min_date) or (t >= (max_date + 86400)):
+        continue
+    airs_mask[(t - min_date) / (6 * 60)] = True
+p.wait()
+
+# connect to Steve's database
+p_con = pgdb.connect(host='twister', database='PEATE_v3', password='abc')
+p_cur = p_con.cursor()
+
+# get times of all other needed files from the DB, sorted
+min_date_str = time.strftime('%Y.%m.%d', time.gmtime(min_date))
+max_date_str = time.strftime('%Y.%m.%d', time.gmtime(max_date))
+p_cur.execute("select fn " +
+              "from files " +
+              "where type = 41 and " +
+              "date >= '%s' and " % min_date_str +
+              "date <= '%s' " % max_date_str +
+              "order by fn")
+airs_times = numpy.array([parse_airs_filename(row[0]) for row in p_cur])
+p_cur.execute("select unix_t " +
+              "from files " +
+              "where type < 4 and " +
+              "date >= '%s' and " % min_date_str +
+              "date <= '%s' " % max_date_str +
+              "order by unix_t")
+mod_times = numpy.array(p_cur.fetchall()).flatten()
+cut_date_min = min_date
+cut_date_max = max_date + (24 * 60 * 60)
+p_cur.execute("select unix_t " +
+              "from anc_files " +
+              "where type = 8 and " +
+              "unix_t >= %s and " % cut_date_min +
+              "unix_t < %s " % cut_date_max +
+              "order by unix_t")
+att_times = numpy.array(p_cur.fetchall()).flatten()
+cut_date_min -= (24 * 60 * 60)
+p_cur.execute("select unix_t " +
+              "from anc_files " +
+              "where type = 9 and " +
+              "unix_t >= %s and " % cut_date_min +
+              "unix_t < %s " % cut_date_max +
+              "order by unix_t")
+eph_times = numpy.array(p_cur.fetchall()).flatten()
+
+# create masks of which MODIS, attitude, and ephemeris files are
+# available
+
+offsets = (mod_times - min_date) / (5 * 60)
+mod_mask = numpy.zeros((num_days * 288,), numpy.bool)
+mod_mask[offsets] = True
+
+offsets = (att_times - min_date) / (2 * 60 * 60)
+att_mask = numpy.zeros((num_days * 12,), numpy.bool)
+att_mask[offsets] = True
+
+offsets = (eph_times - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
+eph_mask = numpy.zeros((num_days + 1,), numpy.bool)
+eph_mask[offsets] = True
+
+# now loop through each hourly chunk in the hard-coded range and check
+# if all its needed dependencies are available. note that the code
+# below is dependent on the fact that we are processing the AIRS
+# granules an hour at a time
+for airs_time in range(min_date, max_date + (24 * 60 * 60), (60 * 60)):
+
+    # determine the date and chunk number for this AIRS granule
+    airs_date = airs_time / (24 * 60 * 60) * (24 * 60 * 60)
+    airs_chunk = airs_time % (24 * 60 * 60) / (60 * 60)
+
+    # check for the AIRS file itself
+    airs_offset = (airs_time - min_date) / (6 * 60)
+    if not airs_mask[airs_offset]:
+        print_time_and_chunk(airs_time)
+        continue
+
+    # we need the 14 MODIS L1A files starting with the same nominal
+    # time as the first AIRS file (note that AIRS nominal times are
+    # about 5.5 minutes late compared to MODIS)
+    num_mod_files = 14
+    mod_offset_ini = (airs_time - min_date) / (5 * 60)
+    mod_offset_fin = mod_offset_ini + num_mod_files
+    if not mod_mask[mod_offset_ini:mod_offset_fin].all():
+        print_time_and_chunk(airs_time)
+        continue
+
+    # unless the MODIS files cross a two-hour boundary, they all fit
+    # between 2 attitude files. otherwise, we'll need 3
+    if (airs_chunk % 2) == 1:
+        num_att_files = 3
+    else:
+        num_att_files = 2
+    att_offset_ini = (airs_time - min_date) / (2 * 60 * 60)
+    att_offset_fin = att_offset_ini + num_att_files
+    if not att_mask[att_offset_ini:att_offset_fin].all():
+        print_time_and_chunk(airs_time)
+        continue
+
+    # unless the MODIS files cross noon UTC, they all fit between 2
+    # ephemeris files. otherwise, we'll need 3
+    if airs_chunk == 11:
+        num_eph_files = 3
+    else:
+        num_eph_files = 2
+    eph_offset_ini = (airs_time - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
+    eph_offset_fin = eph_offset_ini + num_eph_files
+    if not eph_mask[eph_offset_ini:eph_offset_fin].all():
+        print_time_and_chunk(airs_time)
+        continue
diff --git a/setup.sql b/setup.sql
index 6e2dcc5b368aca818cca907c8467d554f1b62b0e..c143fe13c5880416dad284ee7d9137d545ed25e8 100644
--- a/setup.sql
+++ b/setup.sql
@@ -1,35 +1,79 @@
 -- SQL to initialize the InterCal processing database
 
--- the jobs table contains one row for every hour of AIRS data. new
--- jobs are added via make_jobs.sql. when jobs are first added they
--- are placed in the 'held' status. when the ready_watch.py script
--- determines that all needed data is available, the job is moved to
--- 'ready'. it can then be claimed (set 'running') by a process in
--- the cluster or on funnel. after processing, it is set to 'failed'
--- or 'done' depending on the result. these later 3 stages are
--- managed via the intercal_db.py module
-create domain status text check (value = 'held' or
-                                 value = 'ready' or
-                                 value = 'running' or
-                                 value = 'failed' or
-                                 value = 'done');
-create table jobs (
+create domain system text check (value = 'funnel' or value = 'peate');
+create domain chunk integer check (value >= 0 and value <= 23);
+
+-- the intercal_jobs table contains one row for every hour of AIRS
+-- data. new jobs are added via make_jobs.sql. when jobs are first
+-- added they are placed in the 'held' status. when the
+-- ready_watch.py script determines that all needed data is
+-- available, the job is moved to 'ready'. it can then be claimed
+-- (set 'running') by a process in the cluster or on funnel. after
+-- processing, it is set to 'failed' or 'done' depending on the
+-- result.
these later 3 stages are managed via the intercal_db.py +-- module +create domain intercal_status text check (value = 'held' or + value = 'ready' or + value = 'running' or + value = 'failed' or + value = 'done'); +create table intercal_jobs ( airs_date date, - airs_chunk integer check (airs_chunk >= 0 and airs_chunk <= 23), - status status not null, + airs_chunk chunk, + status intercal_status not null, primary key (airs_date, airs_chunk) ); -- each time a process executes an InterCal job, it records an entry --- in the executions table for posterity -create domain system text check (value = 'funnel' or value = 'peate'); -create table executions ( +-- in the intercal_executions table +create table intercal_executions ( airs_date date, airs_chunk integer, system system not null, id text not null, start_time timestamp not null, stop_time timestamp, - foreign key (airs_date, airs_chunk) references jobs + foreign key (airs_date, airs_chunk) references intercal_jobs +); + +create table groupers ( + grouper_id serial primary key, + name text unique not null +); + +-- TODO: is 'merged' needed? +create domain grouper_status text check (value = 'idle' or + value = 'running' or + value = 'done' or + value = 'failed'); +create table grouper_jobs ( + airs_date date, + airs_chunk chunk, + grouper_id integer references groupers, + status grouper_status not null, + primary key (airs_date, airs_chunk, grouper_id) +); + +create domain grouper_type text check (value = 'forward' or + value = 'catch-up'); +create table grouper_executions ( + execution_id serial primary key, + type grouper_type not null, + system system not null, + id text not null, + start_time timestamp not null, + stop_time timestamp +); + +create table grouper_execution_mappings ( + execution_id integer references grouper_executions, + airs_date date, + airs_chunk chunk, + grouper_id integer, + foreign key (airs_date, airs_chunk, grouper_id) references grouper_jobs +); + +create table grouper_catch_ups ( + airs_date date primary key ); diff --git a/wrapper_funnel.py b/wrapper_funnel.py index 97ee8af91fd0584a06e6c3aae75ae93c1f6d11fa..e55fbab8266ba8a682ff0d03035f7343090ed417 100644 --- a/wrapper_funnel.py +++ b/wrapper_funnel.py @@ -23,14 +23,17 @@ def get_time(): if len(sys.argv) != 2: print 'usage: %s <execution_id>' % sys.argv[0] sys.exit(1) -execution_id = sys.argv[1] +condor_id = sys.argv[1] # switch into the condor execute directory so our disk space will be # cleaned up when we're done os.chdir(os.environ['_CONDOR_SCRATCH_DIR']) # get a job to work on -airs_date, airs_chunk = intercal_db.get_forward_job('funnel', execution_id) +airs_date, airs_chunk = intercal_db.get_forward_job('funnel', condor_id) + +# DEBUG +return_code = 0 # run the driver year = airs_date[0:4] @@ -45,11 +48,11 @@ cmd = ('python', year, month, day, str(start_gran_num), str(num_granules), output_prefix) -return_code = subprocess.call(cmd) +#return_code = subprocess.call(cmd) # if the driver exited with failure, bail if return_code != 0: - intercal_db.job_failed(airs_date, airs_chunk) + intercal_db.intercal_failed(airs_date, airs_chunk) raise Exception('driver failed with return code %s' % return_code) # upload the files into the DMS; if any of these uploads fail, @@ -61,10 +64,20 @@ for i in range(10): year, month, day, start_gran_num + i) cmd = ('dmsput', filename) - return_code = subprocess.call(cmd) + #return_code = subprocess.call(cmd) if return_code != 0: - intercal_db.job_failed(airs_date, airs_chunk) + intercal_db.intercal_failed(airs_date, airs_chunk) raise 
Exception(' failed: %s' % return_code) # looks like everything worked! -intercal_db.job_done(airs_date, airs_chunk) +intercal_db.intercal_done(airs_date, airs_chunk) + +# now on to the groupers +# TODO: do something real here +(execution_id, + grouper_ids, + grouper_names) = intercal_db.start_groupers(airs_date, airs_chunk, + 'funnel', condor_id) +for grouper_name in grouper_names: + print 'GROUPER: %s' % grouper_name +intercal_db.groupers_done(airs_date, airs_chunk, execution_id, grouper_ids)
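
A note on the aggregation scheme used by agg_new.py and grouper.py (an illustration only, not part of the patch): because the bins and range are fixed up front, per-granule histogram counts and weighted sums are additive, so partial results can be accumulated in any order and the per-bin mean and standard deviation recovered at the end from n, sum, and squared_sum (std = sqrt(squared_sum/n - mean**2)), exactly as Stats.get() does. The sketch below assumes nothing beyond numpy (no pyhdf, util, or HdfWriter), mirrors Histogram.add(), Stats.add(), and Stats.get() for a basis/value pairing like 'BT_Diff_By_Scan_Angle', and checks the incremental result against a direct computation over all of the data at once.

# stats_check.py - illustrative sketch only; not part of the patch.
# Mimics the incremental Histogram / Stats accumulation from
# agg_new.py / grouper.py and compares it against a direct
# computation over the concatenated data.

import numpy

num_bins = 110
basis_range = (-55.0, 55.0)

# fake per-granule data: scan angle as the basis, BT difference as
# the value being averaged
numpy.random.seed(0)
granules = []
for gran in range(5):
    scan_ang = -55.0 + 110.0 * numpy.random.rand(1000)
    bt_diff = 0.5 * numpy.random.randn(1000)
    granules.append((scan_ang, bt_diff))

# incremental accumulation, as in Histogram.add() / Stats.add()
n = numpy.zeros((num_bins,), numpy.int32)
total = numpy.zeros((num_bins,), numpy.float64)
squared_total = numpy.zeros((num_bins,), numpy.float64)
for scan_ang, bt_diff in granules:
    n += numpy.histogram(scan_ang, bins=num_bins, range=basis_range)[0]
    total += numpy.histogram(scan_ang, bins=num_bins, range=basis_range,
                             weights=bt_diff)[0]
    squared_total += numpy.histogram(scan_ang, bins=num_bins,
                                     range=basis_range,
                                     weights=bt_diff * bt_diff)[0]

# Stats.get() equivalent (assumes no empty bins, as Stats.get() does)
mean = total / n
std = numpy.sqrt((squared_total / n) - (mean * mean))

# direct computation over the concatenated granules, for comparison
all_ang = numpy.concatenate([g[0] for g in granules])
all_diff = numpy.concatenate([g[1] for g in granules])
n_direct = numpy.histogram(all_ang, bins=num_bins, range=basis_range)[0]
mean_direct = numpy.histogram(all_ang, bins=num_bins, range=basis_range,
                              weights=all_diff)[0] / n_direct

assert (n == n_direct).all()
assert numpy.allclose(mean, mean_direct)
print('incremental and direct results agree; bin 0: mean %.3f std %.3f'
      % (mean[0], std[0]))

Since the per-bin counts and sums match the direct computation, the order in which granules are folded in does not matter, which is what the granule loop in agg_new.py (and the planned per-grouper processing in grouper.py) depends on.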